Linux Audio

Check our new training course

Linux kernel drivers training

May 6-19, 2025
Register
Loading...
v4.6
 
  1/******************************************************************************
  2*******************************************************************************
  3**
  4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
  5**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
  6**
  7**  This copyrighted material is made available to anyone wishing to use,
  8**  modify, copy, or redistribute it subject to the terms and conditions
  9**  of the GNU General Public License v.2.
 10**
 11*******************************************************************************
 12******************************************************************************/
 13
 14#include "dlm_internal.h"
 15#include "lockspace.h"
 16#include "member.h"
 17#include "lowcomms.h"
 18#include "rcom.h"
 19#include "config.h"
 20#include "memory.h"
 21#include "recover.h"
 22#include "util.h"
 23#include "lock.h"
 24#include "dir.h"
 25
 26/*
 27 * We use the upper 16 bits of the hash value to select the directory node.
 28 * Low bits are used for distribution of rsb's among hash buckets on each node.
 29 *
 30 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
 31 * num_nodes to the hash value.  This value in the desired range is used as an
 32 * offset into the sorted list of nodeid's to give the particular nodeid.
 33 */
 34
 35int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
 36{
 37	uint32_t node;
 38
 39	if (ls->ls_num_nodes == 1)
 40		return dlm_our_nodeid();
 41	else {
 42		node = (hash >> 16) % ls->ls_total_weight;
 43		return ls->ls_node_array[node];
 44	}
 45}
 46
 47int dlm_dir_nodeid(struct dlm_rsb *r)
 48{
 49	return r->res_dir_nodeid;
 50}
 51
 52void dlm_recover_dir_nodeid(struct dlm_ls *ls)
 53{
 54	struct dlm_rsb *r;
 55
 56	down_read(&ls->ls_root_sem);
 57	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
 58		r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
 59	}
 60	up_read(&ls->ls_root_sem);
 61}
 62
 63int dlm_recover_directory(struct dlm_ls *ls)
 64{
 65	struct dlm_member *memb;
 66	char *b, *last_name = NULL;
 67	int error = -ENOMEM, last_len, nodeid, result;
 68	uint16_t namelen;
 69	unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
 70
 71	log_rinfo(ls, "dlm_recover_directory");
 72
 73	if (dlm_no_directory(ls))
 74		goto out_status;
 75
 76	last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
 77	if (!last_name)
 78		goto out;
 79
 80	list_for_each_entry(memb, &ls->ls_nodes, list) {
 81		if (memb->nodeid == dlm_our_nodeid())
 82			continue;
 83
 84		memset(last_name, 0, DLM_RESNAME_MAXLEN);
 85		last_len = 0;
 86
 87		for (;;) {
 88			int left;
 89			error = dlm_recovery_stopped(ls);
 90			if (error)
 91				goto out_free;
 
 92
 93			error = dlm_rcom_names(ls, memb->nodeid,
 94					       last_name, last_len);
 95			if (error)
 96				goto out_free;
 97
 98			cond_resched();
 99
100			/*
101			 * pick namelen/name pairs out of received buffer
102			 */
103
104			b = ls->ls_recover_buf->rc_buf;
105			left = ls->ls_recover_buf->rc_header.h_length;
106			left -= sizeof(struct dlm_rcom);
107
108			for (;;) {
109				__be16 v;
110
111				error = -EINVAL;
112				if (left < sizeof(__be16))
113					goto out_free;
114
115				memcpy(&v, b, sizeof(__be16));
116				namelen = be16_to_cpu(v);
117				b += sizeof(__be16);
118				left -= sizeof(__be16);
119
120				/* namelen of 0xFFFFF marks end of names for
121				   this node; namelen of 0 marks end of the
122				   buffer */
123
124				if (namelen == 0xFFFF)
125					goto done;
126				if (!namelen)
127					break;
128
129				if (namelen > left)
130					goto out_free;
131
132				if (namelen > DLM_RESNAME_MAXLEN)
133					goto out_free;
134
135				error = dlm_master_lookup(ls, memb->nodeid,
136							  b, namelen,
137							  DLM_LU_RECOVER_DIR,
138							  &nodeid, &result);
139				if (error) {
140					log_error(ls, "recover_dir lookup %d",
141						  error);
142					goto out_free;
143				}
144
145				/* The name was found in rsbtbl, but the
146				 * master nodeid is different from
147				 * memb->nodeid which says it is the master.
148				 * This should not happen. */
149
150				if (result == DLM_LU_MATCH &&
151				    nodeid != memb->nodeid) {
152					count_bad++;
153					log_error(ls, "recover_dir lookup %d "
154						  "nodeid %d memb %d bad %u",
155						  result, nodeid, memb->nodeid,
156						  count_bad);
157					print_hex_dump_bytes("dlm_recover_dir ",
158							     DUMP_PREFIX_NONE,
159							     b, namelen);
160				}
161
162				/* The name was found in rsbtbl, and the
163				 * master nodeid matches memb->nodeid. */
164
165				if (result == DLM_LU_MATCH &&
166				    nodeid == memb->nodeid) {
167					count_match++;
168				}
169
170				/* The name was not found in rsbtbl and was
171				 * added with memb->nodeid as the master. */
172
173				if (result == DLM_LU_ADD) {
174					count_add++;
175				}
176
177				last_len = namelen;
178				memcpy(last_name, b, namelen);
179				b += namelen;
180				left -= namelen;
181				count++;
182			}
183		}
184	 done:
185		;
186	}
187
188 out_status:
189	error = 0;
190	dlm_set_recover_status(ls, DLM_RS_DIR);
191
192	log_rinfo(ls, "dlm_recover_directory %u in %u new",
193		  count, count_add);
194 out_free:
195	kfree(last_name);
196 out:
197	return error;
198}
199
200static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
 
201{
202	struct dlm_rsb *r;
203	uint32_t hash, bucket;
204	int rv;
205
206	hash = jhash(name, len, 0);
207	bucket = hash & (ls->ls_rsbtbl_size - 1);
208
209	spin_lock(&ls->ls_rsbtbl[bucket].lock);
210	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
211	if (rv)
212		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
213					 name, len, &r);
214	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
215
216	if (!rv)
217		return r;
218
219	down_read(&ls->ls_root_sem);
220	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
221		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
222			up_read(&ls->ls_root_sem);
223			log_debug(ls, "find_rsb_root revert to root_list %s",
224				  r->res_name);
225			return r;
226		}
227	}
228	up_read(&ls->ls_root_sem);
229	return NULL;
230}
231
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
232/* Find the rsb where we left off (or start again), then send rsb names
233   for rsb's we're master of and whose directory node matches the requesting
234   node.  inbuf is the rsb name last sent, inlen is the name's length */
235
236void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
237 			   char *outbuf, int outlen, int nodeid)
238{
239	struct list_head *list;
240	struct dlm_rsb *r;
241	int offset = 0, dir_nodeid;
 
242	__be16 be_namelen;
243
244	down_read(&ls->ls_root_sem);
245
246	if (inlen > 1) {
 
 
 
 
 
 
 
 
247		r = find_rsb_root(ls, inbuf, inlen);
248		if (!r) {
249			inbuf[inlen - 1] = '\0';
250			log_error(ls, "copy_master_names from %d start %d %s",
251				  nodeid, inlen, inbuf);
 
 
 
 
 
 
 
 
 
252			goto out;
253		}
254		list = r->res_root_list.next;
255	} else {
256		list = ls->ls_root_list.next;
257	}
 
 
 
258
259	for (offset = 0; list != &ls->ls_root_list; list = list->next) {
260		r = list_entry(list, struct dlm_rsb, res_root_list);
261		if (r->res_nodeid)
262			continue;
263
 
 
264		dir_nodeid = dlm_dir_nodeid(r);
265		if (dir_nodeid != nodeid)
266			continue;
267
268		/*
269		 * The block ends when we can't fit the following in the
270		 * remaining buffer space:
271		 * namelen (uint16_t) +
272		 * name (r->res_length) +
273		 * end-of-block record 0x0000 (uint16_t)
274		 */
275
276		if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
277			/* Write end-of-block record */
278			be_namelen = cpu_to_be16(0);
279			memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
280			offset += sizeof(__be16);
281			ls->ls_recover_dir_sent_msg++;
282			goto out;
283		}
284
285		be_namelen = cpu_to_be16(r->res_length);
286		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
287		offset += sizeof(__be16);
288		memcpy(outbuf + offset, r->res_name, r->res_length);
289		offset += r->res_length;
290		ls->ls_recover_dir_sent_res++;
 
291	}
292
293	/*
294	 * If we've reached the end of the list (and there's room) write a
295	 * terminating record.
296	 */
297
298	if ((list == &ls->ls_root_list) &&
299	    (offset + sizeof(uint16_t) <= outlen)) {
 
300		be_namelen = cpu_to_be16(0xFFFF);
301		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
302		offset += sizeof(__be16);
303		ls->ls_recover_dir_sent_msg++;
 
 
 
 
 
 
 
304	}
305 out:
306	up_read(&ls->ls_root_sem);
307}
308
v6.13.7
  1// SPDX-License-Identifier: GPL-2.0-only
  2/******************************************************************************
  3*******************************************************************************
  4**
  5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
  6**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
  7**
 
 
 
  8**
  9*******************************************************************************
 10******************************************************************************/
 11
 12#include "dlm_internal.h"
 13#include "lockspace.h"
 14#include "member.h"
 15#include "lowcomms.h"
 16#include "rcom.h"
 17#include "config.h"
 18#include "memory.h"
 19#include "recover.h"
 20#include "util.h"
 21#include "lock.h"
 22#include "dir.h"
 23
 24/*
 25 * We use the upper 16 bits of the hash value to select the directory node.
 26 * Low bits are used for distribution of rsb's among hash buckets on each node.
 27 *
 28 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
 29 * num_nodes to the hash value.  This value in the desired range is used as an
 30 * offset into the sorted list of nodeid's to give the particular nodeid.
 31 */
 32
 33int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
 34{
 35	uint32_t node;
 36
 37	if (ls->ls_num_nodes == 1)
 38		return dlm_our_nodeid();
 39	else {
 40		node = (hash >> 16) % ls->ls_total_weight;
 41		return ls->ls_node_array[node];
 42	}
 43}
 44
 45int dlm_dir_nodeid(struct dlm_rsb *r)
 46{
 47	return r->res_dir_nodeid;
 48}
 49
 50void dlm_recover_dir_nodeid(struct dlm_ls *ls, const struct list_head *root_list)
 51{
 52	struct dlm_rsb *r;
 53
 54	list_for_each_entry(r, root_list, res_root_list) {
 
 55		r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
 56	}
 
 57}
 58
 59int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq)
 60{
 61	struct dlm_member *memb;
 62	char *b, *last_name = NULL;
 63	int error = -ENOMEM, last_len, nodeid, result;
 64	uint16_t namelen;
 65	unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
 66
 67	log_rinfo(ls, "dlm_recover_directory");
 68
 69	if (dlm_no_directory(ls))
 70		goto out_status;
 71
 72	last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
 73	if (!last_name)
 74		goto out;
 75
 76	list_for_each_entry(memb, &ls->ls_nodes, list) {
 77		if (memb->nodeid == dlm_our_nodeid())
 78			continue;
 79
 80		memset(last_name, 0, DLM_RESNAME_MAXLEN);
 81		last_len = 0;
 82
 83		for (;;) {
 84			int left;
 85			if (dlm_recovery_stopped(ls)) {
 86				error = -EINTR;
 87				goto out_free;
 88			}
 89
 90			error = dlm_rcom_names(ls, memb->nodeid,
 91					       last_name, last_len, seq);
 92			if (error)
 93				goto out_free;
 94
 95			cond_resched();
 96
 97			/*
 98			 * pick namelen/name pairs out of received buffer
 99			 */
100
101			b = ls->ls_recover_buf->rc_buf;
102			left = le16_to_cpu(ls->ls_recover_buf->rc_header.h_length);
103			left -= sizeof(struct dlm_rcom);
104
105			for (;;) {
106				__be16 v;
107
108				error = -EINVAL;
109				if (left < sizeof(__be16))
110					goto out_free;
111
112				memcpy(&v, b, sizeof(__be16));
113				namelen = be16_to_cpu(v);
114				b += sizeof(__be16);
115				left -= sizeof(__be16);
116
117				/* namelen of 0xFFFFF marks end of names for
118				   this node; namelen of 0 marks end of the
119				   buffer */
120
121				if (namelen == 0xFFFF)
122					goto done;
123				if (!namelen)
124					break;
125
126				if (namelen > left)
127					goto out_free;
128
129				if (namelen > DLM_RESNAME_MAXLEN)
130					goto out_free;
131
132				error = dlm_master_lookup(ls, memb->nodeid,
133							  b, namelen,
134							  DLM_LU_RECOVER_DIR,
135							  &nodeid, &result);
136				if (error) {
137					log_error(ls, "recover_dir lookup %d",
138						  error);
139					goto out_free;
140				}
141
142				/* The name was found in rsbtbl, but the
143				 * master nodeid is different from
144				 * memb->nodeid which says it is the master.
145				 * This should not happen. */
146
147				if (result == DLM_LU_MATCH &&
148				    nodeid != memb->nodeid) {
149					count_bad++;
150					log_error(ls, "recover_dir lookup %d "
151						  "nodeid %d memb %d bad %u",
152						  result, nodeid, memb->nodeid,
153						  count_bad);
154					print_hex_dump_bytes("dlm_recover_dir ",
155							     DUMP_PREFIX_NONE,
156							     b, namelen);
157				}
158
159				/* The name was found in rsbtbl, and the
160				 * master nodeid matches memb->nodeid. */
161
162				if (result == DLM_LU_MATCH &&
163				    nodeid == memb->nodeid) {
164					count_match++;
165				}
166
167				/* The name was not found in rsbtbl and was
168				 * added with memb->nodeid as the master. */
169
170				if (result == DLM_LU_ADD) {
171					count_add++;
172				}
173
174				last_len = namelen;
175				memcpy(last_name, b, namelen);
176				b += namelen;
177				left -= namelen;
178				count++;
179			}
180		}
181	 done:
182		;
183	}
184
185 out_status:
186	error = 0;
187	dlm_set_recover_status(ls, DLM_RS_DIR);
188
189	log_rinfo(ls, "dlm_recover_directory %u in %u new",
190		  count, count_add);
191 out_free:
192	kfree(last_name);
193 out:
194	return error;
195}
196
197static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
198				     int len)
199{
200	struct dlm_rsb *r;
 
201	int rv;
202
203	read_lock_bh(&ls->ls_rsbtbl_lock);
204	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
205	read_unlock_bh(&ls->ls_rsbtbl_lock);
 
 
 
 
 
 
 
206	if (!rv)
207		return r;
208
209	list_for_each_entry(r, &ls->ls_masters_list, res_masters_list) {
 
210		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
 
211			log_debug(ls, "find_rsb_root revert to root_list %s",
212				  r->res_name);
213			return r;
214		}
215	}
 
216	return NULL;
217}
218
219struct dlm_dir_dump {
220	/* init values to match if whole
221	 * dump fits to one seq. Sanity check only.
222	 */
223	uint64_t seq_init;
224	uint64_t nodeid_init;
225	/* compare local pointer with last lookup,
226	 * just a sanity check.
227	 */
228	struct list_head *last;
229
230	unsigned int sent_res; /* for log info */
231	unsigned int sent_msg; /* for log info */
232
233	struct list_head list;
234};
235
236static void drop_dir_ctx(struct dlm_ls *ls, int nodeid)
237{
238	struct dlm_dir_dump *dd, *safe;
239
240	write_lock_bh(&ls->ls_dir_dump_lock);
241	list_for_each_entry_safe(dd, safe, &ls->ls_dir_dump_list, list) {
242		if (dd->nodeid_init == nodeid) {
243			log_error(ls, "drop dump seq %llu",
244				 (unsigned long long)dd->seq_init);
245			list_del(&dd->list);
246			kfree(dd);
247		}
248	}
249	write_unlock_bh(&ls->ls_dir_dump_lock);
250}
251
252static struct dlm_dir_dump *lookup_dir_dump(struct dlm_ls *ls, int nodeid)
253{
254	struct dlm_dir_dump *iter, *dd = NULL;
255
256	read_lock_bh(&ls->ls_dir_dump_lock);
257	list_for_each_entry(iter, &ls->ls_dir_dump_list, list) {
258		if (iter->nodeid_init == nodeid) {
259			dd = iter;
260			break;
261		}
262	}
263	read_unlock_bh(&ls->ls_dir_dump_lock);
264
265	return dd;
266}
267
268static struct dlm_dir_dump *init_dir_dump(struct dlm_ls *ls, int nodeid)
269{
270	struct dlm_dir_dump *dd;
271
272	dd = lookup_dir_dump(ls, nodeid);
273	if (dd) {
274		log_error(ls, "found ongoing dir dump for node %d, will drop it",
275			  nodeid);
276		drop_dir_ctx(ls, nodeid);
277	}
278
279	dd = kzalloc(sizeof(*dd), GFP_ATOMIC);
280	if (!dd)
281		return NULL;
282
283	dd->seq_init = ls->ls_recover_seq;
284	dd->nodeid_init = nodeid;
285
286	write_lock_bh(&ls->ls_dir_dump_lock);
287	list_add(&dd->list, &ls->ls_dir_dump_list);
288	write_unlock_bh(&ls->ls_dir_dump_lock);
289
290	return dd;
291}
292
293/* Find the rsb where we left off (or start again), then send rsb names
294   for rsb's we're master of and whose directory node matches the requesting
295   node.  inbuf is the rsb name last sent, inlen is the name's length */
296
297void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
298 			   char *outbuf, int outlen, int nodeid)
299{
300	struct list_head *list;
301	struct dlm_rsb *r;
302	int offset = 0, dir_nodeid;
303	struct dlm_dir_dump *dd;
304	__be16 be_namelen;
305
306	read_lock_bh(&ls->ls_masters_lock);
307
308	if (inlen > 1) {
309		dd = lookup_dir_dump(ls, nodeid);
310		if (!dd) {
311			log_error(ls, "failed to lookup dir dump context nodeid: %d",
312				  nodeid);
313			goto out;
314		}
315
316		/* next chunk in dump */
317		r = find_rsb_root(ls, inbuf, inlen);
318		if (!r) {
319			log_error(ls, "copy_master_names from %d start %d %.*s",
320				  nodeid, inlen, inlen, inbuf);
321			goto out;
322		}
323		list = r->res_masters_list.next;
324
325		/* sanity checks */
326		if (dd->last != &r->res_masters_list ||
327		    dd->seq_init != ls->ls_recover_seq) {
328			log_error(ls, "failed dir dump sanity check seq_init: %llu seq: %llu",
329				  (unsigned long long)dd->seq_init,
330				  (unsigned long long)ls->ls_recover_seq);
331			goto out;
332		}
 
333	} else {
334		dd = init_dir_dump(ls, nodeid);
335		if (!dd) {
336			log_error(ls, "failed to allocate dir dump context");
337			goto out;
338		}
339
340		/* start dump */
341		list = ls->ls_masters_list.next;
342		dd->last = list;
343	}
344
345	for (offset = 0; list != &ls->ls_masters_list; list = list->next) {
346		r = list_entry(list, struct dlm_rsb, res_masters_list);
347		dir_nodeid = dlm_dir_nodeid(r);
348		if (dir_nodeid != nodeid)
349			continue;
350
351		/*
352		 * The block ends when we can't fit the following in the
353		 * remaining buffer space:
354		 * namelen (uint16_t) +
355		 * name (r->res_length) +
356		 * end-of-block record 0x0000 (uint16_t)
357		 */
358
359		if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
360			/* Write end-of-block record */
361			be_namelen = cpu_to_be16(0);
362			memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
363			offset += sizeof(__be16);
364			dd->sent_msg++;
365			goto out;
366		}
367
368		be_namelen = cpu_to_be16(r->res_length);
369		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
370		offset += sizeof(__be16);
371		memcpy(outbuf + offset, r->res_name, r->res_length);
372		offset += r->res_length;
373		dd->sent_res++;
374		dd->last = list;
375	}
376
377	/*
378	 * If we've reached the end of the list (and there's room) write a
379	 * terminating record.
380	 */
381
382	if ((list == &ls->ls_masters_list) &&
383	    (offset + sizeof(uint16_t) <= outlen)) {
384		/* end dump */
385		be_namelen = cpu_to_be16(0xFFFF);
386		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
387		offset += sizeof(__be16);
388		dd->sent_msg++;
389		log_rinfo(ls, "dlm_recover_directory nodeid %d sent %u res out %u messages",
390			  nodeid, dd->sent_res, dd->sent_msg);
391
392		write_lock_bh(&ls->ls_dir_dump_lock);
393		list_del_init(&dd->list);
394		write_unlock_bh(&ls->ls_dir_dump_lock);
395		kfree(dd);
396	}
397 out:
398	read_unlock_bh(&ls->ls_masters_lock);
399}
400