Loading...
1/******************************************************************************
2*******************************************************************************
3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
6**
7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions
9** of the GNU General Public License v.2.
10**
11*******************************************************************************
12******************************************************************************/
13
14#include "dlm_internal.h"
15#include "lockspace.h"
16#include "member.h"
17#include "lowcomms.h"
18#include "rcom.h"
19#include "config.h"
20#include "memory.h"
21#include "recover.h"
22#include "util.h"
23#include "lock.h"
24#include "dir.h"
25
/*
 * We use the upper 16 bits of the hash value to select the directory node.
 * Low bits are used for distribution of rsb's among hash buckets on each node.
 *
 * To give the exact range wanted (0 to ls_total_weight-1), we apply a modulus
 * of ls_total_weight to the hash value. This value in the desired range is
 * used as an offset into ls_node_array, the sorted list of nodeid's, to give
 * the particular nodeid.
 */
33 */
34
35int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
36{
37 uint32_t node;
38
39 if (ls->ls_num_nodes == 1)
40 return dlm_our_nodeid();
41 else {
42 node = (hash >> 16) % ls->ls_total_weight;
43 return ls->ls_node_array[node];
44 }
45}
46
47int dlm_dir_nodeid(struct dlm_rsb *r)
48{
49 return r->res_dir_nodeid;
50}
51
52void dlm_recover_dir_nodeid(struct dlm_ls *ls)
53{
54 struct dlm_rsb *r;
55
56 down_read(&ls->ls_root_sem);
57 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
58 r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
59 }
60 up_read(&ls->ls_root_sem);
61}
62
63int dlm_recover_directory(struct dlm_ls *ls)
64{
65 struct dlm_member *memb;
66 char *b, *last_name = NULL;
67 int error = -ENOMEM, last_len, nodeid, result;
68 uint16_t namelen;
69 unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
70
71 log_rinfo(ls, "dlm_recover_directory");
72
73 if (dlm_no_directory(ls))
74 goto out_status;
75
76 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
77 if (!last_name)
78 goto out;
79
80 list_for_each_entry(memb, &ls->ls_nodes, list) {
81 if (memb->nodeid == dlm_our_nodeid())
82 continue;
83
84 memset(last_name, 0, DLM_RESNAME_MAXLEN);
85 last_len = 0;
86
87 for (;;) {
88 int left;
89 error = dlm_recovery_stopped(ls);
90 if (error)
91 goto out_free;
92
93 error = dlm_rcom_names(ls, memb->nodeid,
94 last_name, last_len);
95 if (error)
96 goto out_free;
97
98 cond_resched();
99
100 /*
101 * pick namelen/name pairs out of received buffer
102 */
103
104 b = ls->ls_recover_buf->rc_buf;
105 left = ls->ls_recover_buf->rc_header.h_length;
106 left -= sizeof(struct dlm_rcom);
107
108 for (;;) {
109 __be16 v;
110
111 error = -EINVAL;
112 if (left < sizeof(__be16))
113 goto out_free;
114
115 memcpy(&v, b, sizeof(__be16));
116 namelen = be16_to_cpu(v);
117 b += sizeof(__be16);
118 left -= sizeof(__be16);
119
120 /* namelen of 0xFFFFF marks end of names for
121 this node; namelen of 0 marks end of the
122 buffer */
123
124 if (namelen == 0xFFFF)
125 goto done;
126 if (!namelen)
127 break;
128
129 if (namelen > left)
130 goto out_free;
131
132 if (namelen > DLM_RESNAME_MAXLEN)
133 goto out_free;
134
135 error = dlm_master_lookup(ls, memb->nodeid,
136 b, namelen,
137 DLM_LU_RECOVER_DIR,
138 &nodeid, &result);
139 if (error) {
140 log_error(ls, "recover_dir lookup %d",
141 error);
142 goto out_free;
143 }
144
145 /* The name was found in rsbtbl, but the
146 * master nodeid is different from
147 * memb->nodeid which says it is the master.
148 * This should not happen. */
149
150 if (result == DLM_LU_MATCH &&
151 nodeid != memb->nodeid) {
152 count_bad++;
153 log_error(ls, "recover_dir lookup %d "
154 "nodeid %d memb %d bad %u",
155 result, nodeid, memb->nodeid,
156 count_bad);
157 print_hex_dump_bytes("dlm_recover_dir ",
158 DUMP_PREFIX_NONE,
159 b, namelen);
160 }
161
162 /* The name was found in rsbtbl, and the
163 * master nodeid matches memb->nodeid. */
164
165 if (result == DLM_LU_MATCH &&
166 nodeid == memb->nodeid) {
167 count_match++;
168 }
169
170 /* The name was not found in rsbtbl and was
171 * added with memb->nodeid as the master. */
172
173 if (result == DLM_LU_ADD) {
174 count_add++;
175 }
176
177 last_len = namelen;
178 memcpy(last_name, b, namelen);
179 b += namelen;
180 left -= namelen;
181 count++;
182 }
183 }
184 done:
185 ;
186 }
187
188 out_status:
189 error = 0;
190 dlm_set_recover_status(ls, DLM_RS_DIR);
191
192 log_rinfo(ls, "dlm_recover_directory %u in %u new",
193 count, count_add);
194 out_free:
195 kfree(last_name);
196 out:
197 return error;
198}
199
200static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
201{
202 struct dlm_rsb *r;
203 uint32_t hash, bucket;
204 int rv;
205
206 hash = jhash(name, len, 0);
207 bucket = hash & (ls->ls_rsbtbl_size - 1);
208
209 spin_lock(&ls->ls_rsbtbl[bucket].lock);
210 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
211 if (rv)
212 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
213 name, len, &r);
214 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
215
216 if (!rv)
217 return r;
218
219 down_read(&ls->ls_root_sem);
220 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
221 if (len == r->res_length && !memcmp(name, r->res_name, len)) {
222 up_read(&ls->ls_root_sem);
223 log_debug(ls, "find_rsb_root revert to root_list %s",
224 r->res_name);
225 return r;
226 }
227 }
228 up_read(&ls->ls_root_sem);
229 return NULL;
230}
231
232/* Find the rsb where we left off (or start again), then send rsb names
233 for rsb's we're master of and whose directory node matches the requesting
234 node. inbuf is the rsb name last sent, inlen is the name's length */
235
236void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
237 char *outbuf, int outlen, int nodeid)
238{
239 struct list_head *list;
240 struct dlm_rsb *r;
241 int offset = 0, dir_nodeid;
242 __be16 be_namelen;
243
244 down_read(&ls->ls_root_sem);
245
246 if (inlen > 1) {
247 r = find_rsb_root(ls, inbuf, inlen);
248 if (!r) {
249 inbuf[inlen - 1] = '\0';
250 log_error(ls, "copy_master_names from %d start %d %s",
251 nodeid, inlen, inbuf);
252 goto out;
253 }
254 list = r->res_root_list.next;
255 } else {
256 list = ls->ls_root_list.next;
257 }
258
259 for (offset = 0; list != &ls->ls_root_list; list = list->next) {
260 r = list_entry(list, struct dlm_rsb, res_root_list);
261 if (r->res_nodeid)
262 continue;
263
264 dir_nodeid = dlm_dir_nodeid(r);
265 if (dir_nodeid != nodeid)
266 continue;
267
268 /*
269 * The block ends when we can't fit the following in the
270 * remaining buffer space:
271 * namelen (uint16_t) +
272 * name (r->res_length) +
273 * end-of-block record 0x0000 (uint16_t)
274 */
275
276 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
277 /* Write end-of-block record */
278 be_namelen = cpu_to_be16(0);
279 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
280 offset += sizeof(__be16);
281 ls->ls_recover_dir_sent_msg++;
282 goto out;
283 }
284
285 be_namelen = cpu_to_be16(r->res_length);
286 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
287 offset += sizeof(__be16);
288 memcpy(outbuf + offset, r->res_name, r->res_length);
289 offset += r->res_length;
290 ls->ls_recover_dir_sent_res++;
291 }
292
293 /*
294 * If we've reached the end of the list (and there's room) write a
295 * terminating record.
296 */
297
298 if ((list == &ls->ls_root_list) &&
299 (offset + sizeof(uint16_t) <= outlen)) {
300 be_namelen = cpu_to_be16(0xFFFF);
301 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
302 offset += sizeof(__be16);
303 ls->ls_recover_dir_sent_msg++;
304 }
305 out:
306 up_read(&ls->ls_root_sem);
307}
308
1// SPDX-License-Identifier: GPL-2.0-only
2/******************************************************************************
3*******************************************************************************
4**
5** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
7**
8**
9*******************************************************************************
10******************************************************************************/
11
12#include "dlm_internal.h"
13#include "lockspace.h"
14#include "member.h"
15#include "lowcomms.h"
16#include "rcom.h"
17#include "config.h"
18#include "memory.h"
19#include "recover.h"
20#include "util.h"
21#include "lock.h"
22#include "dir.h"
23
/*
 * We use the upper 16 bits of the hash value to select the directory node.
 * Low bits are used for distribution of rsb's among hash buckets on each node.
 *
 * To give the exact range wanted (0 to ls_total_weight-1), we apply a modulus
 * of ls_total_weight to the hash value. This value in the desired range is
 * used as an offset into ls_node_array, the sorted list of nodeid's, to give
 * the particular nodeid.
 */
32
33int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
34{
35 uint32_t node;
36
37 if (ls->ls_num_nodes == 1)
38 return dlm_our_nodeid();
39 else {
40 node = (hash >> 16) % ls->ls_total_weight;
41 return ls->ls_node_array[node];
42 }
43}
44
45int dlm_dir_nodeid(struct dlm_rsb *r)
46{
47 return r->res_dir_nodeid;
48}
49
50void dlm_recover_dir_nodeid(struct dlm_ls *ls, const struct list_head *root_list)
51{
52 struct dlm_rsb *r;
53
54 list_for_each_entry(r, root_list, res_root_list) {
55 r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
56 }
57}
58
59int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq)
60{
61 struct dlm_member *memb;
62 char *b, *last_name = NULL;
63 int error = -ENOMEM, last_len, nodeid, result;
64 uint16_t namelen;
65 unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
66
67 log_rinfo(ls, "dlm_recover_directory");
68
69 if (dlm_no_directory(ls))
70 goto out_status;
71
72 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
73 if (!last_name)
74 goto out;
75
76 list_for_each_entry(memb, &ls->ls_nodes, list) {
77 if (memb->nodeid == dlm_our_nodeid())
78 continue;
79
80 memset(last_name, 0, DLM_RESNAME_MAXLEN);
81 last_len = 0;
82
83 for (;;) {
84 int left;
85 if (dlm_recovery_stopped(ls)) {
86 error = -EINTR;
87 goto out_free;
88 }
89
90 error = dlm_rcom_names(ls, memb->nodeid,
91 last_name, last_len, seq);
92 if (error)
93 goto out_free;
94
95 cond_resched();
96
97 /*
98 * pick namelen/name pairs out of received buffer
99 */
100
101 b = ls->ls_recover_buf->rc_buf;
102 left = le16_to_cpu(ls->ls_recover_buf->rc_header.h_length);
103 left -= sizeof(struct dlm_rcom);
104
105 for (;;) {
106 __be16 v;
107
108 error = -EINVAL;
109 if (left < sizeof(__be16))
110 goto out_free;
111
112 memcpy(&v, b, sizeof(__be16));
113 namelen = be16_to_cpu(v);
114 b += sizeof(__be16);
115 left -= sizeof(__be16);
116
117 /* namelen of 0xFFFFF marks end of names for
118 this node; namelen of 0 marks end of the
119 buffer */
120
121 if (namelen == 0xFFFF)
122 goto done;
123 if (!namelen)
124 break;
125
126 if (namelen > left)
127 goto out_free;
128
129 if (namelen > DLM_RESNAME_MAXLEN)
130 goto out_free;
131
132 error = dlm_master_lookup(ls, memb->nodeid,
133 b, namelen,
134 DLM_LU_RECOVER_DIR,
135 &nodeid, &result);
136 if (error) {
137 log_error(ls, "recover_dir lookup %d",
138 error);
139 goto out_free;
140 }
141
142 /* The name was found in rsbtbl, but the
143 * master nodeid is different from
144 * memb->nodeid which says it is the master.
145 * This should not happen. */
146
147 if (result == DLM_LU_MATCH &&
148 nodeid != memb->nodeid) {
149 count_bad++;
150 log_error(ls, "recover_dir lookup %d "
151 "nodeid %d memb %d bad %u",
152 result, nodeid, memb->nodeid,
153 count_bad);
154 print_hex_dump_bytes("dlm_recover_dir ",
155 DUMP_PREFIX_NONE,
156 b, namelen);
157 }
158
159 /* The name was found in rsbtbl, and the
160 * master nodeid matches memb->nodeid. */
161
162 if (result == DLM_LU_MATCH &&
163 nodeid == memb->nodeid) {
164 count_match++;
165 }
166
167 /* The name was not found in rsbtbl and was
168 * added with memb->nodeid as the master. */
169
170 if (result == DLM_LU_ADD) {
171 count_add++;
172 }
173
174 last_len = namelen;
175 memcpy(last_name, b, namelen);
176 b += namelen;
177 left -= namelen;
178 count++;
179 }
180 }
181 done:
182 ;
183 }
184
185 out_status:
186 error = 0;
187 dlm_set_recover_status(ls, DLM_RS_DIR);
188
189 log_rinfo(ls, "dlm_recover_directory %u in %u new",
190 count, count_add);
191 out_free:
192 kfree(last_name);
193 out:
194 return error;
195}
196
197static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
198 int len)
199{
200 struct dlm_rsb *r;
201 int rv;
202
203 read_lock_bh(&ls->ls_rsbtbl_lock);
204 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
205 read_unlock_bh(&ls->ls_rsbtbl_lock);
206 if (!rv)
207 return r;
208
209 list_for_each_entry(r, &ls->ls_masters_list, res_masters_list) {
210 if (len == r->res_length && !memcmp(name, r->res_name, len)) {
211 log_debug(ls, "find_rsb_root revert to root_list %s",
212 r->res_name);
213 return r;
214 }
215 }
216 return NULL;
217}
218
219struct dlm_dir_dump {
220 /* init values to match if whole
221 * dump fits to one seq. Sanity check only.
222 */
223 uint64_t seq_init;
224 uint64_t nodeid_init;
225 /* compare local pointer with last lookup,
226 * just a sanity check.
227 */
228 struct list_head *last;
229
230 unsigned int sent_res; /* for log info */
231 unsigned int sent_msg; /* for log info */
232
233 struct list_head list;
234};
235
236static void drop_dir_ctx(struct dlm_ls *ls, int nodeid)
237{
238 struct dlm_dir_dump *dd, *safe;
239
240 write_lock_bh(&ls->ls_dir_dump_lock);
241 list_for_each_entry_safe(dd, safe, &ls->ls_dir_dump_list, list) {
242 if (dd->nodeid_init == nodeid) {
243 log_error(ls, "drop dump seq %llu",
244 (unsigned long long)dd->seq_init);
245 list_del(&dd->list);
246 kfree(dd);
247 }
248 }
249 write_unlock_bh(&ls->ls_dir_dump_lock);
250}
251
252static struct dlm_dir_dump *lookup_dir_dump(struct dlm_ls *ls, int nodeid)
253{
254 struct dlm_dir_dump *iter, *dd = NULL;
255
256 read_lock_bh(&ls->ls_dir_dump_lock);
257 list_for_each_entry(iter, &ls->ls_dir_dump_list, list) {
258 if (iter->nodeid_init == nodeid) {
259 dd = iter;
260 break;
261 }
262 }
263 read_unlock_bh(&ls->ls_dir_dump_lock);
264
265 return dd;
266}
267
268static struct dlm_dir_dump *init_dir_dump(struct dlm_ls *ls, int nodeid)
269{
270 struct dlm_dir_dump *dd;
271
272 dd = lookup_dir_dump(ls, nodeid);
273 if (dd) {
274 log_error(ls, "found ongoing dir dump for node %d, will drop it",
275 nodeid);
276 drop_dir_ctx(ls, nodeid);
277 }
278
279 dd = kzalloc(sizeof(*dd), GFP_ATOMIC);
280 if (!dd)
281 return NULL;
282
283 dd->seq_init = ls->ls_recover_seq;
284 dd->nodeid_init = nodeid;
285
286 write_lock_bh(&ls->ls_dir_dump_lock);
287 list_add(&dd->list, &ls->ls_dir_dump_list);
288 write_unlock_bh(&ls->ls_dir_dump_lock);
289
290 return dd;
291}
292
293/* Find the rsb where we left off (or start again), then send rsb names
294 for rsb's we're master of and whose directory node matches the requesting
295 node. inbuf is the rsb name last sent, inlen is the name's length */
296
297void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
298 char *outbuf, int outlen, int nodeid)
299{
300 struct list_head *list;
301 struct dlm_rsb *r;
302 int offset = 0, dir_nodeid;
303 struct dlm_dir_dump *dd;
304 __be16 be_namelen;
305
306 read_lock_bh(&ls->ls_masters_lock);
307
308 if (inlen > 1) {
309 dd = lookup_dir_dump(ls, nodeid);
310 if (!dd) {
311 log_error(ls, "failed to lookup dir dump context nodeid: %d",
312 nodeid);
313 goto out;
314 }
315
316 /* next chunk in dump */
317 r = find_rsb_root(ls, inbuf, inlen);
318 if (!r) {
319 log_error(ls, "copy_master_names from %d start %d %.*s",
320 nodeid, inlen, inlen, inbuf);
321 goto out;
322 }
323 list = r->res_masters_list.next;
324
325 /* sanity checks */
326 if (dd->last != &r->res_masters_list ||
327 dd->seq_init != ls->ls_recover_seq) {
328 log_error(ls, "failed dir dump sanity check seq_init: %llu seq: %llu",
329 (unsigned long long)dd->seq_init,
330 (unsigned long long)ls->ls_recover_seq);
331 goto out;
332 }
333 } else {
334 dd = init_dir_dump(ls, nodeid);
335 if (!dd) {
336 log_error(ls, "failed to allocate dir dump context");
337 goto out;
338 }
339
340 /* start dump */
341 list = ls->ls_masters_list.next;
342 dd->last = list;
343 }
344
345 for (offset = 0; list != &ls->ls_masters_list; list = list->next) {
346 r = list_entry(list, struct dlm_rsb, res_masters_list);
347 dir_nodeid = dlm_dir_nodeid(r);
348 if (dir_nodeid != nodeid)
349 continue;
350
351 /*
352 * The block ends when we can't fit the following in the
353 * remaining buffer space:
354 * namelen (uint16_t) +
355 * name (r->res_length) +
356 * end-of-block record 0x0000 (uint16_t)
357 */
358
359 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
360 /* Write end-of-block record */
361 be_namelen = cpu_to_be16(0);
362 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
363 offset += sizeof(__be16);
364 dd->sent_msg++;
365 goto out;
366 }
367
368 be_namelen = cpu_to_be16(r->res_length);
369 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
370 offset += sizeof(__be16);
371 memcpy(outbuf + offset, r->res_name, r->res_length);
372 offset += r->res_length;
373 dd->sent_res++;
374 dd->last = list;
375 }
376
377 /*
378 * If we've reached the end of the list (and there's room) write a
379 * terminating record.
380 */
381
382 if ((list == &ls->ls_masters_list) &&
383 (offset + sizeof(uint16_t) <= outlen)) {
384 /* end dump */
385 be_namelen = cpu_to_be16(0xFFFF);
386 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
387 offset += sizeof(__be16);
388 dd->sent_msg++;
389 log_rinfo(ls, "dlm_recover_directory nodeid %d sent %u res out %u messages",
390 nodeid, dd->sent_res, dd->sent_msg);
391
392 write_lock_bh(&ls->ls_dir_dump_lock);
393 list_del_init(&dd->list);
394 write_unlock_bh(&ls->ls_dir_dump_lock);
395 kfree(dd);
396 }
397 out:
398 read_unlock_bh(&ls->ls_masters_lock);
399}
400