Linux Audio

Check our new training course

Loading...
v5.4
  1// SPDX-License-Identifier: GPL-2.0-only
  2/******************************************************************************
  3*******************************************************************************
  4**
  5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
  6**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
  7**
 
 
 
  8**
  9*******************************************************************************
 10******************************************************************************/
 11
 12#include "dlm_internal.h"
 13#include "lockspace.h"
 14#include "member.h"
 15#include "lowcomms.h"
 16#include "rcom.h"
 17#include "config.h"
 18#include "memory.h"
 19#include "recover.h"
 20#include "util.h"
 21#include "lock.h"
 22#include "dir.h"
 23
 24/*
 25 * We use the upper 16 bits of the hash value to select the directory node.
 26 * Low bits are used for distribution of rsb's among hash buckets on each node.
 27 *
 28 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
 29 * num_nodes to the hash value.  This value in the desired range is used as an
 30 * offset into the sorted list of nodeid's to give the particular nodeid.
 31 */
 32
 33int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
 34{
 35	uint32_t node;
 36
 37	if (ls->ls_num_nodes == 1)
 38		return dlm_our_nodeid();
 39	else {
 40		node = (hash >> 16) % ls->ls_total_weight;
 41		return ls->ls_node_array[node];
 42	}
 43}
 44
 45int dlm_dir_nodeid(struct dlm_rsb *r)
 46{
 47	return r->res_dir_nodeid;
 48}
 49
 50void dlm_recover_dir_nodeid(struct dlm_ls *ls)
 51{
 52	struct dlm_rsb *r;
 53
 54	down_read(&ls->ls_root_sem);
 55	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
 56		r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
 57	}
 58	up_read(&ls->ls_root_sem);
 59}
 60
 61int dlm_recover_directory(struct dlm_ls *ls)
 62{
 63	struct dlm_member *memb;
 64	char *b, *last_name = NULL;
 65	int error = -ENOMEM, last_len, nodeid, result;
 66	uint16_t namelen;
 67	unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
 68
 69	log_rinfo(ls, "dlm_recover_directory");
 70
 71	if (dlm_no_directory(ls))
 72		goto out_status;
 73
 74	last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
 75	if (!last_name)
 76		goto out;
 77
 78	list_for_each_entry(memb, &ls->ls_nodes, list) {
 79		if (memb->nodeid == dlm_our_nodeid())
 80			continue;
 81
 82		memset(last_name, 0, DLM_RESNAME_MAXLEN);
 83		last_len = 0;
 84
 85		for (;;) {
 86			int left;
 87			error = dlm_recovery_stopped(ls);
 88			if (error)
 89				goto out_free;
 90
 91			error = dlm_rcom_names(ls, memb->nodeid,
 92					       last_name, last_len);
 93			if (error)
 94				goto out_free;
 95
 96			cond_resched();
 97
 98			/*
 99			 * pick namelen/name pairs out of received buffer
100			 */
101
102			b = ls->ls_recover_buf->rc_buf;
103			left = ls->ls_recover_buf->rc_header.h_length;
104			left -= sizeof(struct dlm_rcom);
105
106			for (;;) {
107				__be16 v;
108
109				error = -EINVAL;
110				if (left < sizeof(__be16))
111					goto out_free;
112
113				memcpy(&v, b, sizeof(__be16));
114				namelen = be16_to_cpu(v);
115				b += sizeof(__be16);
116				left -= sizeof(__be16);
117
118				/* namelen of 0xFFFFF marks end of names for
119				   this node; namelen of 0 marks end of the
120				   buffer */
121
122				if (namelen == 0xFFFF)
123					goto done;
124				if (!namelen)
125					break;
126
127				if (namelen > left)
128					goto out_free;
129
130				if (namelen > DLM_RESNAME_MAXLEN)
131					goto out_free;
132
133				error = dlm_master_lookup(ls, memb->nodeid,
134							  b, namelen,
135							  DLM_LU_RECOVER_DIR,
136							  &nodeid, &result);
137				if (error) {
138					log_error(ls, "recover_dir lookup %d",
139						  error);
140					goto out_free;
141				}
142
143				/* The name was found in rsbtbl, but the
144				 * master nodeid is different from
145				 * memb->nodeid which says it is the master.
146				 * This should not happen. */
147
148				if (result == DLM_LU_MATCH &&
149				    nodeid != memb->nodeid) {
150					count_bad++;
151					log_error(ls, "recover_dir lookup %d "
152						  "nodeid %d memb %d bad %u",
153						  result, nodeid, memb->nodeid,
154						  count_bad);
155					print_hex_dump_bytes("dlm_recover_dir ",
156							     DUMP_PREFIX_NONE,
157							     b, namelen);
158				}
159
160				/* The name was found in rsbtbl, and the
161				 * master nodeid matches memb->nodeid. */
162
163				if (result == DLM_LU_MATCH &&
164				    nodeid == memb->nodeid) {
165					count_match++;
166				}
167
168				/* The name was not found in rsbtbl and was
169				 * added with memb->nodeid as the master. */
170
171				if (result == DLM_LU_ADD) {
172					count_add++;
173				}
174
175				last_len = namelen;
176				memcpy(last_name, b, namelen);
177				b += namelen;
178				left -= namelen;
179				count++;
180			}
181		}
182	 done:
183		;
184	}
185
186 out_status:
187	error = 0;
188	dlm_set_recover_status(ls, DLM_RS_DIR);
189
190	log_rinfo(ls, "dlm_recover_directory %u in %u new",
191		  count, count_add);
192 out_free:
193	kfree(last_name);
194 out:
195	return error;
196}
197
198static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
199{
200	struct dlm_rsb *r;
201	uint32_t hash, bucket;
202	int rv;
203
204	hash = jhash(name, len, 0);
205	bucket = hash & (ls->ls_rsbtbl_size - 1);
206
207	spin_lock(&ls->ls_rsbtbl[bucket].lock);
208	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
209	if (rv)
210		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
211					 name, len, &r);
212	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
213
214	if (!rv)
215		return r;
216
217	down_read(&ls->ls_root_sem);
218	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
219		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
220			up_read(&ls->ls_root_sem);
221			log_debug(ls, "find_rsb_root revert to root_list %s",
222				  r->res_name);
223			return r;
224		}
225	}
226	up_read(&ls->ls_root_sem);
227	return NULL;
228}
229
230/* Find the rsb where we left off (or start again), then send rsb names
231   for rsb's we're master of and whose directory node matches the requesting
232   node.  inbuf is the rsb name last sent, inlen is the name's length */
233
234void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
235 			   char *outbuf, int outlen, int nodeid)
236{
237	struct list_head *list;
238	struct dlm_rsb *r;
239	int offset = 0, dir_nodeid;
240	__be16 be_namelen;
241
242	down_read(&ls->ls_root_sem);
243
244	if (inlen > 1) {
245		r = find_rsb_root(ls, inbuf, inlen);
246		if (!r) {
247			inbuf[inlen - 1] = '\0';
248			log_error(ls, "copy_master_names from %d start %d %s",
249				  nodeid, inlen, inbuf);
250			goto out;
251		}
252		list = r->res_root_list.next;
253	} else {
254		list = ls->ls_root_list.next;
255	}
256
257	for (offset = 0; list != &ls->ls_root_list; list = list->next) {
258		r = list_entry(list, struct dlm_rsb, res_root_list);
259		if (r->res_nodeid)
260			continue;
261
262		dir_nodeid = dlm_dir_nodeid(r);
263		if (dir_nodeid != nodeid)
264			continue;
265
266		/*
267		 * The block ends when we can't fit the following in the
268		 * remaining buffer space:
269		 * namelen (uint16_t) +
270		 * name (r->res_length) +
271		 * end-of-block record 0x0000 (uint16_t)
272		 */
273
274		if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
275			/* Write end-of-block record */
276			be_namelen = cpu_to_be16(0);
277			memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
278			offset += sizeof(__be16);
279			ls->ls_recover_dir_sent_msg++;
280			goto out;
281		}
282
283		be_namelen = cpu_to_be16(r->res_length);
284		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
285		offset += sizeof(__be16);
286		memcpy(outbuf + offset, r->res_name, r->res_length);
287		offset += r->res_length;
288		ls->ls_recover_dir_sent_res++;
289	}
290
291	/*
292	 * If we've reached the end of the list (and there's room) write a
293	 * terminating record.
294	 */
295
296	if ((list == &ls->ls_root_list) &&
297	    (offset + sizeof(uint16_t) <= outlen)) {
298		be_namelen = cpu_to_be16(0xFFFF);
299		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
300		offset += sizeof(__be16);
301		ls->ls_recover_dir_sent_msg++;
302	}
303 out:
304	up_read(&ls->ls_root_sem);
305}
306
v4.10.11
 
  1/******************************************************************************
  2*******************************************************************************
  3**
  4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
  5**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
  6**
  7**  This copyrighted material is made available to anyone wishing to use,
  8**  modify, copy, or redistribute it subject to the terms and conditions
  9**  of the GNU General Public License v.2.
 10**
 11*******************************************************************************
 12******************************************************************************/
 13
 14#include "dlm_internal.h"
 15#include "lockspace.h"
 16#include "member.h"
 17#include "lowcomms.h"
 18#include "rcom.h"
 19#include "config.h"
 20#include "memory.h"
 21#include "recover.h"
 22#include "util.h"
 23#include "lock.h"
 24#include "dir.h"
 25
 26/*
 27 * We use the upper 16 bits of the hash value to select the directory node.
 28 * Low bits are used for distribution of rsb's among hash buckets on each node.
 29 *
 30 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
 31 * num_nodes to the hash value.  This value in the desired range is used as an
 32 * offset into the sorted list of nodeid's to give the particular nodeid.
 33 */
 34
 35int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
 36{
 37	uint32_t node;
 38
 39	if (ls->ls_num_nodes == 1)
 40		return dlm_our_nodeid();
 41	else {
 42		node = (hash >> 16) % ls->ls_total_weight;
 43		return ls->ls_node_array[node];
 44	}
 45}
 46
 47int dlm_dir_nodeid(struct dlm_rsb *r)
 48{
 49	return r->res_dir_nodeid;
 50}
 51
 52void dlm_recover_dir_nodeid(struct dlm_ls *ls)
 53{
 54	struct dlm_rsb *r;
 55
 56	down_read(&ls->ls_root_sem);
 57	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
 58		r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
 59	}
 60	up_read(&ls->ls_root_sem);
 61}
 62
 63int dlm_recover_directory(struct dlm_ls *ls)
 64{
 65	struct dlm_member *memb;
 66	char *b, *last_name = NULL;
 67	int error = -ENOMEM, last_len, nodeid, result;
 68	uint16_t namelen;
 69	unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
 70
 71	log_rinfo(ls, "dlm_recover_directory");
 72
 73	if (dlm_no_directory(ls))
 74		goto out_status;
 75
 76	last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
 77	if (!last_name)
 78		goto out;
 79
 80	list_for_each_entry(memb, &ls->ls_nodes, list) {
 81		if (memb->nodeid == dlm_our_nodeid())
 82			continue;
 83
 84		memset(last_name, 0, DLM_RESNAME_MAXLEN);
 85		last_len = 0;
 86
 87		for (;;) {
 88			int left;
 89			error = dlm_recovery_stopped(ls);
 90			if (error)
 91				goto out_free;
 92
 93			error = dlm_rcom_names(ls, memb->nodeid,
 94					       last_name, last_len);
 95			if (error)
 96				goto out_free;
 97
 98			cond_resched();
 99
100			/*
101			 * pick namelen/name pairs out of received buffer
102			 */
103
104			b = ls->ls_recover_buf->rc_buf;
105			left = ls->ls_recover_buf->rc_header.h_length;
106			left -= sizeof(struct dlm_rcom);
107
108			for (;;) {
109				__be16 v;
110
111				error = -EINVAL;
112				if (left < sizeof(__be16))
113					goto out_free;
114
115				memcpy(&v, b, sizeof(__be16));
116				namelen = be16_to_cpu(v);
117				b += sizeof(__be16);
118				left -= sizeof(__be16);
119
120				/* namelen of 0xFFFFF marks end of names for
121				   this node; namelen of 0 marks end of the
122				   buffer */
123
124				if (namelen == 0xFFFF)
125					goto done;
126				if (!namelen)
127					break;
128
129				if (namelen > left)
130					goto out_free;
131
132				if (namelen > DLM_RESNAME_MAXLEN)
133					goto out_free;
134
135				error = dlm_master_lookup(ls, memb->nodeid,
136							  b, namelen,
137							  DLM_LU_RECOVER_DIR,
138							  &nodeid, &result);
139				if (error) {
140					log_error(ls, "recover_dir lookup %d",
141						  error);
142					goto out_free;
143				}
144
145				/* The name was found in rsbtbl, but the
146				 * master nodeid is different from
147				 * memb->nodeid which says it is the master.
148				 * This should not happen. */
149
150				if (result == DLM_LU_MATCH &&
151				    nodeid != memb->nodeid) {
152					count_bad++;
153					log_error(ls, "recover_dir lookup %d "
154						  "nodeid %d memb %d bad %u",
155						  result, nodeid, memb->nodeid,
156						  count_bad);
157					print_hex_dump_bytes("dlm_recover_dir ",
158							     DUMP_PREFIX_NONE,
159							     b, namelen);
160				}
161
162				/* The name was found in rsbtbl, and the
163				 * master nodeid matches memb->nodeid. */
164
165				if (result == DLM_LU_MATCH &&
166				    nodeid == memb->nodeid) {
167					count_match++;
168				}
169
170				/* The name was not found in rsbtbl and was
171				 * added with memb->nodeid as the master. */
172
173				if (result == DLM_LU_ADD) {
174					count_add++;
175				}
176
177				last_len = namelen;
178				memcpy(last_name, b, namelen);
179				b += namelen;
180				left -= namelen;
181				count++;
182			}
183		}
184	 done:
185		;
186	}
187
188 out_status:
189	error = 0;
190	dlm_set_recover_status(ls, DLM_RS_DIR);
191
192	log_rinfo(ls, "dlm_recover_directory %u in %u new",
193		  count, count_add);
194 out_free:
195	kfree(last_name);
196 out:
197	return error;
198}
199
200static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
201{
202	struct dlm_rsb *r;
203	uint32_t hash, bucket;
204	int rv;
205
206	hash = jhash(name, len, 0);
207	bucket = hash & (ls->ls_rsbtbl_size - 1);
208
209	spin_lock(&ls->ls_rsbtbl[bucket].lock);
210	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
211	if (rv)
212		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
213					 name, len, &r);
214	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
215
216	if (!rv)
217		return r;
218
219	down_read(&ls->ls_root_sem);
220	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
221		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
222			up_read(&ls->ls_root_sem);
223			log_debug(ls, "find_rsb_root revert to root_list %s",
224				  r->res_name);
225			return r;
226		}
227	}
228	up_read(&ls->ls_root_sem);
229	return NULL;
230}
231
232/* Find the rsb where we left off (or start again), then send rsb names
233   for rsb's we're master of and whose directory node matches the requesting
234   node.  inbuf is the rsb name last sent, inlen is the name's length */
235
236void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
237 			   char *outbuf, int outlen, int nodeid)
238{
239	struct list_head *list;
240	struct dlm_rsb *r;
241	int offset = 0, dir_nodeid;
242	__be16 be_namelen;
243
244	down_read(&ls->ls_root_sem);
245
246	if (inlen > 1) {
247		r = find_rsb_root(ls, inbuf, inlen);
248		if (!r) {
249			inbuf[inlen - 1] = '\0';
250			log_error(ls, "copy_master_names from %d start %d %s",
251				  nodeid, inlen, inbuf);
252			goto out;
253		}
254		list = r->res_root_list.next;
255	} else {
256		list = ls->ls_root_list.next;
257	}
258
259	for (offset = 0; list != &ls->ls_root_list; list = list->next) {
260		r = list_entry(list, struct dlm_rsb, res_root_list);
261		if (r->res_nodeid)
262			continue;
263
264		dir_nodeid = dlm_dir_nodeid(r);
265		if (dir_nodeid != nodeid)
266			continue;
267
268		/*
269		 * The block ends when we can't fit the following in the
270		 * remaining buffer space:
271		 * namelen (uint16_t) +
272		 * name (r->res_length) +
273		 * end-of-block record 0x0000 (uint16_t)
274		 */
275
276		if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
277			/* Write end-of-block record */
278			be_namelen = cpu_to_be16(0);
279			memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
280			offset += sizeof(__be16);
281			ls->ls_recover_dir_sent_msg++;
282			goto out;
283		}
284
285		be_namelen = cpu_to_be16(r->res_length);
286		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
287		offset += sizeof(__be16);
288		memcpy(outbuf + offset, r->res_name, r->res_length);
289		offset += r->res_length;
290		ls->ls_recover_dir_sent_res++;
291	}
292
293	/*
294	 * If we've reached the end of the list (and there's room) write a
295	 * terminating record.
296	 */
297
298	if ((list == &ls->ls_root_list) &&
299	    (offset + sizeof(uint16_t) <= outlen)) {
300		be_namelen = cpu_to_be16(0xFFFF);
301		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
302		offset += sizeof(__be16);
303		ls->ls_recover_dir_sent_msg++;
304	}
305 out:
306	up_read(&ls->ls_root_sem);
307}
308