Loading...
1/******************************************************************************
2*******************************************************************************
3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
6**
7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions
9** of the GNU General Public License v.2.
10**
11*******************************************************************************
12******************************************************************************/
13
14#include "dlm_internal.h"
15#include "lockspace.h"
16#include "member.h"
17#include "lowcomms.h"
18#include "rcom.h"
19#include "config.h"
20#include "memory.h"
21#include "recover.h"
22#include "util.h"
23#include "lock.h"
24#include "dir.h"
25
26
27static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
28{
29 spin_lock(&ls->ls_recover_list_lock);
30 list_add(&de->list, &ls->ls_recover_list);
31 spin_unlock(&ls->ls_recover_list_lock);
32}
33
34static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
35{
36 int found = 0;
37 struct dlm_direntry *de;
38
39 spin_lock(&ls->ls_recover_list_lock);
40 list_for_each_entry(de, &ls->ls_recover_list, list) {
41 if (de->length == len) {
42 list_del(&de->list);
43 de->master_nodeid = 0;
44 memset(de->name, 0, len);
45 found = 1;
46 break;
47 }
48 }
49 spin_unlock(&ls->ls_recover_list_lock);
50
51 if (!found)
52 de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_NOFS);
53 return de;
54}
55
56void dlm_clear_free_entries(struct dlm_ls *ls)
57{
58 struct dlm_direntry *de;
59
60 spin_lock(&ls->ls_recover_list_lock);
61 while (!list_empty(&ls->ls_recover_list)) {
62 de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
63 list);
64 list_del(&de->list);
65 kfree(de);
66 }
67 spin_unlock(&ls->ls_recover_list_lock);
68}
69
70/*
71 * We use the upper 16 bits of the hash value to select the directory node.
72 * Low bits are used for distribution of rsb's among hash buckets on each node.
73 *
74 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
75 * num_nodes to the hash value. This value in the desired range is used as an
76 * offset into the sorted list of nodeid's to give the particular nodeid.
77 */
78
79int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
80{
81 struct list_head *tmp;
82 struct dlm_member *memb = NULL;
83 uint32_t node, n = 0;
84 int nodeid;
85
86 if (ls->ls_num_nodes == 1) {
87 nodeid = dlm_our_nodeid();
88 goto out;
89 }
90
91 if (ls->ls_node_array) {
92 node = (hash >> 16) % ls->ls_total_weight;
93 nodeid = ls->ls_node_array[node];
94 goto out;
95 }
96
97 /* make_member_array() failed to kmalloc ls_node_array... */
98
99 node = (hash >> 16) % ls->ls_num_nodes;
100
101 list_for_each(tmp, &ls->ls_nodes) {
102 if (n++ != node)
103 continue;
104 memb = list_entry(tmp, struct dlm_member, list);
105 break;
106 }
107
108 DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
109 ls->ls_num_nodes, n, node););
110 nodeid = memb->nodeid;
111 out:
112 return nodeid;
113}
114
115int dlm_dir_nodeid(struct dlm_rsb *r)
116{
117 return dlm_hash2nodeid(r->res_ls, r->res_hash);
118}
119
120static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
121{
122 uint32_t val;
123
124 val = jhash(name, len, 0);
125 val &= (ls->ls_dirtbl_size - 1);
126
127 return val;
128}
129
130static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
131{
132 uint32_t bucket;
133
134 bucket = dir_hash(ls, de->name, de->length);
135 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
136}
137
138static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
139 int namelen, uint32_t bucket)
140{
141 struct dlm_direntry *de;
142
143 list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
144 if (de->length == namelen && !memcmp(name, de->name, namelen))
145 goto out;
146 }
147 de = NULL;
148 out:
149 return de;
150}
151
152void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
153{
154 struct dlm_direntry *de;
155 uint32_t bucket;
156
157 bucket = dir_hash(ls, name, namelen);
158
159 spin_lock(&ls->ls_dirtbl[bucket].lock);
160
161 de = search_bucket(ls, name, namelen, bucket);
162
163 if (!de) {
164 log_error(ls, "remove fr %u none", nodeid);
165 goto out;
166 }
167
168 if (de->master_nodeid != nodeid) {
169 log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
170 goto out;
171 }
172
173 list_del(&de->list);
174 kfree(de);
175 out:
176 spin_unlock(&ls->ls_dirtbl[bucket].lock);
177}
178
179void dlm_dir_clear(struct dlm_ls *ls)
180{
181 struct list_head *head;
182 struct dlm_direntry *de;
183 int i;
184
185 DLM_ASSERT(list_empty(&ls->ls_recover_list), );
186
187 for (i = 0; i < ls->ls_dirtbl_size; i++) {
188 spin_lock(&ls->ls_dirtbl[i].lock);
189 head = &ls->ls_dirtbl[i].list;
190 while (!list_empty(head)) {
191 de = list_entry(head->next, struct dlm_direntry, list);
192 list_del(&de->list);
193 put_free_de(ls, de);
194 }
195 spin_unlock(&ls->ls_dirtbl[i].lock);
196 }
197}
198
199int dlm_recover_directory(struct dlm_ls *ls)
200{
201 struct dlm_member *memb;
202 struct dlm_direntry *de;
203 char *b, *last_name = NULL;
204 int error = -ENOMEM, last_len, count = 0;
205 uint16_t namelen;
206
207 log_debug(ls, "dlm_recover_directory");
208
209 if (dlm_no_directory(ls))
210 goto out_status;
211
212 dlm_dir_clear(ls);
213
214 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
215 if (!last_name)
216 goto out;
217
218 list_for_each_entry(memb, &ls->ls_nodes, list) {
219 memset(last_name, 0, DLM_RESNAME_MAXLEN);
220 last_len = 0;
221
222 for (;;) {
223 int left;
224 error = dlm_recovery_stopped(ls);
225 if (error)
226 goto out_free;
227
228 error = dlm_rcom_names(ls, memb->nodeid,
229 last_name, last_len);
230 if (error)
231 goto out_free;
232
233 schedule();
234
235 /*
236 * pick namelen/name pairs out of received buffer
237 */
238
239 b = ls->ls_recover_buf->rc_buf;
240 left = ls->ls_recover_buf->rc_header.h_length;
241 left -= sizeof(struct dlm_rcom);
242
243 for (;;) {
244 __be16 v;
245
246 error = -EINVAL;
247 if (left < sizeof(__be16))
248 goto out_free;
249
250 memcpy(&v, b, sizeof(__be16));
251 namelen = be16_to_cpu(v);
252 b += sizeof(__be16);
253 left -= sizeof(__be16);
254
255 /* namelen of 0xFFFFF marks end of names for
256 this node; namelen of 0 marks end of the
257 buffer */
258
259 if (namelen == 0xFFFF)
260 goto done;
261 if (!namelen)
262 break;
263
264 if (namelen > left)
265 goto out_free;
266
267 if (namelen > DLM_RESNAME_MAXLEN)
268 goto out_free;
269
270 error = -ENOMEM;
271 de = get_free_de(ls, namelen);
272 if (!de)
273 goto out_free;
274
275 de->master_nodeid = memb->nodeid;
276 de->length = namelen;
277 last_len = namelen;
278 memcpy(de->name, b, namelen);
279 memcpy(last_name, b, namelen);
280 b += namelen;
281 left -= namelen;
282
283 add_entry_to_hash(ls, de);
284 count++;
285 }
286 }
287 done:
288 ;
289 }
290
291 out_status:
292 error = 0;
293 log_debug(ls, "dlm_recover_directory %d entries", count);
294 out_free:
295 kfree(last_name);
296 out:
297 dlm_clear_free_entries(ls);
298 return error;
299}
300
301static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
302 int namelen, int *r_nodeid)
303{
304 struct dlm_direntry *de, *tmp;
305 uint32_t bucket;
306
307 bucket = dir_hash(ls, name, namelen);
308
309 spin_lock(&ls->ls_dirtbl[bucket].lock);
310 de = search_bucket(ls, name, namelen, bucket);
311 if (de) {
312 *r_nodeid = de->master_nodeid;
313 spin_unlock(&ls->ls_dirtbl[bucket].lock);
314 if (*r_nodeid == nodeid)
315 return -EEXIST;
316 return 0;
317 }
318
319 spin_unlock(&ls->ls_dirtbl[bucket].lock);
320
321 if (namelen > DLM_RESNAME_MAXLEN)
322 return -EINVAL;
323
324 de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_NOFS);
325 if (!de)
326 return -ENOMEM;
327
328 de->master_nodeid = nodeid;
329 de->length = namelen;
330 memcpy(de->name, name, namelen);
331
332 spin_lock(&ls->ls_dirtbl[bucket].lock);
333 tmp = search_bucket(ls, name, namelen, bucket);
334 if (tmp) {
335 kfree(de);
336 de = tmp;
337 } else {
338 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
339 }
340 *r_nodeid = de->master_nodeid;
341 spin_unlock(&ls->ls_dirtbl[bucket].lock);
342 return 0;
343}
344
345int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
346 int *r_nodeid)
347{
348 return get_entry(ls, nodeid, name, namelen, r_nodeid);
349}
350
351static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
352{
353 struct dlm_rsb *r;
354 uint32_t hash, bucket;
355 int rv;
356
357 hash = jhash(name, len, 0);
358 bucket = hash & (ls->ls_rsbtbl_size - 1);
359
360 spin_lock(&ls->ls_rsbtbl[bucket].lock);
361 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, 0, &r);
362 if (rv)
363 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
364 name, len, 0, &r);
365 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
366
367 if (!rv)
368 return r;
369
370 down_read(&ls->ls_root_sem);
371 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
372 if (len == r->res_length && !memcmp(name, r->res_name, len)) {
373 up_read(&ls->ls_root_sem);
374 log_error(ls, "find_rsb_root revert to root_list %s",
375 r->res_name);
376 return r;
377 }
378 }
379 up_read(&ls->ls_root_sem);
380 return NULL;
381}
382
383/* Find the rsb where we left off (or start again), then send rsb names
384 for rsb's we're master of and whose directory node matches the requesting
385 node. inbuf is the rsb name last sent, inlen is the name's length */
386
387void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
388 char *outbuf, int outlen, int nodeid)
389{
390 struct list_head *list;
391 struct dlm_rsb *r;
392 int offset = 0, dir_nodeid;
393 __be16 be_namelen;
394
395 down_read(&ls->ls_root_sem);
396
397 if (inlen > 1) {
398 r = find_rsb_root(ls, inbuf, inlen);
399 if (!r) {
400 inbuf[inlen - 1] = '\0';
401 log_error(ls, "copy_master_names from %d start %d %s",
402 nodeid, inlen, inbuf);
403 goto out;
404 }
405 list = r->res_root_list.next;
406 } else {
407 list = ls->ls_root_list.next;
408 }
409
410 for (offset = 0; list != &ls->ls_root_list; list = list->next) {
411 r = list_entry(list, struct dlm_rsb, res_root_list);
412 if (r->res_nodeid)
413 continue;
414
415 dir_nodeid = dlm_dir_nodeid(r);
416 if (dir_nodeid != nodeid)
417 continue;
418
419 /*
420 * The block ends when we can't fit the following in the
421 * remaining buffer space:
422 * namelen (uint16_t) +
423 * name (r->res_length) +
424 * end-of-block record 0x0000 (uint16_t)
425 */
426
427 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
428 /* Write end-of-block record */
429 be_namelen = cpu_to_be16(0);
430 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
431 offset += sizeof(__be16);
432 goto out;
433 }
434
435 be_namelen = cpu_to_be16(r->res_length);
436 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
437 offset += sizeof(__be16);
438 memcpy(outbuf + offset, r->res_name, r->res_length);
439 offset += r->res_length;
440 }
441
442 /*
443 * If we've reached the end of the list (and there's room) write a
444 * terminating record.
445 */
446
447 if ((list == &ls->ls_root_list) &&
448 (offset + sizeof(uint16_t) <= outlen)) {
449 be_namelen = cpu_to_be16(0xFFFF);
450 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
451 offset += sizeof(__be16);
452 }
453
454 out:
455 up_read(&ls->ls_root_sem);
456}
457
1/******************************************************************************
2*******************************************************************************
3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
6**
7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions
9** of the GNU General Public License v.2.
10**
11*******************************************************************************
12******************************************************************************/
13
14#include "dlm_internal.h"
15#include "lockspace.h"
16#include "member.h"
17#include "lowcomms.h"
18#include "rcom.h"
19#include "config.h"
20#include "memory.h"
21#include "recover.h"
22#include "util.h"
23#include "lock.h"
24#include "dir.h"
25
26/*
27 * We use the upper 16 bits of the hash value to select the directory node.
28 * Low bits are used for distribution of rsb's among hash buckets on each node.
29 *
30 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
31 * num_nodes to the hash value. This value in the desired range is used as an
32 * offset into the sorted list of nodeid's to give the particular nodeid.
33 */
34
35int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
36{
37 uint32_t node;
38
39 if (ls->ls_num_nodes == 1)
40 return dlm_our_nodeid();
41 else {
42 node = (hash >> 16) % ls->ls_total_weight;
43 return ls->ls_node_array[node];
44 }
45}
46
47int dlm_dir_nodeid(struct dlm_rsb *r)
48{
49 return r->res_dir_nodeid;
50}
51
52void dlm_recover_dir_nodeid(struct dlm_ls *ls)
53{
54 struct dlm_rsb *r;
55
56 down_read(&ls->ls_root_sem);
57 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
58 r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
59 }
60 up_read(&ls->ls_root_sem);
61}
62
63int dlm_recover_directory(struct dlm_ls *ls)
64{
65 struct dlm_member *memb;
66 char *b, *last_name = NULL;
67 int error = -ENOMEM, last_len, nodeid, result;
68 uint16_t namelen;
69 unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
70
71 log_rinfo(ls, "dlm_recover_directory");
72
73 if (dlm_no_directory(ls))
74 goto out_status;
75
76 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
77 if (!last_name)
78 goto out;
79
80 list_for_each_entry(memb, &ls->ls_nodes, list) {
81 if (memb->nodeid == dlm_our_nodeid())
82 continue;
83
84 memset(last_name, 0, DLM_RESNAME_MAXLEN);
85 last_len = 0;
86
87 for (;;) {
88 int left;
89 error = dlm_recovery_stopped(ls);
90 if (error)
91 goto out_free;
92
93 error = dlm_rcom_names(ls, memb->nodeid,
94 last_name, last_len);
95 if (error)
96 goto out_free;
97
98 cond_resched();
99
100 /*
101 * pick namelen/name pairs out of received buffer
102 */
103
104 b = ls->ls_recover_buf->rc_buf;
105 left = ls->ls_recover_buf->rc_header.h_length;
106 left -= sizeof(struct dlm_rcom);
107
108 for (;;) {
109 __be16 v;
110
111 error = -EINVAL;
112 if (left < sizeof(__be16))
113 goto out_free;
114
115 memcpy(&v, b, sizeof(__be16));
116 namelen = be16_to_cpu(v);
117 b += sizeof(__be16);
118 left -= sizeof(__be16);
119
120 /* namelen of 0xFFFFF marks end of names for
121 this node; namelen of 0 marks end of the
122 buffer */
123
124 if (namelen == 0xFFFF)
125 goto done;
126 if (!namelen)
127 break;
128
129 if (namelen > left)
130 goto out_free;
131
132 if (namelen > DLM_RESNAME_MAXLEN)
133 goto out_free;
134
135 error = dlm_master_lookup(ls, memb->nodeid,
136 b, namelen,
137 DLM_LU_RECOVER_DIR,
138 &nodeid, &result);
139 if (error) {
140 log_error(ls, "recover_dir lookup %d",
141 error);
142 goto out_free;
143 }
144
145 /* The name was found in rsbtbl, but the
146 * master nodeid is different from
147 * memb->nodeid which says it is the master.
148 * This should not happen. */
149
150 if (result == DLM_LU_MATCH &&
151 nodeid != memb->nodeid) {
152 count_bad++;
153 log_error(ls, "recover_dir lookup %d "
154 "nodeid %d memb %d bad %u",
155 result, nodeid, memb->nodeid,
156 count_bad);
157 print_hex_dump_bytes("dlm_recover_dir ",
158 DUMP_PREFIX_NONE,
159 b, namelen);
160 }
161
162 /* The name was found in rsbtbl, and the
163 * master nodeid matches memb->nodeid. */
164
165 if (result == DLM_LU_MATCH &&
166 nodeid == memb->nodeid) {
167 count_match++;
168 }
169
170 /* The name was not found in rsbtbl and was
171 * added with memb->nodeid as the master. */
172
173 if (result == DLM_LU_ADD) {
174 count_add++;
175 }
176
177 last_len = namelen;
178 memcpy(last_name, b, namelen);
179 b += namelen;
180 left -= namelen;
181 count++;
182 }
183 }
184 done:
185 ;
186 }
187
188 out_status:
189 error = 0;
190 dlm_set_recover_status(ls, DLM_RS_DIR);
191
192 log_rinfo(ls, "dlm_recover_directory %u in %u new",
193 count, count_add);
194 out_free:
195 kfree(last_name);
196 out:
197 return error;
198}
199
200static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
201{
202 struct dlm_rsb *r;
203 uint32_t hash, bucket;
204 int rv;
205
206 hash = jhash(name, len, 0);
207 bucket = hash & (ls->ls_rsbtbl_size - 1);
208
209 spin_lock(&ls->ls_rsbtbl[bucket].lock);
210 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
211 if (rv)
212 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
213 name, len, &r);
214 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
215
216 if (!rv)
217 return r;
218
219 down_read(&ls->ls_root_sem);
220 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
221 if (len == r->res_length && !memcmp(name, r->res_name, len)) {
222 up_read(&ls->ls_root_sem);
223 log_debug(ls, "find_rsb_root revert to root_list %s",
224 r->res_name);
225 return r;
226 }
227 }
228 up_read(&ls->ls_root_sem);
229 return NULL;
230}
231
232/* Find the rsb where we left off (or start again), then send rsb names
233 for rsb's we're master of and whose directory node matches the requesting
234 node. inbuf is the rsb name last sent, inlen is the name's length */
235
236void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
237 char *outbuf, int outlen, int nodeid)
238{
239 struct list_head *list;
240 struct dlm_rsb *r;
241 int offset = 0, dir_nodeid;
242 __be16 be_namelen;
243
244 down_read(&ls->ls_root_sem);
245
246 if (inlen > 1) {
247 r = find_rsb_root(ls, inbuf, inlen);
248 if (!r) {
249 inbuf[inlen - 1] = '\0';
250 log_error(ls, "copy_master_names from %d start %d %s",
251 nodeid, inlen, inbuf);
252 goto out;
253 }
254 list = r->res_root_list.next;
255 } else {
256 list = ls->ls_root_list.next;
257 }
258
259 for (offset = 0; list != &ls->ls_root_list; list = list->next) {
260 r = list_entry(list, struct dlm_rsb, res_root_list);
261 if (r->res_nodeid)
262 continue;
263
264 dir_nodeid = dlm_dir_nodeid(r);
265 if (dir_nodeid != nodeid)
266 continue;
267
268 /*
269 * The block ends when we can't fit the following in the
270 * remaining buffer space:
271 * namelen (uint16_t) +
272 * name (r->res_length) +
273 * end-of-block record 0x0000 (uint16_t)
274 */
275
276 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
277 /* Write end-of-block record */
278 be_namelen = cpu_to_be16(0);
279 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
280 offset += sizeof(__be16);
281 ls->ls_recover_dir_sent_msg++;
282 goto out;
283 }
284
285 be_namelen = cpu_to_be16(r->res_length);
286 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
287 offset += sizeof(__be16);
288 memcpy(outbuf + offset, r->res_name, r->res_length);
289 offset += r->res_length;
290 ls->ls_recover_dir_sent_res++;
291 }
292
293 /*
294 * If we've reached the end of the list (and there's room) write a
295 * terminating record.
296 */
297
298 if ((list == &ls->ls_root_list) &&
299 (offset + sizeof(uint16_t) <= outlen)) {
300 be_namelen = cpu_to_be16(0xFFFF);
301 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
302 offset += sizeof(__be16);
303 ls->ls_recover_dir_sent_msg++;
304 }
305 out:
306 up_read(&ls->ls_root_sem);
307}
308