fs/dlm/member.c (v5.4)
// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "recover.h"
#include "rcom.h"
#include "config.h"
#include "lowcomms.h"

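/* Returns 1 if the sending node is new enough to include slot info in
   its rcom messages (minor header version >= DLM_HEADER_SLOTS). */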
int dlm_slots_version(struct dlm_header *h)
{
	if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS)
		return 0;
	return 1;
}

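/* Save the slot and generation that a member reports in the rcom_config
   section at the start of its rcom buffer. */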
void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
		   struct dlm_member *memb)
{
	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;

	if (!dlm_slots_version(&rc->rc_header))
		return;

	memb->slot = le16_to_cpu(rf->rf_our_slot);
	memb->generation = le32_to_cpu(rf->rf_generation);
}

void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct dlm_slot *slot;
	struct rcom_slot *ro;
	int i;

	ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));

	/* ls_slots array is sparse, but not rcom_slots */

	for (i = 0; i < ls->ls_slots_size; i++) {
		slot = &ls->ls_slots[i];
		if (!slot->nodeid)
			continue;
		ro->ro_nodeid = cpu_to_le32(slot->nodeid);
		ro->ro_slot = cpu_to_le16(slot->slot);
		ro++;
	}
}

#define SLOT_DEBUG_LINE 128

static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
		      struct rcom_slot *ro0, struct dlm_slot *array,
		      int array_size)
{
	char line[SLOT_DEBUG_LINE];
	int len = SLOT_DEBUG_LINE - 1;
	int pos = 0;
	int ret, i;

	memset(line, 0, sizeof(line));

	if (array) {
		for (i = 0; i < array_size; i++) {
			if (!array[i].nodeid)
				continue;

			ret = snprintf(line + pos, len - pos, " %d:%d",
				       array[i].slot, array[i].nodeid);
			if (ret >= len - pos)
				break;
			pos += ret;
		}
	} else if (ro0) {
		for (i = 0; i < num_slots; i++) {
			ret = snprintf(line + pos, len - pos, " %d:%d",
				       ro0[i].ro_slot, ro0[i].ro_nodeid);
			if (ret >= len - pos)
				break;
			pos += ret;
		}
	}

	log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line);
}

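/* Unpack the slot table from ls_recover_buf: adopt the new generation
   and copy each node's slot into its dlm_member entry. */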
int dlm_slots_copy_in(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	struct dlm_rcom *rc = ls->ls_recover_buf;
	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
	struct rcom_slot *ro0, *ro;
	int our_nodeid = dlm_our_nodeid();
	int i, num_slots;
	uint32_t gen;

	if (!dlm_slots_version(&rc->rc_header))
		return -1;

	gen = le32_to_cpu(rf->rf_generation);
	if (gen <= ls->ls_generation) {
		log_error(ls, "dlm_slots_copy_in gen %u old %u",
			  gen, ls->ls_generation);
	}
	ls->ls_generation = gen;

	num_slots = le16_to_cpu(rf->rf_num_slots);
	if (!num_slots)
		return -1;

	ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));

	for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
		ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid);
		ro->ro_slot = le16_to_cpu(ro->ro_slot);
	}

	log_slots(ls, gen, num_slots, ro0, NULL, 0);

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
			if (ro->ro_nodeid != memb->nodeid)
				continue;
			memb->slot = ro->ro_slot;
			memb->slot_prev = memb->slot;
			break;
		}

		if (memb->nodeid == our_nodeid) {
			if (ls->ls_slot && ls->ls_slot != memb->slot) {
				log_error(ls, "dlm_slots_copy_in our slot "
					  "changed %d %d", ls->ls_slot,
					  memb->slot);
				return -1;
			}

			if (!ls->ls_slot)
				ls->ls_slot = memb->slot;
		}

		if (!memb->slot) {
			log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
				   memb->nodeid);
			return -1;
		}
	}

	return 0;
}

/* for any nodes that do not support slots, we will not have set memb->slot
   in wait_status_all(), so memb->slot will remain -1, and we will not
   assign slots or set ls_num_slots here */

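/* Build a new slot table: members keep the slot they already have, new
   members are assigned the lowest unused offsets, and the generation is
   bumped past the highest one any member has seen. */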
int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
		     struct dlm_slot **slots_out, uint32_t *gen_out)
{
	struct dlm_member *memb;
	struct dlm_slot *array;
	int our_nodeid = dlm_our_nodeid();
	int array_size, max_slots, i;
	int need = 0;
	int max = 0;
	int num = 0;
	uint32_t gen = 0;

	/* our own memb struct will have slot -1 gen 0 */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->nodeid == our_nodeid) {
			memb->slot = ls->ls_slot;
			memb->generation = ls->ls_generation;
			break;
		}
	}

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->generation > gen)
			gen = memb->generation;

		/* node doesn't support slots */

		if (memb->slot == -1)
			return -1;

		/* node needs a slot assigned */

		if (!memb->slot)
			need++;

		/* node has a slot assigned */

		num++;

		if (!max || max < memb->slot)
			max = memb->slot;

		/* sanity check, once slot is assigned it shouldn't change */

		if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
			log_error(ls, "nodeid %d slot changed %d %d",
				  memb->nodeid, memb->slot_prev, memb->slot);
			return -1;
		}
		memb->slot_prev = memb->slot;
	}

	array_size = max + need;
	array = kcalloc(array_size, sizeof(*array), GFP_NOFS);
	if (!array)
		return -ENOMEM;

	num = 0;

	/* fill in slots (offsets) that are used */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (!memb->slot)
			continue;

		if (memb->slot > array_size) {
			log_error(ls, "invalid slot number %d", memb->slot);
			kfree(array);
			return -1;
		}

		array[memb->slot - 1].nodeid = memb->nodeid;
		array[memb->slot - 1].slot = memb->slot;
		num++;
	}

	/* assign new slots from unused offsets */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->slot)
			continue;

		for (i = 0; i < array_size; i++) {
			if (array[i].nodeid)
				continue;

			memb->slot = i + 1;
			memb->slot_prev = memb->slot;
			array[i].nodeid = memb->nodeid;
			array[i].slot = memb->slot;
			num++;

			if (!ls->ls_slot && memb->nodeid == our_nodeid)
				ls->ls_slot = memb->slot;
			break;
		}

		if (!memb->slot) {
			log_error(ls, "no free slot found");
			kfree(array);
			return -1;
		}
	}

	gen++;

	log_slots(ls, gen, num, NULL, array, array_size);

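	/* all slots must fit in the rcom_config/rcom_slot section of a
	   single rcom buffer when they are copied out */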
	max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) -
		     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);

	if (num > max_slots) {
		log_error(ls, "num_slots %d exceeds max_slots %d",
			  num, max_slots);
		kfree(array);
		return -1;
	}

	*gen_out = gen;
	*slots_out = array;
	*slots_size = array_size;
	*num_slots = num;
	return 0;
}

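/* insert the new member so that ls_nodes stays sorted by nodeid */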
static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
{
	struct dlm_member *memb = NULL;
	struct list_head *tmp;
	struct list_head *newlist = &new->list;
	struct list_head *head = &ls->ls_nodes;

	list_for_each(tmp, head) {
		memb = list_entry(tmp, struct dlm_member, list);
		if (new->nodeid < memb->nodeid)
			break;
	}

	if (!memb)
		list_add_tail(newlist, head);
	else {
		/* FIXME: can use list macro here */
		newlist->prev = tmp->prev;
		newlist->next = tmp;
		tmp->prev->next = newlist;
		tmp->prev = newlist;
	}
}

static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
{
	struct dlm_member *memb;
	int error;

	memb = kzalloc(sizeof(*memb), GFP_NOFS);
	if (!memb)
		return -ENOMEM;

	error = dlm_lowcomms_connect_node(node->nodeid);
	if (error < 0) {
		kfree(memb);
		return error;
	}

	memb->nodeid = node->nodeid;
	memb->weight = node->weight;
	memb->comm_seq = node->comm_seq;
	add_ordered_member(ls, memb);
	ls->ls_num_nodes++;
	return 0;
}

static struct dlm_member *find_memb(struct list_head *head, int nodeid)
{
	struct dlm_member *memb;

	list_for_each_entry(memb, head, list) {
		if (memb->nodeid == nodeid)
			return memb;
	}
	return NULL;
}

int dlm_is_member(struct dlm_ls *ls, int nodeid)
{
	if (find_memb(&ls->ls_nodes, nodeid))
		return 1;
	return 0;
}

int dlm_is_removed(struct dlm_ls *ls, int nodeid)
{
	if (find_memb(&ls->ls_nodes_gone, nodeid))
		return 1;
	return 0;
}

static void clear_memb_list(struct list_head *head)
{
	struct dlm_member *memb;

	while (!list_empty(head)) {
		memb = list_entry(head->next, struct dlm_member, list);
		list_del(&memb->list);
		kfree(memb);
	}
}

void dlm_clear_members(struct dlm_ls *ls)
{
	clear_memb_list(&ls->ls_nodes);
	ls->ls_num_nodes = 0;
}

void dlm_clear_members_gone(struct dlm_ls *ls)
{
	clear_memb_list(&ls->ls_nodes_gone);
}

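/* Rebuild ls_node_array with each nodeid repeated according to its
   weight (weight 1 for everyone if all weights are zero); the array is
   indexed by resource hash to pick a directory nodeid. */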
static void make_member_array(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	int i, w, x = 0, total = 0, all_zero = 0, *array;

	kfree(ls->ls_node_array);
	ls->ls_node_array = NULL;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->weight)
			total += memb->weight;
	}

	/* all nodes revert to weight of 1 if all have weight 0 */

	if (!total) {
		total = ls->ls_num_nodes;
		all_zero = 1;
	}

	ls->ls_total_weight = total;
	array = kmalloc_array(total, sizeof(*array), GFP_NOFS);
	if (!array)
		return;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (!all_zero && !memb->weight)
			continue;

		if (all_zero)
			w = 1;
		else
			w = memb->weight;

		DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););

		for (i = 0; i < w; i++)
			array[x++] = memb->nodeid;
	}

	ls->ls_node_array = array;
}

/* send a status request to all members just to establish comms connections */

static int ping_members(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	int error = 0;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		error = dlm_recovery_stopped(ls);
		if (error)
			break;
		error = dlm_rcom_status(ls, memb->nodeid, 0);
		if (error)
			break;
	}
	if (error)
		log_rinfo(ls, "ping_members aborted %d last nodeid %d",
			  error, ls->ls_recover_nodeid);
	return error;
}

static void dlm_lsop_recover_prep(struct dlm_ls *ls)
{
	if (!ls->ls_ops || !ls->ls_ops->recover_prep)
		return;
	ls->ls_ops->recover_prep(ls->ls_ops_arg);
}

static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
{
	struct dlm_slot slot;
	uint32_t seq;
	int error;

	if (!ls->ls_ops || !ls->ls_ops->recover_slot)
		return;

	/* if there is no comms connection with this node
	   or the present comms connection is newer
	   than the one when this member was added, then
	   we consider the node to have failed (versus
	   being removed due to dlm_release_lockspace) */

	error = dlm_comm_seq(memb->nodeid, &seq);

	if (!error && seq == memb->comm_seq)
		return;

	slot.nodeid = memb->nodeid;
	slot.slot = memb->slot;

	ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
}

void dlm_lsop_recover_done(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	struct dlm_slot *slots;
	int i, num;

	if (!ls->ls_ops || !ls->ls_ops->recover_done)
		return;

	num = ls->ls_num_nodes;
	slots = kcalloc(num, sizeof(*slots), GFP_KERNEL);
	if (!slots)
		return;

	i = 0;
	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (i == num) {
			log_error(ls, "dlm_lsop_recover_done bad num %d", num);
			goto out;
		}
		slots[i].nodeid = memb->nodeid;
		slots[i].slot = memb->slot;
		i++;
	}

	ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
				 ls->ls_slot, ls->ls_generation);
 out:
	kfree(slots);
}

static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
						int nodeid)
{
	int i;

	for (i = 0; i < rv->nodes_count; i++) {
		if (rv->nodes[i].nodeid == nodeid)
			return &rv->nodes[i];
	}
	return NULL;
}

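/* Reconcile ls_nodes with the node list from the new configuration:
   departed members move to ls_nodes_gone, new members are added, and
   *neg_out reports how many members were lost. */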
int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
{
	struct dlm_member *memb, *safe;
	struct dlm_config_node *node;
	int i, error, neg = 0, low = -1;

	/* previously removed members that we've not finished removing need to
	   count as a negative change so the "neg" recovery steps will happen */

	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
		log_rinfo(ls, "prev removed member %d", memb->nodeid);
		neg++;
	}

	/* move departed members from ls_nodes to ls_nodes_gone */

	list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
		node = find_config_node(rv, memb->nodeid);
		if (node && !node->new)
			continue;

		if (!node) {
			log_rinfo(ls, "remove member %d", memb->nodeid);
		} else {
			/* removed and re-added */
			log_rinfo(ls, "remove member %d comm_seq %u %u",
				  memb->nodeid, memb->comm_seq, node->comm_seq);
		}

		neg++;
		list_move(&memb->list, &ls->ls_nodes_gone);
		ls->ls_num_nodes--;
		dlm_lsop_recover_slot(ls, memb);
	}

	/* add new members to ls_nodes */

	for (i = 0; i < rv->nodes_count; i++) {
		node = &rv->nodes[i];
		if (dlm_is_member(ls, node->nodeid))
			continue;
		dlm_add_member(ls, node);
		log_rinfo(ls, "add member %d", node->nodeid);
	}

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (low == -1 || memb->nodeid < low)
			low = memb->nodeid;
	}
	ls->ls_low_nodeid = low;

	make_member_array(ls);
	*neg_out = neg;

	error = ping_members(ls);
	if (!error || error == -EPROTO) {
		/* new_lockspace() may be waiting to know if the config
		   is good or bad */
		ls->ls_members_result = error;
		complete(&ls->ls_members_done);
	}

	log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
	return error;
}

/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
   dlm_ls_start() is called on any of them to start the new recovery. */

int dlm_ls_stop(struct dlm_ls *ls)
{
	int new;

	/*
	 * Prevent dlm_recv from being in the middle of something when we do
	 * the stop.  This includes ensuring dlm_recv isn't processing a
	 * recovery message (rcom), while dlm_recoverd is aborting and
	 * resetting things from an in-progress recovery.  i.e. we want
	 * dlm_recoverd to abort its recovery without worrying about dlm_recv
	 * processing an rcom at the same time.  Stopping dlm_recv also makes
	 * it easy for dlm_receive_message() to check locking stopped and add a
	 * message to the requestqueue without races.
	 */

	down_write(&ls->ls_recv_active);

	/*
	 * Abort any recovery that's in progress (see RECOVER_STOP,
	 * dlm_recovery_stopped()) and tell any other threads running in the
	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
	 */

	spin_lock(&ls->ls_recover_lock);
	set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
	ls->ls_recover_seq++;
	spin_unlock(&ls->ls_recover_lock);

	/*
	 * Let dlm_recv run again, now any normal messages will be saved on the
	 * requestqueue for later.
	 */

	up_write(&ls->ls_recv_active);

	/*
	 * This in_recovery lock does two things:
	 * 1) Keeps this function from returning until all threads are out
	 *    of locking routines and locking is truly stopped.
	 * 2) Keeps any new requests from being processed until it's unlocked
	 *    when recovery is complete.
	 */

	if (new) {
		set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
		wake_up_process(ls->ls_recoverd_task);
		wait_event(ls->ls_recover_lock_wait,
			   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
	}

	/*
	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
	 * running) has noticed RECOVER_STOP above and quit processing the
	 * previous recovery.
	 */

	dlm_recoverd_suspend(ls);

	spin_lock(&ls->ls_recover_lock);
	kfree(ls->ls_slots);
	ls->ls_slots = NULL;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_recover_status = 0;
	spin_unlock(&ls->ls_recover_lock);

	dlm_recoverd_resume(ls);

	if (!ls->ls_recover_begin)
		ls->ls_recover_begin = jiffies;

	dlm_lsop_recover_prep(ls);
	return 0;
}

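/* Queue the new node configuration for dlm_recoverd, which performs the
   actual recovery; the lockspace must already be stopped. */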
int dlm_ls_start(struct dlm_ls *ls)
{
	struct dlm_recover *rv, *rv_old;
	struct dlm_config_node *nodes = NULL;
	int error, count;

	rv = kzalloc(sizeof(*rv), GFP_NOFS);
	if (!rv)
		return -ENOMEM;

	error = dlm_config_nodes(ls->ls_name, &nodes, &count);
	if (error < 0)
		goto fail_rv;

	spin_lock(&ls->ls_recover_lock);

	/* the lockspace needs to be stopped before it can be started */

	if (!dlm_locking_stopped(ls)) {
		spin_unlock(&ls->ls_recover_lock);
		log_error(ls, "start ignored: lockspace running");
		error = -EINVAL;
		goto fail;
	}

	rv->nodes = nodes;
	rv->nodes_count = count;
	rv->seq = ++ls->ls_recover_seq;
	rv_old = ls->ls_recover_args;
	ls->ls_recover_args = rv;
	spin_unlock(&ls->ls_recover_lock);

	if (rv_old) {
		log_error(ls, "unused recovery %llx %d",
			  (unsigned long long)rv_old->seq, rv_old->nodes_count);
		kfree(rv_old->nodes);
		kfree(rv_old);
	}

	set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
	wake_up_process(ls->ls_recoverd_task);
	return 0;

 fail:
	kfree(nodes);
 fail_rv:
	kfree(rv);
	return error;
}

fs/dlm/member.c (v5.14.15)
// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "recover.h"
#include "rcom.h"
#include "config.h"
#include "midcomms.h"
#include "lowcomms.h"

int dlm_slots_version(struct dlm_header *h)
{
	if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS)
		return 0;
	return 1;
}

void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
		   struct dlm_member *memb)
{
	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;

	if (!dlm_slots_version(&rc->rc_header))
		return;

	memb->slot = le16_to_cpu(rf->rf_our_slot);
	memb->generation = le32_to_cpu(rf->rf_generation);
}

void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct dlm_slot *slot;
	struct rcom_slot *ro;
	int i;

	ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));

	/* ls_slots array is sparse, but not rcom_slots */

	for (i = 0; i < ls->ls_slots_size; i++) {
		slot = &ls->ls_slots[i];
		if (!slot->nodeid)
			continue;
		ro->ro_nodeid = cpu_to_le32(slot->nodeid);
		ro->ro_slot = cpu_to_le16(slot->slot);
		ro++;
	}
}

#define SLOT_DEBUG_LINE 128

static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
		      struct rcom_slot *ro0, struct dlm_slot *array,
		      int array_size)
{
	char line[SLOT_DEBUG_LINE];
	int len = SLOT_DEBUG_LINE - 1;
	int pos = 0;
	int ret, i;

	memset(line, 0, sizeof(line));

	if (array) {
		for (i = 0; i < array_size; i++) {
			if (!array[i].nodeid)
				continue;

			ret = snprintf(line + pos, len - pos, " %d:%d",
				       array[i].slot, array[i].nodeid);
			if (ret >= len - pos)
				break;
			pos += ret;
		}
	} else if (ro0) {
		for (i = 0; i < num_slots; i++) {
			ret = snprintf(line + pos, len - pos, " %d:%d",
				       ro0[i].ro_slot, ro0[i].ro_nodeid);
			if (ret >= len - pos)
				break;
			pos += ret;
		}
	}

	log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line);
}

int dlm_slots_copy_in(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	struct dlm_rcom *rc = ls->ls_recover_buf;
	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
	struct rcom_slot *ro0, *ro;
	int our_nodeid = dlm_our_nodeid();
	int i, num_slots;
	uint32_t gen;

	if (!dlm_slots_version(&rc->rc_header))
		return -1;

	gen = le32_to_cpu(rf->rf_generation);
	if (gen <= ls->ls_generation) {
		log_error(ls, "dlm_slots_copy_in gen %u old %u",
			  gen, ls->ls_generation);
	}
	ls->ls_generation = gen;

	num_slots = le16_to_cpu(rf->rf_num_slots);
	if (!num_slots)
		return -1;

	ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));

	for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
		ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid);
		ro->ro_slot = le16_to_cpu(ro->ro_slot);
	}

	log_slots(ls, gen, num_slots, ro0, NULL, 0);

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
			if (ro->ro_nodeid != memb->nodeid)
				continue;
			memb->slot = ro->ro_slot;
			memb->slot_prev = memb->slot;
			break;
		}

		if (memb->nodeid == our_nodeid) {
			if (ls->ls_slot && ls->ls_slot != memb->slot) {
				log_error(ls, "dlm_slots_copy_in our slot "
					  "changed %d %d", ls->ls_slot,
					  memb->slot);
				return -1;
			}

			if (!ls->ls_slot)
				ls->ls_slot = memb->slot;
		}

		if (!memb->slot) {
			log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
				   memb->nodeid);
			return -1;
		}
	}

	return 0;
}

/* for any nodes that do not support slots, we will not have set memb->slot
   in wait_status_all(), so memb->slot will remain -1, and we will not
   assign slots or set ls_num_slots here */

int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
		     struct dlm_slot **slots_out, uint32_t *gen_out)
{
	struct dlm_member *memb;
	struct dlm_slot *array;
	int our_nodeid = dlm_our_nodeid();
	int array_size, max_slots, i;
	int need = 0;
	int max = 0;
	int num = 0;
	uint32_t gen = 0;

	/* our own memb struct will have slot -1 gen 0 */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->nodeid == our_nodeid) {
			memb->slot = ls->ls_slot;
			memb->generation = ls->ls_generation;
			break;
		}
	}

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->generation > gen)
			gen = memb->generation;

		/* node doesn't support slots */

		if (memb->slot == -1)
			return -1;

		/* node needs a slot assigned */

		if (!memb->slot)
			need++;

		/* node has a slot assigned */

		num++;

		if (!max || max < memb->slot)
			max = memb->slot;

		/* sanity check, once slot is assigned it shouldn't change */

		if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
			log_error(ls, "nodeid %d slot changed %d %d",
				  memb->nodeid, memb->slot_prev, memb->slot);
			return -1;
		}
		memb->slot_prev = memb->slot;
	}

	array_size = max + need;
	array = kcalloc(array_size, sizeof(*array), GFP_NOFS);
	if (!array)
		return -ENOMEM;

	num = 0;

	/* fill in slots (offsets) that are used */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (!memb->slot)
			continue;

		if (memb->slot > array_size) {
			log_error(ls, "invalid slot number %d", memb->slot);
			kfree(array);
			return -1;
		}

		array[memb->slot - 1].nodeid = memb->nodeid;
		array[memb->slot - 1].slot = memb->slot;
		num++;
	}

	/* assign new slots from unused offsets */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->slot)
			continue;

		for (i = 0; i < array_size; i++) {
			if (array[i].nodeid)
				continue;

			memb->slot = i + 1;
			memb->slot_prev = memb->slot;
			array[i].nodeid = memb->nodeid;
			array[i].slot = memb->slot;
			num++;

			if (!ls->ls_slot && memb->nodeid == our_nodeid)
				ls->ls_slot = memb->slot;
			break;
		}

		if (!memb->slot) {
			log_error(ls, "no free slot found");
			kfree(array);
			return -1;
		}
	}

	gen++;

	log_slots(ls, gen, num, NULL, array, array_size);

	max_slots = (DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom) -
		     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);

	if (num > max_slots) {
		log_error(ls, "num_slots %d exceeds max_slots %d",
			  num, max_slots);
		kfree(array);
		return -1;
	}

	*gen_out = gen;
	*slots_out = array;
	*slots_size = array_size;
	*num_slots = num;
	return 0;
}

static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
{
	struct dlm_member *memb = NULL;
	struct list_head *tmp;
	struct list_head *newlist = &new->list;
	struct list_head *head = &ls->ls_nodes;

	list_for_each(tmp, head) {
		memb = list_entry(tmp, struct dlm_member, list);
		if (new->nodeid < memb->nodeid)
			break;
	}

	if (!memb)
		list_add_tail(newlist, head);
	else {
		/* FIXME: can use list macro here */
		newlist->prev = tmp->prev;
		newlist->next = tmp;
		tmp->prev->next = newlist;
		tmp->prev = newlist;
	}
}

static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
{
	struct dlm_member *memb;
	int error;

	memb = kzalloc(sizeof(*memb), GFP_NOFS);
	if (!memb)
		return -ENOMEM;

	error = dlm_lowcomms_connect_node(node->nodeid);
	if (error < 0) {
		kfree(memb);
		return error;
	}

	memb->nodeid = node->nodeid;
	memb->weight = node->weight;
	memb->comm_seq = node->comm_seq;
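	/* let the midcomms layer know this node is a lockspace member */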
	dlm_midcomms_add_member(node->nodeid);
	add_ordered_member(ls, memb);
	ls->ls_num_nodes++;
	return 0;
}

static struct dlm_member *find_memb(struct list_head *head, int nodeid)
{
	struct dlm_member *memb;

	list_for_each_entry(memb, head, list) {
		if (memb->nodeid == nodeid)
			return memb;
	}
	return NULL;
}

int dlm_is_member(struct dlm_ls *ls, int nodeid)
{
	if (find_memb(&ls->ls_nodes, nodeid))
		return 1;
	return 0;
}

int dlm_is_removed(struct dlm_ls *ls, int nodeid)
{
	if (find_memb(&ls->ls_nodes_gone, nodeid))
		return 1;
	return 0;
}

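/* The optional after_del callback lets dlm_clear_members() notify the
   midcomms layer about each node removed from ls_nodes; clearing the
   nodes_gone list needs no such notification. */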
static void clear_memb_list(struct list_head *head,
			    void (*after_del)(int nodeid))
{
	struct dlm_member *memb;

	while (!list_empty(head)) {
		memb = list_entry(head->next, struct dlm_member, list);
		list_del(&memb->list);
		if (after_del)
			after_del(memb->nodeid);
		kfree(memb);
	}
}

static void clear_members_cb(int nodeid)
{
	dlm_midcomms_remove_member(nodeid);
}

void dlm_clear_members(struct dlm_ls *ls)
{
	clear_memb_list(&ls->ls_nodes, clear_members_cb);
	ls->ls_num_nodes = 0;
}

void dlm_clear_members_gone(struct dlm_ls *ls)
{
	clear_memb_list(&ls->ls_nodes_gone, NULL);
}

static void make_member_array(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	int i, w, x = 0, total = 0, all_zero = 0, *array;

	kfree(ls->ls_node_array);
	ls->ls_node_array = NULL;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->weight)
			total += memb->weight;
	}

	/* all nodes revert to weight of 1 if all have weight 0 */

	if (!total) {
		total = ls->ls_num_nodes;
		all_zero = 1;
	}

	ls->ls_total_weight = total;
	array = kmalloc_array(total, sizeof(*array), GFP_NOFS);
	if (!array)
		return;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (!all_zero && !memb->weight)
			continue;

		if (all_zero)
			w = 1;
		else
			w = memb->weight;

		DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););

		for (i = 0; i < w; i++)
			array[x++] = memb->nodeid;
	}

	ls->ls_node_array = array;
}

/* send a status request to all members just to establish comms connections */

static int ping_members(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	int error = 0;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		error = dlm_recovery_stopped(ls);
		if (error)
			break;
		error = dlm_rcom_status(ls, memb->nodeid, 0);
		if (error)
			break;
	}
	if (error)
		log_rinfo(ls, "ping_members aborted %d last nodeid %d",
			  error, ls->ls_recover_nodeid);
	return error;
}

static void dlm_lsop_recover_prep(struct dlm_ls *ls)
{
	if (!ls->ls_ops || !ls->ls_ops->recover_prep)
		return;
	ls->ls_ops->recover_prep(ls->ls_ops_arg);
}

static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
{
	struct dlm_slot slot;
	uint32_t seq;
	int error;

	if (!ls->ls_ops || !ls->ls_ops->recover_slot)
		return;

	/* if there is no comms connection with this node
	   or the present comms connection is newer
	   than the one when this member was added, then
	   we consider the node to have failed (versus
	   being removed due to dlm_release_lockspace) */

	error = dlm_comm_seq(memb->nodeid, &seq);

	if (!error && seq == memb->comm_seq)
		return;

	slot.nodeid = memb->nodeid;
	slot.slot = memb->slot;

	ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
}

void dlm_lsop_recover_done(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	struct dlm_slot *slots;
	int i, num;

	if (!ls->ls_ops || !ls->ls_ops->recover_done)
		return;

	num = ls->ls_num_nodes;
	slots = kcalloc(num, sizeof(*slots), GFP_KERNEL);
	if (!slots)
		return;

	i = 0;
	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (i == num) {
			log_error(ls, "dlm_lsop_recover_done bad num %d", num);
			goto out;
		}
		slots[i].nodeid = memb->nodeid;
		slots[i].slot = memb->slot;
		i++;
	}

	ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
				 ls->ls_slot, ls->ls_generation);
 out:
	kfree(slots);
}

static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
						int nodeid)
{
	int i;

	for (i = 0; i < rv->nodes_count; i++) {
		if (rv->nodes[i].nodeid == nodeid)
			return &rv->nodes[i];
	}
	return NULL;
}

int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
{
	struct dlm_member *memb, *safe;
	struct dlm_config_node *node;
	int i, error, neg = 0, low = -1;

	/* previously removed members that we've not finished removing need to
	   count as a negative change so the "neg" recovery steps will happen */

	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
		log_rinfo(ls, "prev removed member %d", memb->nodeid);
		neg++;
	}

	/* move departed members from ls_nodes to ls_nodes_gone */

	list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
		node = find_config_node(rv, memb->nodeid);
		if (node && !node->new)
			continue;

		if (!node) {
			log_rinfo(ls, "remove member %d", memb->nodeid);
		} else {
			/* removed and re-added */
			log_rinfo(ls, "remove member %d comm_seq %u %u",
				  memb->nodeid, memb->comm_seq, node->comm_seq);
		}

		neg++;
		list_move(&memb->list, &ls->ls_nodes_gone);
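		/* balances dlm_midcomms_add_member() from dlm_add_member() */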
		dlm_midcomms_remove_member(memb->nodeid);
		ls->ls_num_nodes--;
		dlm_lsop_recover_slot(ls, memb);
	}

	/* add new members to ls_nodes */

	for (i = 0; i < rv->nodes_count; i++) {
		node = &rv->nodes[i];
		if (dlm_is_member(ls, node->nodeid))
			continue;
		dlm_add_member(ls, node);
		log_rinfo(ls, "add member %d", node->nodeid);
	}

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (low == -1 || memb->nodeid < low)
			low = memb->nodeid;
	}
	ls->ls_low_nodeid = low;

	make_member_array(ls);
	*neg_out = neg;

	error = ping_members(ls);
	/* error -EINTR means that a new recovery action is triggered.
	 * We ignore this recovery action and let run the new one which might
	 * have new member configuration.
	 */
	if (error == -EINTR)
		error = 0;

	/* new_lockspace() may be waiting to know if the config
	 * is good or bad
	 */
	ls->ls_members_result = error;
	complete(&ls->ls_members_done);

	log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
	return error;
}

/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
   dlm_ls_start() is called on any of them to start the new recovery. */

int dlm_ls_stop(struct dlm_ls *ls)
{
	int new;

	/*
	 * Prevent dlm_recv from being in the middle of something when we do
	 * the stop.  This includes ensuring dlm_recv isn't processing a
	 * recovery message (rcom), while dlm_recoverd is aborting and
	 * resetting things from an in-progress recovery.  i.e. we want
	 * dlm_recoverd to abort its recovery without worrying about dlm_recv
	 * processing an rcom at the same time.  Stopping dlm_recv also makes
	 * it easy for dlm_receive_message() to check locking stopped and add a
	 * message to the requestqueue without races.
	 */

	down_write(&ls->ls_recv_active);

	/*
	 * Abort any recovery that's in progress (see RECOVER_STOP,
	 * dlm_recovery_stopped()) and tell any other threads running in the
	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
	 */

	spin_lock(&ls->ls_recover_lock);
	set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
	ls->ls_recover_seq++;
	spin_unlock(&ls->ls_recover_lock);

	/*
	 * Let dlm_recv run again, now any normal messages will be saved on the
	 * requestqueue for later.
	 */

	up_write(&ls->ls_recv_active);

	/*
	 * This in_recovery lock does two things:
	 * 1) Keeps this function from returning until all threads are out
	 *    of locking routines and locking is truly stopped.
	 * 2) Keeps any new requests from being processed until it's unlocked
	 *    when recovery is complete.
	 */

	if (new) {
		set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
		wake_up_process(ls->ls_recoverd_task);
		wait_event(ls->ls_recover_lock_wait,
			   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
	}

	/*
	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
	 * running) has noticed RECOVER_STOP above and quit processing the
	 * previous recovery.
	 */

	dlm_recoverd_suspend(ls);

	spin_lock(&ls->ls_recover_lock);
	kfree(ls->ls_slots);
	ls->ls_slots = NULL;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_recover_status = 0;
	spin_unlock(&ls->ls_recover_lock);

	dlm_recoverd_resume(ls);

	if (!ls->ls_recover_begin)
		ls->ls_recover_begin = jiffies;

	dlm_lsop_recover_prep(ls);
	return 0;
}

int dlm_ls_start(struct dlm_ls *ls)
{
	struct dlm_recover *rv, *rv_old;
	struct dlm_config_node *nodes = NULL;
	int error, count;

	rv = kzalloc(sizeof(*rv), GFP_NOFS);
	if (!rv)
		return -ENOMEM;

	error = dlm_config_nodes(ls->ls_name, &nodes, &count);
	if (error < 0)
		goto fail_rv;

	spin_lock(&ls->ls_recover_lock);

	/* the lockspace needs to be stopped before it can be started */

	if (!dlm_locking_stopped(ls)) {
		spin_unlock(&ls->ls_recover_lock);
		log_error(ls, "start ignored: lockspace running");
		error = -EINVAL;
		goto fail;
	}

	rv->nodes = nodes;
	rv->nodes_count = count;
	rv->seq = ++ls->ls_recover_seq;
	rv_old = ls->ls_recover_args;
	ls->ls_recover_args = rv;
	spin_unlock(&ls->ls_recover_lock);

	if (rv_old) {
		log_error(ls, "unused recovery %llx %d",
			  (unsigned long long)rv_old->seq, rv_old->nodes_count);
		kfree(rv_old->nodes);
		kfree(rv_old);
	}

	set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
	wake_up_process(ls->ls_recoverd_task);
	return 0;

 fail:
	kfree(nodes);
 fail_rv:
	kfree(rv);
	return error;
}
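
In both versions, the recover_prep/recover_slot/recover_done hooks above are invoked through ls->ls_ops, which a lockspace user (a cluster filesystem such as GFS2) supplies when it creates the lockspace. A minimal sketch of that consumer side, assuming the dlm_lockspace_ops interface exported by <linux/dlm.h>; the names, lockspace parameters, and callback bodies here are illustrative only:

#include <linux/printk.h>
#include <linux/dlm.h>

/* recovery is starting: pause activity that depends on other nodes */
static void example_recover_prep(void *ops_arg)
{
	pr_info("dlm recovery starting\n");
}

/* called once per failed member; its slot can map to per-node state
 * such as a journal */
static void example_recover_slot(void *ops_arg, struct dlm_slot *slot)
{
	pr_info("recover nodeid %d slot %d\n", slot->nodeid, slot->slot);
}

/* membership has settled; slots[] describes every current member */
static void example_recover_done(void *ops_arg, struct dlm_slot *slots,
				 int num_slots, int our_slot,
				 uint32_t generation)
{
	pr_info("generation %u our_slot %d num_slots %d\n",
		generation, our_slot, num_slots);
}

static const struct dlm_lockspace_ops example_ops = {
	.recover_prep = example_recover_prep,
	.recover_slot = example_recover_slot,
	.recover_done = example_recover_done,
};

static int example_join(dlm_lockspace_t **lockspace)
{
	int ops_result;

	/* ops_result reports whether the whole cluster can honor the
	 * callbacks (see dlm_slots_version() above) */
	return dlm_new_lockspace("example", NULL, DLM_LSFL_FS, 32,
				 &example_ops, NULL, &ops_result, lockspace);
}

If any member runs a version without slot support, slot assignment is skipped (dlm_slots_assign() returns -1), which is why a consumer should check ops_result rather than assume slot info will arrive.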