Loading...
1// SPDX-License-Identifier: GPL-2.0-only
2/******************************************************************************
3*******************************************************************************
4**
5** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
7**
8**
9*******************************************************************************
10******************************************************************************/
11
12#include "dlm_internal.h"
13#include "lockspace.h"
14#include "member.h"
15#include "lowcomms.h"
16#include "rcom.h"
17#include "config.h"
18#include "memory.h"
19#include "recover.h"
20#include "util.h"
21#include "lock.h"
22#include "dir.h"
23
24/*
25 * We use the upper 16 bits of the hash value to select the directory node.
26 * Low bits are used for distribution of rsb's among hash buckets on each node.
27 *
28 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
29 * num_nodes to the hash value. This value in the desired range is used as an
30 * offset into the sorted list of nodeid's to give the particular nodeid.
31 */
32
33int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
34{
35 uint32_t node;
36
37 if (ls->ls_num_nodes == 1)
38 return dlm_our_nodeid();
39 else {
40 node = (hash >> 16) % ls->ls_total_weight;
41 return ls->ls_node_array[node];
42 }
43}
44
45int dlm_dir_nodeid(struct dlm_rsb *r)
46{
47 return r->res_dir_nodeid;
48}
49
50void dlm_recover_dir_nodeid(struct dlm_ls *ls)
51{
52 struct dlm_rsb *r;
53
54 down_read(&ls->ls_root_sem);
55 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
56 r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
57 }
58 up_read(&ls->ls_root_sem);
59}
60
61int dlm_recover_directory(struct dlm_ls *ls)
62{
63 struct dlm_member *memb;
64 char *b, *last_name = NULL;
65 int error = -ENOMEM, last_len, nodeid, result;
66 uint16_t namelen;
67 unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
68
69 log_rinfo(ls, "dlm_recover_directory");
70
71 if (dlm_no_directory(ls))
72 goto out_status;
73
74 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
75 if (!last_name)
76 goto out;
77
78 list_for_each_entry(memb, &ls->ls_nodes, list) {
79 if (memb->nodeid == dlm_our_nodeid())
80 continue;
81
82 memset(last_name, 0, DLM_RESNAME_MAXLEN);
83 last_len = 0;
84
85 for (;;) {
86 int left;
87 error = dlm_recovery_stopped(ls);
88 if (error)
89 goto out_free;
90
91 error = dlm_rcom_names(ls, memb->nodeid,
92 last_name, last_len);
93 if (error)
94 goto out_free;
95
96 cond_resched();
97
98 /*
99 * pick namelen/name pairs out of received buffer
100 */
101
102 b = ls->ls_recover_buf->rc_buf;
103 left = ls->ls_recover_buf->rc_header.h_length;
104 left -= sizeof(struct dlm_rcom);
105
106 for (;;) {
107 __be16 v;
108
109 error = -EINVAL;
110 if (left < sizeof(__be16))
111 goto out_free;
112
113 memcpy(&v, b, sizeof(__be16));
114 namelen = be16_to_cpu(v);
115 b += sizeof(__be16);
116 left -= sizeof(__be16);
117
118 /* namelen of 0xFFFFF marks end of names for
119 this node; namelen of 0 marks end of the
120 buffer */
121
122 if (namelen == 0xFFFF)
123 goto done;
124 if (!namelen)
125 break;
126
127 if (namelen > left)
128 goto out_free;
129
130 if (namelen > DLM_RESNAME_MAXLEN)
131 goto out_free;
132
133 error = dlm_master_lookup(ls, memb->nodeid,
134 b, namelen,
135 DLM_LU_RECOVER_DIR,
136 &nodeid, &result);
137 if (error) {
138 log_error(ls, "recover_dir lookup %d",
139 error);
140 goto out_free;
141 }
142
143 /* The name was found in rsbtbl, but the
144 * master nodeid is different from
145 * memb->nodeid which says it is the master.
146 * This should not happen. */
147
148 if (result == DLM_LU_MATCH &&
149 nodeid != memb->nodeid) {
150 count_bad++;
151 log_error(ls, "recover_dir lookup %d "
152 "nodeid %d memb %d bad %u",
153 result, nodeid, memb->nodeid,
154 count_bad);
155 print_hex_dump_bytes("dlm_recover_dir ",
156 DUMP_PREFIX_NONE,
157 b, namelen);
158 }
159
160 /* The name was found in rsbtbl, and the
161 * master nodeid matches memb->nodeid. */
162
163 if (result == DLM_LU_MATCH &&
164 nodeid == memb->nodeid) {
165 count_match++;
166 }
167
168 /* The name was not found in rsbtbl and was
169 * added with memb->nodeid as the master. */
170
171 if (result == DLM_LU_ADD) {
172 count_add++;
173 }
174
175 last_len = namelen;
176 memcpy(last_name, b, namelen);
177 b += namelen;
178 left -= namelen;
179 count++;
180 }
181 }
182 done:
183 ;
184 }
185
186 out_status:
187 error = 0;
188 dlm_set_recover_status(ls, DLM_RS_DIR);
189
190 log_rinfo(ls, "dlm_recover_directory %u in %u new",
191 count, count_add);
192 out_free:
193 kfree(last_name);
194 out:
195 return error;
196}
197
198static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
199{
200 struct dlm_rsb *r;
201 uint32_t hash, bucket;
202 int rv;
203
204 hash = jhash(name, len, 0);
205 bucket = hash & (ls->ls_rsbtbl_size - 1);
206
207 spin_lock(&ls->ls_rsbtbl[bucket].lock);
208 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
209 if (rv)
210 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
211 name, len, &r);
212 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
213
214 if (!rv)
215 return r;
216
217 down_read(&ls->ls_root_sem);
218 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
219 if (len == r->res_length && !memcmp(name, r->res_name, len)) {
220 up_read(&ls->ls_root_sem);
221 log_debug(ls, "find_rsb_root revert to root_list %s",
222 r->res_name);
223 return r;
224 }
225 }
226 up_read(&ls->ls_root_sem);
227 return NULL;
228}
229
230/* Find the rsb where we left off (or start again), then send rsb names
231 for rsb's we're master of and whose directory node matches the requesting
232 node. inbuf is the rsb name last sent, inlen is the name's length */
233
234void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
235 char *outbuf, int outlen, int nodeid)
236{
237 struct list_head *list;
238 struct dlm_rsb *r;
239 int offset = 0, dir_nodeid;
240 __be16 be_namelen;
241
242 down_read(&ls->ls_root_sem);
243
244 if (inlen > 1) {
245 r = find_rsb_root(ls, inbuf, inlen);
246 if (!r) {
247 inbuf[inlen - 1] = '\0';
248 log_error(ls, "copy_master_names from %d start %d %s",
249 nodeid, inlen, inbuf);
250 goto out;
251 }
252 list = r->res_root_list.next;
253 } else {
254 list = ls->ls_root_list.next;
255 }
256
257 for (offset = 0; list != &ls->ls_root_list; list = list->next) {
258 r = list_entry(list, struct dlm_rsb, res_root_list);
259 if (r->res_nodeid)
260 continue;
261
262 dir_nodeid = dlm_dir_nodeid(r);
263 if (dir_nodeid != nodeid)
264 continue;
265
266 /*
267 * The block ends when we can't fit the following in the
268 * remaining buffer space:
269 * namelen (uint16_t) +
270 * name (r->res_length) +
271 * end-of-block record 0x0000 (uint16_t)
272 */
273
274 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
275 /* Write end-of-block record */
276 be_namelen = cpu_to_be16(0);
277 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
278 offset += sizeof(__be16);
279 ls->ls_recover_dir_sent_msg++;
280 goto out;
281 }
282
283 be_namelen = cpu_to_be16(r->res_length);
284 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
285 offset += sizeof(__be16);
286 memcpy(outbuf + offset, r->res_name, r->res_length);
287 offset += r->res_length;
288 ls->ls_recover_dir_sent_res++;
289 }
290
291 /*
292 * If we've reached the end of the list (and there's room) write a
293 * terminating record.
294 */
295
296 if ((list == &ls->ls_root_list) &&
297 (offset + sizeof(uint16_t) <= outlen)) {
298 be_namelen = cpu_to_be16(0xFFFF);
299 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
300 offset += sizeof(__be16);
301 ls->ls_recover_dir_sent_msg++;
302 }
303 out:
304 up_read(&ls->ls_root_sem);
305}
306
1/******************************************************************************
2*******************************************************************************
3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
6**
7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions
9** of the GNU General Public License v.2.
10**
11*******************************************************************************
12******************************************************************************/
13
14#include "dlm_internal.h"
15#include "lockspace.h"
16#include "member.h"
17#include "lowcomms.h"
18#include "rcom.h"
19#include "config.h"
20#include "memory.h"
21#include "recover.h"
22#include "util.h"
23#include "lock.h"
24#include "dir.h"
25
26/*
27 * We use the upper 16 bits of the hash value to select the directory node.
28 * Low bits are used for distribution of rsb's among hash buckets on each node.
29 *
30 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
31 * num_nodes to the hash value. This value in the desired range is used as an
32 * offset into the sorted list of nodeid's to give the particular nodeid.
33 */
34
35int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
36{
37 uint32_t node;
38
39 if (ls->ls_num_nodes == 1)
40 return dlm_our_nodeid();
41 else {
42 node = (hash >> 16) % ls->ls_total_weight;
43 return ls->ls_node_array[node];
44 }
45}
46
47int dlm_dir_nodeid(struct dlm_rsb *r)
48{
49 return r->res_dir_nodeid;
50}
51
52void dlm_recover_dir_nodeid(struct dlm_ls *ls)
53{
54 struct dlm_rsb *r;
55
56 down_read(&ls->ls_root_sem);
57 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
58 r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
59 }
60 up_read(&ls->ls_root_sem);
61}
62
63int dlm_recover_directory(struct dlm_ls *ls)
64{
65 struct dlm_member *memb;
66 char *b, *last_name = NULL;
67 int error = -ENOMEM, last_len, nodeid, result;
68 uint16_t namelen;
69 unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
70
71 log_rinfo(ls, "dlm_recover_directory");
72
73 if (dlm_no_directory(ls))
74 goto out_status;
75
76 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
77 if (!last_name)
78 goto out;
79
80 list_for_each_entry(memb, &ls->ls_nodes, list) {
81 if (memb->nodeid == dlm_our_nodeid())
82 continue;
83
84 memset(last_name, 0, DLM_RESNAME_MAXLEN);
85 last_len = 0;
86
87 for (;;) {
88 int left;
89 error = dlm_recovery_stopped(ls);
90 if (error)
91 goto out_free;
92
93 error = dlm_rcom_names(ls, memb->nodeid,
94 last_name, last_len);
95 if (error)
96 goto out_free;
97
98 cond_resched();
99
100 /*
101 * pick namelen/name pairs out of received buffer
102 */
103
104 b = ls->ls_recover_buf->rc_buf;
105 left = ls->ls_recover_buf->rc_header.h_length;
106 left -= sizeof(struct dlm_rcom);
107
108 for (;;) {
109 __be16 v;
110
111 error = -EINVAL;
112 if (left < sizeof(__be16))
113 goto out_free;
114
115 memcpy(&v, b, sizeof(__be16));
116 namelen = be16_to_cpu(v);
117 b += sizeof(__be16);
118 left -= sizeof(__be16);
119
120 /* namelen of 0xFFFFF marks end of names for
121 this node; namelen of 0 marks end of the
122 buffer */
123
124 if (namelen == 0xFFFF)
125 goto done;
126 if (!namelen)
127 break;
128
129 if (namelen > left)
130 goto out_free;
131
132 if (namelen > DLM_RESNAME_MAXLEN)
133 goto out_free;
134
135 error = dlm_master_lookup(ls, memb->nodeid,
136 b, namelen,
137 DLM_LU_RECOVER_DIR,
138 &nodeid, &result);
139 if (error) {
140 log_error(ls, "recover_dir lookup %d",
141 error);
142 goto out_free;
143 }
144
145 /* The name was found in rsbtbl, but the
146 * master nodeid is different from
147 * memb->nodeid which says it is the master.
148 * This should not happen. */
149
150 if (result == DLM_LU_MATCH &&
151 nodeid != memb->nodeid) {
152 count_bad++;
153 log_error(ls, "recover_dir lookup %d "
154 "nodeid %d memb %d bad %u",
155 result, nodeid, memb->nodeid,
156 count_bad);
157 print_hex_dump_bytes("dlm_recover_dir ",
158 DUMP_PREFIX_NONE,
159 b, namelen);
160 }
161
162 /* The name was found in rsbtbl, and the
163 * master nodeid matches memb->nodeid. */
164
165 if (result == DLM_LU_MATCH &&
166 nodeid == memb->nodeid) {
167 count_match++;
168 }
169
170 /* The name was not found in rsbtbl and was
171 * added with memb->nodeid as the master. */
172
173 if (result == DLM_LU_ADD) {
174 count_add++;
175 }
176
177 last_len = namelen;
178 memcpy(last_name, b, namelen);
179 b += namelen;
180 left -= namelen;
181 count++;
182 }
183 }
184 done:
185 ;
186 }
187
188 out_status:
189 error = 0;
190 dlm_set_recover_status(ls, DLM_RS_DIR);
191
192 log_rinfo(ls, "dlm_recover_directory %u in %u new",
193 count, count_add);
194 out_free:
195 kfree(last_name);
196 out:
197 return error;
198}
199
200static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
201{
202 struct dlm_rsb *r;
203 uint32_t hash, bucket;
204 int rv;
205
206 hash = jhash(name, len, 0);
207 bucket = hash & (ls->ls_rsbtbl_size - 1);
208
209 spin_lock(&ls->ls_rsbtbl[bucket].lock);
210 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
211 if (rv)
212 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
213 name, len, &r);
214 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
215
216 if (!rv)
217 return r;
218
219 down_read(&ls->ls_root_sem);
220 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
221 if (len == r->res_length && !memcmp(name, r->res_name, len)) {
222 up_read(&ls->ls_root_sem);
223 log_debug(ls, "find_rsb_root revert to root_list %s",
224 r->res_name);
225 return r;
226 }
227 }
228 up_read(&ls->ls_root_sem);
229 return NULL;
230}
231
232/* Find the rsb where we left off (or start again), then send rsb names
233 for rsb's we're master of and whose directory node matches the requesting
234 node. inbuf is the rsb name last sent, inlen is the name's length */
235
236void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
237 char *outbuf, int outlen, int nodeid)
238{
239 struct list_head *list;
240 struct dlm_rsb *r;
241 int offset = 0, dir_nodeid;
242 __be16 be_namelen;
243
244 down_read(&ls->ls_root_sem);
245
246 if (inlen > 1) {
247 r = find_rsb_root(ls, inbuf, inlen);
248 if (!r) {
249 inbuf[inlen - 1] = '\0';
250 log_error(ls, "copy_master_names from %d start %d %s",
251 nodeid, inlen, inbuf);
252 goto out;
253 }
254 list = r->res_root_list.next;
255 } else {
256 list = ls->ls_root_list.next;
257 }
258
259 for (offset = 0; list != &ls->ls_root_list; list = list->next) {
260 r = list_entry(list, struct dlm_rsb, res_root_list);
261 if (r->res_nodeid)
262 continue;
263
264 dir_nodeid = dlm_dir_nodeid(r);
265 if (dir_nodeid != nodeid)
266 continue;
267
268 /*
269 * The block ends when we can't fit the following in the
270 * remaining buffer space:
271 * namelen (uint16_t) +
272 * name (r->res_length) +
273 * end-of-block record 0x0000 (uint16_t)
274 */
275
276 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
277 /* Write end-of-block record */
278 be_namelen = cpu_to_be16(0);
279 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
280 offset += sizeof(__be16);
281 ls->ls_recover_dir_sent_msg++;
282 goto out;
283 }
284
285 be_namelen = cpu_to_be16(r->res_length);
286 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
287 offset += sizeof(__be16);
288 memcpy(outbuf + offset, r->res_name, r->res_length);
289 offset += r->res_length;
290 ls->ls_recover_dir_sent_res++;
291 }
292
293 /*
294 * If we've reached the end of the list (and there's room) write a
295 * terminating record.
296 */
297
298 if ((list == &ls->ls_root_list) &&
299 (offset + sizeof(uint16_t) <= outlen)) {
300 be_namelen = cpu_to_be16(0xFFFF);
301 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
302 offset += sizeof(__be16);
303 ls->ls_recover_dir_sent_msg++;
304 }
305 out:
306 up_read(&ls->ls_root_sem);
307}
308