Linux Audio

Check our new training course

Loading...
v3.5.6
  1/******************************************************************************
  2*******************************************************************************
  3**
  4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
  5**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
  6**
  7**  This copyrighted material is made available to anyone wishing to use,
  8**  modify, copy, or redistribute it subject to the terms and conditions
  9**  of the GNU General Public License v.2.
 10**
 11*******************************************************************************
 12******************************************************************************/
 13
 14#include "dlm_internal.h"
 15#include "lockspace.h"
 16#include "member.h"
 17#include "lowcomms.h"
 18#include "rcom.h"
 19#include "config.h"
 20#include "memory.h"
 21#include "recover.h"
 22#include "util.h"
 23#include "lock.h"
 24#include "dir.h"
 25
 26
 27static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
 28{
 29	spin_lock(&ls->ls_recover_list_lock);
 30	list_add(&de->list, &ls->ls_recover_list);
 31	spin_unlock(&ls->ls_recover_list_lock);
 32}
 33
 34static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
 35{
 36	int found = 0;
 37	struct dlm_direntry *de;
 38
 39	spin_lock(&ls->ls_recover_list_lock);
 40	list_for_each_entry(de, &ls->ls_recover_list, list) {
 41		if (de->length == len) {
 42			list_del(&de->list);
 43			de->master_nodeid = 0;
 44			memset(de->name, 0, len);
 45			found = 1;
 46			break;
 47		}
 48	}
 49	spin_unlock(&ls->ls_recover_list_lock);
 50
 51	if (!found)
 52		de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_NOFS);
 53	return de;
 54}
 55
 56void dlm_clear_free_entries(struct dlm_ls *ls)
 57{
 58	struct dlm_direntry *de;
 59
 60	spin_lock(&ls->ls_recover_list_lock);
 61	while (!list_empty(&ls->ls_recover_list)) {
 62		de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
 63				list);
 64		list_del(&de->list);
 65		kfree(de);
 66	}
 67	spin_unlock(&ls->ls_recover_list_lock);
 68}
 69
 70/*
 71 * We use the upper 16 bits of the hash value to select the directory node.
 72 * Low bits are used for distribution of rsb's among hash buckets on each node.
 73 *
 74 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
 75 * num_nodes to the hash value.  This value in the desired range is used as an
 76 * offset into the sorted list of nodeid's to give the particular nodeid.
 77 */
 78
 79int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
 80{
 81	struct list_head *tmp;
 82	struct dlm_member *memb = NULL;
 83	uint32_t node, n = 0;
 84	int nodeid;
 85
 86	if (ls->ls_num_nodes == 1) {
 87		nodeid = dlm_our_nodeid();
 88		goto out;
 89	}
 90
 91	if (ls->ls_node_array) {
 92		node = (hash >> 16) % ls->ls_total_weight;
 93		nodeid = ls->ls_node_array[node];
 94		goto out;
 95	}
 96
 97	/* make_member_array() failed to kmalloc ls_node_array... */
 98
 99	node = (hash >> 16) % ls->ls_num_nodes;
100
101	list_for_each(tmp, &ls->ls_nodes) {
102		if (n++ != node)
103			continue;
104		memb = list_entry(tmp, struct dlm_member, list);
105		break;
106	}
107
108	DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
109				 ls->ls_num_nodes, n, node););
110	nodeid = memb->nodeid;
111 out:
112	return nodeid;
113}
114
115int dlm_dir_nodeid(struct dlm_rsb *r)
116{
117	return dlm_hash2nodeid(r->res_ls, r->res_hash);
118}
119
120static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
121{
122	uint32_t val;
123
124	val = jhash(name, len, 0);
125	val &= (ls->ls_dirtbl_size - 1);
126
127	return val;
128}
129
130static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
131{
132	uint32_t bucket;
133
134	bucket = dir_hash(ls, de->name, de->length);
135	list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
136}
137
138static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
139					  int namelen, uint32_t bucket)
140{
141	struct dlm_direntry *de;
142
143	list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
144		if (de->length == namelen && !memcmp(name, de->name, namelen))
145			goto out;
146	}
147	de = NULL;
148 out:
149	return de;
150}
151
152void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
153{
154	struct dlm_direntry *de;
155	uint32_t bucket;
156
157	bucket = dir_hash(ls, name, namelen);
158
159	spin_lock(&ls->ls_dirtbl[bucket].lock);
160
161	de = search_bucket(ls, name, namelen, bucket);
162
163	if (!de) {
164		log_error(ls, "remove fr %u none", nodeid);
165		goto out;
166	}
167
168	if (de->master_nodeid != nodeid) {
169		log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
170		goto out;
171	}
172
173	list_del(&de->list);
174	kfree(de);
175 out:
176	spin_unlock(&ls->ls_dirtbl[bucket].lock);
177}
178
179void dlm_dir_clear(struct dlm_ls *ls)
180{
181	struct list_head *head;
182	struct dlm_direntry *de;
183	int i;
184
185	DLM_ASSERT(list_empty(&ls->ls_recover_list), );
186
187	for (i = 0; i < ls->ls_dirtbl_size; i++) {
188		spin_lock(&ls->ls_dirtbl[i].lock);
189		head = &ls->ls_dirtbl[i].list;
190		while (!list_empty(head)) {
191			de = list_entry(head->next, struct dlm_direntry, list);
192			list_del(&de->list);
193			put_free_de(ls, de);
194		}
195		spin_unlock(&ls->ls_dirtbl[i].lock);
196	}
 
197}
198
199int dlm_recover_directory(struct dlm_ls *ls)
200{
201	struct dlm_member *memb;
202	struct dlm_direntry *de;
203	char *b, *last_name = NULL;
204	int error = -ENOMEM, last_len, count = 0;
205	uint16_t namelen;
 
206
207	log_debug(ls, "dlm_recover_directory");
208
209	if (dlm_no_directory(ls))
210		goto out_status;
211
212	dlm_dir_clear(ls);
213
214	last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
215	if (!last_name)
216		goto out;
217
218	list_for_each_entry(memb, &ls->ls_nodes, list) {
 
 
 
219		memset(last_name, 0, DLM_RESNAME_MAXLEN);
220		last_len = 0;
221
222		for (;;) {
223			int left;
224			error = dlm_recovery_stopped(ls);
225			if (error)
226				goto out_free;
227
228			error = dlm_rcom_names(ls, memb->nodeid,
229					       last_name, last_len);
230			if (error)
231				goto out_free;
232
233			schedule();
234
235			/*
236			 * pick namelen/name pairs out of received buffer
237			 */
238
239			b = ls->ls_recover_buf->rc_buf;
240			left = ls->ls_recover_buf->rc_header.h_length;
241			left -= sizeof(struct dlm_rcom);
242
243			for (;;) {
244				__be16 v;
245
246				error = -EINVAL;
247				if (left < sizeof(__be16))
248					goto out_free;
249
250				memcpy(&v, b, sizeof(__be16));
251				namelen = be16_to_cpu(v);
252				b += sizeof(__be16);
253				left -= sizeof(__be16);
254
255				/* namelen of 0xFFFFF marks end of names for
256				   this node; namelen of 0 marks end of the
257				   buffer */
258
259				if (namelen == 0xFFFF)
260					goto done;
261				if (!namelen)
262					break;
263
264				if (namelen > left)
265					goto out_free;
266
267				if (namelen > DLM_RESNAME_MAXLEN)
268					goto out_free;
269
270				error = -ENOMEM;
271				de = get_free_de(ls, namelen);
272				if (!de)
 
 
 
 
273					goto out_free;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
274
275				de->master_nodeid = memb->nodeid;
276				de->length = namelen;
277				last_len = namelen;
278				memcpy(de->name, b, namelen);
279				memcpy(last_name, b, namelen);
280				b += namelen;
281				left -= namelen;
282
283				add_entry_to_hash(ls, de);
284				count++;
285			}
286		}
287         done:
288		;
289	}
290
291 out_status:
292	error = 0;
293	log_debug(ls, "dlm_recover_directory %d entries", count);
 
 
 
294 out_free:
295	kfree(last_name);
296 out:
297	dlm_clear_free_entries(ls);
298	return error;
299}
300
301static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
302		     int namelen, int *r_nodeid)
303{
304	struct dlm_direntry *de, *tmp;
305	uint32_t bucket;
306
307	bucket = dir_hash(ls, name, namelen);
308
309	spin_lock(&ls->ls_dirtbl[bucket].lock);
310	de = search_bucket(ls, name, namelen, bucket);
311	if (de) {
312		*r_nodeid = de->master_nodeid;
313		spin_unlock(&ls->ls_dirtbl[bucket].lock);
314		if (*r_nodeid == nodeid)
315			return -EEXIST;
316		return 0;
317	}
318
319	spin_unlock(&ls->ls_dirtbl[bucket].lock);
320
321	if (namelen > DLM_RESNAME_MAXLEN)
322		return -EINVAL;
323
324	de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_NOFS);
325	if (!de)
326		return -ENOMEM;
327
328	de->master_nodeid = nodeid;
329	de->length = namelen;
330	memcpy(de->name, name, namelen);
331
332	spin_lock(&ls->ls_dirtbl[bucket].lock);
333	tmp = search_bucket(ls, name, namelen, bucket);
334	if (tmp) {
335		kfree(de);
336		de = tmp;
337	} else {
338		list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
339	}
340	*r_nodeid = de->master_nodeid;
341	spin_unlock(&ls->ls_dirtbl[bucket].lock);
342	return 0;
343}
344
345int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
346		   int *r_nodeid)
347{
348	return get_entry(ls, nodeid, name, namelen, r_nodeid);
349}
350
351static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
352{
353	struct dlm_rsb *r;
354	uint32_t hash, bucket;
355	int rv;
356
357	hash = jhash(name, len, 0);
358	bucket = hash & (ls->ls_rsbtbl_size - 1);
359
360	spin_lock(&ls->ls_rsbtbl[bucket].lock);
361	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, 0, &r);
362	if (rv)
363		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
364					 name, len, 0, &r);
365	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
366
367	if (!rv)
368		return r;
369
370	down_read(&ls->ls_root_sem);
371	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
372		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
373			up_read(&ls->ls_root_sem);
374			log_error(ls, "find_rsb_root revert to root_list %s",
375				  r->res_name);
376			return r;
377		}
378	}
379	up_read(&ls->ls_root_sem);
380	return NULL;
381}
382
383/* Find the rsb where we left off (or start again), then send rsb names
384   for rsb's we're master of and whose directory node matches the requesting
385   node.  inbuf is the rsb name last sent, inlen is the name's length */
386
387void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
388 			   char *outbuf, int outlen, int nodeid)
389{
390	struct list_head *list;
391	struct dlm_rsb *r;
392	int offset = 0, dir_nodeid;
393	__be16 be_namelen;
394
395	down_read(&ls->ls_root_sem);
396
397	if (inlen > 1) {
398		r = find_rsb_root(ls, inbuf, inlen);
399		if (!r) {
400			inbuf[inlen - 1] = '\0';
401			log_error(ls, "copy_master_names from %d start %d %s",
402				  nodeid, inlen, inbuf);
403			goto out;
404		}
405		list = r->res_root_list.next;
406	} else {
407		list = ls->ls_root_list.next;
408	}
409
410	for (offset = 0; list != &ls->ls_root_list; list = list->next) {
411		r = list_entry(list, struct dlm_rsb, res_root_list);
412		if (r->res_nodeid)
413			continue;
414
415		dir_nodeid = dlm_dir_nodeid(r);
416		if (dir_nodeid != nodeid)
417			continue;
418
419		/*
420		 * The block ends when we can't fit the following in the
421		 * remaining buffer space:
422		 * namelen (uint16_t) +
423		 * name (r->res_length) +
424		 * end-of-block record 0x0000 (uint16_t)
425		 */
426
427		if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
428			/* Write end-of-block record */
429			be_namelen = cpu_to_be16(0);
430			memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
431			offset += sizeof(__be16);
 
432			goto out;
433		}
434
435		be_namelen = cpu_to_be16(r->res_length);
436		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
437		offset += sizeof(__be16);
438		memcpy(outbuf + offset, r->res_name, r->res_length);
439		offset += r->res_length;
 
440	}
441
442	/*
443	 * If we've reached the end of the list (and there's room) write a
444	 * terminating record.
445	 */
446
447	if ((list == &ls->ls_root_list) &&
448	    (offset + sizeof(uint16_t) <= outlen)) {
449		be_namelen = cpu_to_be16(0xFFFF);
450		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
451		offset += sizeof(__be16);
 
452	}
453
454 out:
455	up_read(&ls->ls_root_sem);
456}
457
v3.15
  1/******************************************************************************
  2*******************************************************************************
  3**
  4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
  5**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
  6**
  7**  This copyrighted material is made available to anyone wishing to use,
  8**  modify, copy, or redistribute it subject to the terms and conditions
  9**  of the GNU General Public License v.2.
 10**
 11*******************************************************************************
 12******************************************************************************/
 13
 14#include "dlm_internal.h"
 15#include "lockspace.h"
 16#include "member.h"
 17#include "lowcomms.h"
 18#include "rcom.h"
 19#include "config.h"
 20#include "memory.h"
 21#include "recover.h"
 22#include "util.h"
 23#include "lock.h"
 24#include "dir.h"
 25
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 26/*
 27 * We use the upper 16 bits of the hash value to select the directory node.
 28 * Low bits are used for distribution of rsb's among hash buckets on each node.
 29 *
 30 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
 31 * num_nodes to the hash value.  This value in the desired range is used as an
 32 * offset into the sorted list of nodeid's to give the particular nodeid.
 33 */
 34
 35int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
 36{
 37	uint32_t node;
 
 
 
 38
 39	if (ls->ls_num_nodes == 1)
 40		return dlm_our_nodeid();
 41	else {
 
 
 
 42		node = (hash >> 16) % ls->ls_total_weight;
 43		return ls->ls_node_array[node];
 
 44	}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 45}
 46
 47int dlm_dir_nodeid(struct dlm_rsb *r)
 48{
 49	return r->res_dir_nodeid;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 50}
 51
 52void dlm_recover_dir_nodeid(struct dlm_ls *ls)
 53{
 54	struct dlm_rsb *r;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 55
 56	down_read(&ls->ls_root_sem);
 57	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
 58		r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 59	}
 60	up_read(&ls->ls_root_sem);
 61}
 62
 63int dlm_recover_directory(struct dlm_ls *ls)
 64{
 65	struct dlm_member *memb;
 
 66	char *b, *last_name = NULL;
 67	int error = -ENOMEM, last_len, nodeid, result;
 68	uint16_t namelen;
 69	unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
 70
 71	log_rinfo(ls, "dlm_recover_directory");
 72
 73	if (dlm_no_directory(ls))
 74		goto out_status;
 75
 
 
 76	last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
 77	if (!last_name)
 78		goto out;
 79
 80	list_for_each_entry(memb, &ls->ls_nodes, list) {
 81		if (memb->nodeid == dlm_our_nodeid())
 82			continue;
 83
 84		memset(last_name, 0, DLM_RESNAME_MAXLEN);
 85		last_len = 0;
 86
 87		for (;;) {
 88			int left;
 89			error = dlm_recovery_stopped(ls);
 90			if (error)
 91				goto out_free;
 92
 93			error = dlm_rcom_names(ls, memb->nodeid,
 94					       last_name, last_len);
 95			if (error)
 96				goto out_free;
 97
 98			cond_resched();
 99
100			/*
101			 * pick namelen/name pairs out of received buffer
102			 */
103
104			b = ls->ls_recover_buf->rc_buf;
105			left = ls->ls_recover_buf->rc_header.h_length;
106			left -= sizeof(struct dlm_rcom);
107
108			for (;;) {
109				__be16 v;
110
111				error = -EINVAL;
112				if (left < sizeof(__be16))
113					goto out_free;
114
115				memcpy(&v, b, sizeof(__be16));
116				namelen = be16_to_cpu(v);
117				b += sizeof(__be16);
118				left -= sizeof(__be16);
119
120				/* namelen of 0xFFFFF marks end of names for
121				   this node; namelen of 0 marks end of the
122				   buffer */
123
124				if (namelen == 0xFFFF)
125					goto done;
126				if (!namelen)
127					break;
128
129				if (namelen > left)
130					goto out_free;
131
132				if (namelen > DLM_RESNAME_MAXLEN)
133					goto out_free;
134
135				error = dlm_master_lookup(ls, memb->nodeid,
136							  b, namelen,
137							  DLM_LU_RECOVER_DIR,
138							  &nodeid, &result);
139				if (error) {
140					log_error(ls, "recover_dir lookup %d",
141						  error);
142					goto out_free;
143				}
144
145				/* The name was found in rsbtbl, but the
146				 * master nodeid is different from
147				 * memb->nodeid which says it is the master.
148				 * This should not happen. */
149
150				if (result == DLM_LU_MATCH &&
151				    nodeid != memb->nodeid) {
152					count_bad++;
153					log_error(ls, "recover_dir lookup %d "
154						  "nodeid %d memb %d bad %u",
155						  result, nodeid, memb->nodeid,
156						  count_bad);
157					print_hex_dump_bytes("dlm_recover_dir ",
158							     DUMP_PREFIX_NONE,
159							     b, namelen);
160				}
161
162				/* The name was found in rsbtbl, and the
163				 * master nodeid matches memb->nodeid. */
164
165				if (result == DLM_LU_MATCH &&
166				    nodeid == memb->nodeid) {
167					count_match++;
168				}
169
170				/* The name was not found in rsbtbl and was
171				 * added with memb->nodeid as the master. */
172
173				if (result == DLM_LU_ADD) {
174					count_add++;
175				}
176
 
 
177				last_len = namelen;
 
178				memcpy(last_name, b, namelen);
179				b += namelen;
180				left -= namelen;
 
 
181				count++;
182			}
183		}
184	 done:
185		;
186	}
187
188 out_status:
189	error = 0;
190	dlm_set_recover_status(ls, DLM_RS_DIR);
191
192	log_rinfo(ls, "dlm_recover_directory %u in %u new",
193		  count, count_add);
194 out_free:
195	kfree(last_name);
196 out:
 
197	return error;
198}
199
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
200static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
201{
202	struct dlm_rsb *r;
203	uint32_t hash, bucket;
204	int rv;
205
206	hash = jhash(name, len, 0);
207	bucket = hash & (ls->ls_rsbtbl_size - 1);
208
209	spin_lock(&ls->ls_rsbtbl[bucket].lock);
210	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
211	if (rv)
212		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
213					 name, len, &r);
214	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
215
216	if (!rv)
217		return r;
218
219	down_read(&ls->ls_root_sem);
220	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
221		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
222			up_read(&ls->ls_root_sem);
223			log_debug(ls, "find_rsb_root revert to root_list %s",
224				  r->res_name);
225			return r;
226		}
227	}
228	up_read(&ls->ls_root_sem);
229	return NULL;
230}
231
232/* Find the rsb where we left off (or start again), then send rsb names
233   for rsb's we're master of and whose directory node matches the requesting
234   node.  inbuf is the rsb name last sent, inlen is the name's length */
235
236void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
237 			   char *outbuf, int outlen, int nodeid)
238{
239	struct list_head *list;
240	struct dlm_rsb *r;
241	int offset = 0, dir_nodeid;
242	__be16 be_namelen;
243
244	down_read(&ls->ls_root_sem);
245
246	if (inlen > 1) {
247		r = find_rsb_root(ls, inbuf, inlen);
248		if (!r) {
249			inbuf[inlen - 1] = '\0';
250			log_error(ls, "copy_master_names from %d start %d %s",
251				  nodeid, inlen, inbuf);
252			goto out;
253		}
254		list = r->res_root_list.next;
255	} else {
256		list = ls->ls_root_list.next;
257	}
258
259	for (offset = 0; list != &ls->ls_root_list; list = list->next) {
260		r = list_entry(list, struct dlm_rsb, res_root_list);
261		if (r->res_nodeid)
262			continue;
263
264		dir_nodeid = dlm_dir_nodeid(r);
265		if (dir_nodeid != nodeid)
266			continue;
267
268		/*
269		 * The block ends when we can't fit the following in the
270		 * remaining buffer space:
271		 * namelen (uint16_t) +
272		 * name (r->res_length) +
273		 * end-of-block record 0x0000 (uint16_t)
274		 */
275
276		if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
277			/* Write end-of-block record */
278			be_namelen = cpu_to_be16(0);
279			memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
280			offset += sizeof(__be16);
281			ls->ls_recover_dir_sent_msg++;
282			goto out;
283		}
284
285		be_namelen = cpu_to_be16(r->res_length);
286		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
287		offset += sizeof(__be16);
288		memcpy(outbuf + offset, r->res_name, r->res_length);
289		offset += r->res_length;
290		ls->ls_recover_dir_sent_res++;
291	}
292
293	/*
294	 * If we've reached the end of the list (and there's room) write a
295	 * terminating record.
296	 */
297
298	if ((list == &ls->ls_root_list) &&
299	    (offset + sizeof(uint16_t) <= outlen)) {
300		be_namelen = cpu_to_be16(0xFFFF);
301		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
302		offset += sizeof(__be16);
303		ls->ls_recover_dir_sent_msg++;
304	}
 
305 out:
306	up_read(&ls->ls_root_sem);
307}
308