#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "lowcomms.h"
#include "rcom.h"
#include "config.h"
#include "memory.h"
#include "recover.h"
#include "util.h"
#include "lock.h"
#include "dir.h"
int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
{
uint32_t node;
if (ls->ls_num_nodes == 1)
return dlm_our_nodeid();
else {
node = (hash >> 16) % ls->ls_total_weight;
return ls->ls_node_array[node];
}
}
int dlm_dir_nodeid(struct dlm_rsb *r)
{
return r->res_dir_nodeid;
}
void dlm_recover_dir_nodeid(struct dlm_ls *ls)
{
struct dlm_rsb *r;
down_read(&ls->ls_root_sem);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
}
up_read(&ls->ls_root_sem);
}
int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq)
{
struct dlm_member *memb;
char *b, *last_name = NULL;
int error = -ENOMEM, last_len, nodeid, result;
uint16_t namelen;
unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
log_rinfo(ls, "dlm_recover_directory");
if (dlm_no_directory(ls))
goto out_status;
last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
if (!last_name)
goto out;
list_for_each_entry(memb, &ls->ls_nodes, list) {
if (memb->nodeid == dlm_our_nodeid())
continue;
memset(last_name, 0, DLM_RESNAME_MAXLEN);
last_len = 0;
for (;;) {
int left;
if (dlm_recovery_stopped(ls)) {
error = -EINTR;
goto out_free;
}
error = dlm_rcom_names(ls, memb->nodeid,
last_name, last_len, seq);
if (error)
goto out_free;
cond_resched();
b = ls->ls_recover_buf->rc_buf;
left = le16_to_cpu(ls->ls_recover_buf->rc_header.h_length);
left -= sizeof(struct dlm_rcom);
for (;;) {
__be16 v;
error = -EINVAL;
if (left < sizeof(__be16))
goto out_free;
memcpy(&v, b, sizeof(__be16));
namelen = be16_to_cpu(v);
b += sizeof(__be16);
left -= sizeof(__be16);
if (namelen == 0xFFFF)
goto done;
if (!namelen)
break;
if (namelen > left)
goto out_free;
if (namelen > DLM_RESNAME_MAXLEN)
goto out_free;
error = dlm_master_lookup(ls, memb->nodeid,
b, namelen,
DLM_LU_RECOVER_DIR,
&nodeid, &result);
if (error) {
log_error(ls, "recover_dir lookup %d",
error);
goto out_free;
}
if (result == DLM_LU_MATCH &&
nodeid != memb->nodeid) {
count_bad++;
log_error(ls, "recover_dir lookup %d "
"nodeid %d memb %d bad %u",
result, nodeid, memb->nodeid,
count_bad);
print_hex_dump_bytes("dlm_recover_dir ",
DUMP_PREFIX_NONE,
b, namelen);
}
if (result == DLM_LU_MATCH &&
nodeid == memb->nodeid) {
count_match++;
}
if (result == DLM_LU_ADD) {
count_add++;
}
last_len = namelen;
memcpy(last_name, b, namelen);
b += namelen;
left -= namelen;
count++;
}
}
done:
;
}
out_status:
error = 0;
dlm_set_recover_status(ls, DLM_RS_DIR);
log_rinfo(ls, "dlm_recover_directory %u in %u new",
count, count_add);
out_free:
kfree(last_name);
out:
return error;
}
static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
int len)
{
struct dlm_rsb *r;
uint32_t hash, bucket;
int rv;
hash = jhash(name, len, 0);
bucket = hash & (ls->ls_rsbtbl_size - 1);
spin_lock(&ls->ls_rsbtbl[bucket].lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
if (rv)
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
name, len, &r);
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
if (!rv)
return r;
down_read(&ls->ls_root_sem);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
if (len == r->res_length && !memcmp(name, r->res_name, len)) {
up_read(&ls->ls_root_sem);
log_debug(ls, "find_rsb_root revert to root_list %s",
r->res_name);
return r;
}
}
up_read(&ls->ls_root_sem);
return NULL;
}
void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
char *outbuf, int outlen, int nodeid)
{
struct list_head *list;
struct dlm_rsb *r;
int offset = 0, dir_nodeid;
__be16 be_namelen;
down_read(&ls->ls_root_sem);
if (inlen > 1) {
r = find_rsb_root(ls, inbuf, inlen);
if (!r) {
log_error(ls, "copy_master_names from %d start %d %.*s",
nodeid, inlen, inlen, inbuf);
goto out;
}
list = r->res_root_list.next;
} else {
list = ls->ls_root_list.next;
}
for (offset = 0; list != &ls->ls_root_list; list = list->next) {
r = list_entry(list, struct dlm_rsb, res_root_list);
if (r->res_nodeid)
continue;
dir_nodeid = dlm_dir_nodeid(r);
if (dir_nodeid != nodeid)
continue;
if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
be_namelen = cpu_to_be16(0);
memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
offset += sizeof(__be16);
ls->ls_recover_dir_sent_msg++;
goto out;
}
be_namelen = cpu_to_be16(r->res_length);
memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
offset += sizeof(__be16);
memcpy(outbuf + offset, r->res_name, r->res_length);
offset += r->res_length;
ls->ls_recover_dir_sent_res++;
}
if ((list == &ls->ls_root_list) &&
(offset + sizeof(uint16_t) <= outlen)) {
be_namelen = cpu_to_be16(0xFFFF);
memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
offset += sizeof(__be16);
ls->ls_recover_dir_sent_msg++;
}
out:
up_read(&ls->ls_root_sem);
}