Linux Audio

Check our new training course

Loading...
v6.9.4
  1// SPDX-License-Identifier: GPL-2.0-only
  2/******************************************************************************
  3*******************************************************************************
  4**
  5**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
  6**
  7**
  8*******************************************************************************
  9******************************************************************************/
 10
 11#include "dlm_internal.h"
 12#include "lockspace.h"
 13#include "member.h"
 14#include "recoverd.h"
 15#include "recover.h"
 16#include "rcom.h"
 17#include "config.h"
 18#include "midcomms.h"
 19#include "lowcomms.h"
 20
 21int dlm_slots_version(const struct dlm_header *h)
 22{
 23	if ((le32_to_cpu(h->h_version) & 0x0000FFFF) < DLM_HEADER_SLOTS)
 24		return 0;
 25	return 1;
 26}
 27
 28void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
 29		   struct dlm_member *memb)
 30{
 31	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
 32
 33	if (!dlm_slots_version(&rc->rc_header))
 34		return;
 35
 36	memb->slot = le16_to_cpu(rf->rf_our_slot);
 37	memb->generation = le32_to_cpu(rf->rf_generation);
 38}
 39
 40void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
 41{
 42	struct dlm_slot *slot;
 43	struct rcom_slot *ro;
 44	int i;
 45
 46	ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
 47
 48	/* ls_slots array is sparse, but not rcom_slots */
 49
 50	for (i = 0; i < ls->ls_slots_size; i++) {
 51		slot = &ls->ls_slots[i];
 52		if (!slot->nodeid)
 53			continue;
 54		ro->ro_nodeid = cpu_to_le32(slot->nodeid);
 55		ro->ro_slot = cpu_to_le16(slot->slot);
 56		ro++;
 57	}
 58}
 59
 60#define SLOT_DEBUG_LINE 128
 61
 62static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
 63		      struct rcom_slot *ro0, struct dlm_slot *array,
 64		      int array_size)
 65{
 66	char line[SLOT_DEBUG_LINE];
 67	int len = SLOT_DEBUG_LINE - 1;
 68	int pos = 0;
 69	int ret, i;
 70
 71	memset(line, 0, sizeof(line));
 72
 73	if (array) {
 74		for (i = 0; i < array_size; i++) {
 75			if (!array[i].nodeid)
 76				continue;
 77
 78			ret = snprintf(line + pos, len - pos, " %d:%d",
 79				       array[i].slot, array[i].nodeid);
 80			if (ret >= len - pos)
 81				break;
 82			pos += ret;
 83		}
 84	} else if (ro0) {
 85		for (i = 0; i < num_slots; i++) {
 86			ret = snprintf(line + pos, len - pos, " %d:%d",
 87				       ro0[i].ro_slot, ro0[i].ro_nodeid);
 88			if (ret >= len - pos)
 89				break;
 90			pos += ret;
 91		}
 92	}
 93
 94	log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line);
 95}
 96
 97int dlm_slots_copy_in(struct dlm_ls *ls)
 98{
 99	struct dlm_member *memb;
100	struct dlm_rcom *rc = ls->ls_recover_buf;
101	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
102	struct rcom_slot *ro0, *ro;
103	int our_nodeid = dlm_our_nodeid();
104	int i, num_slots;
105	uint32_t gen;
106
107	if (!dlm_slots_version(&rc->rc_header))
108		return -1;
109
110	gen = le32_to_cpu(rf->rf_generation);
111	if (gen <= ls->ls_generation) {
112		log_error(ls, "dlm_slots_copy_in gen %u old %u",
113			  gen, ls->ls_generation);
114	}
115	ls->ls_generation = gen;
116
117	num_slots = le16_to_cpu(rf->rf_num_slots);
118	if (!num_slots)
119		return -1;
120
121	ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
122
123	log_slots(ls, gen, num_slots, ro0, NULL, 0);
124
125	list_for_each_entry(memb, &ls->ls_nodes, list) {
126		for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
127			if (le32_to_cpu(ro->ro_nodeid) != memb->nodeid)
128				continue;
129			memb->slot = le16_to_cpu(ro->ro_slot);
130			memb->slot_prev = memb->slot;
131			break;
132		}
133
134		if (memb->nodeid == our_nodeid) {
135			if (ls->ls_slot && ls->ls_slot != memb->slot) {
136				log_error(ls, "dlm_slots_copy_in our slot "
137					  "changed %d %d", ls->ls_slot,
138					  memb->slot);
139				return -1;
140			}
141
142			if (!ls->ls_slot)
143				ls->ls_slot = memb->slot;
144		}
145
146		if (!memb->slot) {
147			log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
148				   memb->nodeid);
149			return -1;
150		}
151	}
152
153	return 0;
154}
155
156/* for any nodes that do not support slots, we will not have set memb->slot
157   in wait_status_all(), so memb->slot will remain -1, and we will not
158   assign slots or set ls_num_slots here */
159
160int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
161		     struct dlm_slot **slots_out, uint32_t *gen_out)
162{
163	struct dlm_member *memb;
164	struct dlm_slot *array;
165	int our_nodeid = dlm_our_nodeid();
166	int array_size, max_slots, i;
167	int need = 0;
168	int max = 0;
169	int num = 0;
170	uint32_t gen = 0;
171
172	/* our own memb struct will have slot -1 gen 0 */
173
174	list_for_each_entry(memb, &ls->ls_nodes, list) {
175		if (memb->nodeid == our_nodeid) {
176			memb->slot = ls->ls_slot;
177			memb->generation = ls->ls_generation;
178			break;
179		}
180	}
181
182	list_for_each_entry(memb, &ls->ls_nodes, list) {
183		if (memb->generation > gen)
184			gen = memb->generation;
185
186		/* node doesn't support slots */
187
188		if (memb->slot == -1)
189			return -1;
190
191		/* node needs a slot assigned */
192
193		if (!memb->slot)
194			need++;
195
196		/* node has a slot assigned */
197
198		num++;
199
200		if (!max || max < memb->slot)
201			max = memb->slot;
202
203		/* sanity check, once slot is assigned it shouldn't change */
204
205		if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
206			log_error(ls, "nodeid %d slot changed %d %d",
207				  memb->nodeid, memb->slot_prev, memb->slot);
208			return -1;
209		}
210		memb->slot_prev = memb->slot;
211	}
212
213	array_size = max + need;
214	array = kcalloc(array_size, sizeof(*array), GFP_NOFS);
215	if (!array)
216		return -ENOMEM;
217
218	num = 0;
219
220	/* fill in slots (offsets) that are used */
221
222	list_for_each_entry(memb, &ls->ls_nodes, list) {
223		if (!memb->slot)
224			continue;
225
226		if (memb->slot > array_size) {
227			log_error(ls, "invalid slot number %d", memb->slot);
228			kfree(array);
229			return -1;
230		}
231
232		array[memb->slot - 1].nodeid = memb->nodeid;
233		array[memb->slot - 1].slot = memb->slot;
234		num++;
235	}
236
237	/* assign new slots from unused offsets */
238
239	list_for_each_entry(memb, &ls->ls_nodes, list) {
240		if (memb->slot)
241			continue;
242
243		for (i = 0; i < array_size; i++) {
244			if (array[i].nodeid)
245				continue;
246
247			memb->slot = i + 1;
248			memb->slot_prev = memb->slot;
249			array[i].nodeid = memb->nodeid;
250			array[i].slot = memb->slot;
251			num++;
252
253			if (!ls->ls_slot && memb->nodeid == our_nodeid)
254				ls->ls_slot = memb->slot;
255			break;
256		}
257
258		if (!memb->slot) {
259			log_error(ls, "no free slot found");
260			kfree(array);
261			return -1;
262		}
263	}
264
265	gen++;
266
267	log_slots(ls, gen, num, NULL, array, array_size);
268
269	max_slots = (DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom) -
270		     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);
271
272	if (num > max_slots) {
273		log_error(ls, "num_slots %d exceeds max_slots %d",
274			  num, max_slots);
275		kfree(array);
276		return -1;
277	}
278
279	*gen_out = gen;
280	*slots_out = array;
281	*slots_size = array_size;
282	*num_slots = num;
283	return 0;
284}
285
286static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
287{
288	struct dlm_member *memb = NULL;
289	struct list_head *tmp;
290	struct list_head *newlist = &new->list;
291	struct list_head *head = &ls->ls_nodes;
292
293	list_for_each(tmp, head) {
294		memb = list_entry(tmp, struct dlm_member, list);
295		if (new->nodeid < memb->nodeid)
296			break;
297	}
298
299	if (!memb)
300		list_add_tail(newlist, head);
301	else {
302		/* FIXME: can use list macro here */
303		newlist->prev = tmp->prev;
304		newlist->next = tmp;
305		tmp->prev->next = newlist;
306		tmp->prev = newlist;
307	}
308}
309
310static int add_remote_member(int nodeid)
311{
312	int error;
313
314	if (nodeid == dlm_our_nodeid())
315		return 0;
316
317	error = dlm_lowcomms_connect_node(nodeid);
318	if (error < 0)
319		return error;
320
321	dlm_midcomms_add_member(nodeid);
322	return 0;
323}
324
325static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
326{
327	struct dlm_member *memb;
328	int error;
329
330	memb = kzalloc(sizeof(*memb), GFP_NOFS);
331	if (!memb)
332		return -ENOMEM;
333
334	memb->nodeid = node->nodeid;
335	memb->weight = node->weight;
336	memb->comm_seq = node->comm_seq;
337
338	error = add_remote_member(node->nodeid);
339	if (error < 0) {
340		kfree(memb);
341		return error;
342	}
343
344	add_ordered_member(ls, memb);
345	ls->ls_num_nodes++;
346	return 0;
347}
348
349static struct dlm_member *find_memb(struct list_head *head, int nodeid)
350{
351	struct dlm_member *memb;
352
353	list_for_each_entry(memb, head, list) {
354		if (memb->nodeid == nodeid)
355			return memb;
356	}
357	return NULL;
358}
359
360int dlm_is_member(struct dlm_ls *ls, int nodeid)
361{
362	if (find_memb(&ls->ls_nodes, nodeid))
363		return 1;
364	return 0;
365}
366
367int dlm_is_removed(struct dlm_ls *ls, int nodeid)
368{
 
 
369	if (find_memb(&ls->ls_nodes_gone, nodeid))
370		return 1;
371	return 0;
372}
373
374static void clear_memb_list(struct list_head *head,
375			    void (*after_del)(int nodeid))
376{
377	struct dlm_member *memb;
378
379	while (!list_empty(head)) {
380		memb = list_entry(head->next, struct dlm_member, list);
381		list_del(&memb->list);
382		if (after_del)
383			after_del(memb->nodeid);
384		kfree(memb);
385	}
386}
387
388static void remove_remote_member(int nodeid)
389{
390	if (nodeid == dlm_our_nodeid())
391		return;
392
393	dlm_midcomms_remove_member(nodeid);
394}
395
396void dlm_clear_members(struct dlm_ls *ls)
397{
398	clear_memb_list(&ls->ls_nodes, remove_remote_member);
399	ls->ls_num_nodes = 0;
400}
401
402void dlm_clear_members_gone(struct dlm_ls *ls)
403{
404	clear_memb_list(&ls->ls_nodes_gone, NULL);
405}
406
407static void make_member_array(struct dlm_ls *ls)
408{
409	struct dlm_member *memb;
410	int i, w, x = 0, total = 0, all_zero = 0, *array;
411
412	kfree(ls->ls_node_array);
413	ls->ls_node_array = NULL;
414
415	list_for_each_entry(memb, &ls->ls_nodes, list) {
416		if (memb->weight)
417			total += memb->weight;
418	}
419
420	/* all nodes revert to weight of 1 if all have weight 0 */
421
422	if (!total) {
423		total = ls->ls_num_nodes;
424		all_zero = 1;
425	}
426
427	ls->ls_total_weight = total;
428	array = kmalloc_array(total, sizeof(*array), GFP_NOFS);
429	if (!array)
430		return;
431
432	list_for_each_entry(memb, &ls->ls_nodes, list) {
433		if (!all_zero && !memb->weight)
434			continue;
435
436		if (all_zero)
437			w = 1;
438		else
439			w = memb->weight;
440
441		DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
442
443		for (i = 0; i < w; i++)
444			array[x++] = memb->nodeid;
445	}
446
447	ls->ls_node_array = array;
448}
449
450/* send a status request to all members just to establish comms connections */
451
452static int ping_members(struct dlm_ls *ls, uint64_t seq)
453{
454	struct dlm_member *memb;
455	int error = 0;
456
457	list_for_each_entry(memb, &ls->ls_nodes, list) {
458		if (dlm_recovery_stopped(ls)) {
459			error = -EINTR;
460			break;
461		}
462		error = dlm_rcom_status(ls, memb->nodeid, 0, seq);
463		if (error)
464			break;
465	}
466	if (error)
467		log_rinfo(ls, "ping_members aborted %d last nodeid %d",
468			  error, ls->ls_recover_nodeid);
469	return error;
470}
471
472static void dlm_lsop_recover_prep(struct dlm_ls *ls)
473{
474	if (!ls->ls_ops || !ls->ls_ops->recover_prep)
475		return;
476	ls->ls_ops->recover_prep(ls->ls_ops_arg);
477}
478
479static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
480{
481	struct dlm_slot slot;
482	uint32_t seq;
483	int error;
484
485	if (!ls->ls_ops || !ls->ls_ops->recover_slot)
486		return;
487
488	/* if there is no comms connection with this node
489	   or the present comms connection is newer
490	   than the one when this member was added, then
491	   we consider the node to have failed (versus
492	   being removed due to dlm_release_lockspace) */
493
494	error = dlm_comm_seq(memb->nodeid, &seq);
495
496	if (!error && seq == memb->comm_seq)
497		return;
498
499	slot.nodeid = memb->nodeid;
500	slot.slot = memb->slot;
501
502	ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
503}
504
505void dlm_lsop_recover_done(struct dlm_ls *ls)
506{
507	struct dlm_member *memb;
508	struct dlm_slot *slots;
509	int i, num;
510
511	if (!ls->ls_ops || !ls->ls_ops->recover_done)
512		return;
513
514	num = ls->ls_num_nodes;
515	slots = kcalloc(num, sizeof(*slots), GFP_KERNEL);
516	if (!slots)
517		return;
518
519	i = 0;
520	list_for_each_entry(memb, &ls->ls_nodes, list) {
521		if (i == num) {
522			log_error(ls, "dlm_lsop_recover_done bad num %d", num);
523			goto out;
524		}
525		slots[i].nodeid = memb->nodeid;
526		slots[i].slot = memb->slot;
527		i++;
528	}
529
530	ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
531				 ls->ls_slot, ls->ls_generation);
532 out:
533	kfree(slots);
534}
535
536static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
537						int nodeid)
538{
539	int i;
540
541	for (i = 0; i < rv->nodes_count; i++) {
542		if (rv->nodes[i].nodeid == nodeid)
543			return &rv->nodes[i];
544	}
545	return NULL;
546}
547
548int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
549{
550	struct dlm_member *memb, *safe;
551	struct dlm_config_node *node;
552	int i, error, neg = 0, low = -1;
553
554	/* previously removed members that we've not finished removing need to
555	 * count as a negative change so the "neg" recovery steps will happen
556	 *
557	 * This functionality must report all member changes to lsops or
558	 * midcomms layer and must never return before.
559	 */
560
561	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
562		log_rinfo(ls, "prev removed member %d", memb->nodeid);
563		neg++;
564	}
565
566	/* move departed members from ls_nodes to ls_nodes_gone */
567
568	list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
569		node = find_config_node(rv, memb->nodeid);
570		if (node && !node->new)
571			continue;
572
573		if (!node) {
574			log_rinfo(ls, "remove member %d", memb->nodeid);
575		} else {
576			/* removed and re-added */
577			log_rinfo(ls, "remove member %d comm_seq %u %u",
578				  memb->nodeid, memb->comm_seq, node->comm_seq);
579		}
580
581		neg++;
582		list_move(&memb->list, &ls->ls_nodes_gone);
583		remove_remote_member(memb->nodeid);
584		ls->ls_num_nodes--;
585		dlm_lsop_recover_slot(ls, memb);
586	}
587
588	/* add new members to ls_nodes */
589
590	for (i = 0; i < rv->nodes_count; i++) {
591		node = &rv->nodes[i];
592		if (dlm_is_member(ls, node->nodeid))
593			continue;
594		error = dlm_add_member(ls, node);
595		if (error)
596			return error;
597
598		log_rinfo(ls, "add member %d", node->nodeid);
599	}
600
601	list_for_each_entry(memb, &ls->ls_nodes, list) {
602		if (low == -1 || memb->nodeid < low)
603			low = memb->nodeid;
604	}
605	ls->ls_low_nodeid = low;
606
607	make_member_array(ls);
608	*neg_out = neg;
609
610	error = ping_members(ls, rv->seq);
611	log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
612	return error;
613}
614
615/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
616   dlm_ls_start() is called on any of them to start the new recovery. */
617
618int dlm_ls_stop(struct dlm_ls *ls)
619{
620	int new;
621
622	/*
623	 * Prevent dlm_recv from being in the middle of something when we do
624	 * the stop.  This includes ensuring dlm_recv isn't processing a
625	 * recovery message (rcom), while dlm_recoverd is aborting and
626	 * resetting things from an in-progress recovery.  i.e. we want
627	 * dlm_recoverd to abort its recovery without worrying about dlm_recv
628	 * processing an rcom at the same time.  Stopping dlm_recv also makes
629	 * it easy for dlm_receive_message() to check locking stopped and add a
630	 * message to the requestqueue without races.
631	 */
632
633	down_write(&ls->ls_recv_active);
634
635	/*
636	 * Abort any recovery that's in progress (see RECOVER_STOP,
637	 * dlm_recovery_stopped()) and tell any other threads running in the
638	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
639	 */
640
641	spin_lock(&ls->ls_recover_lock);
642	set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
643	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
 
 
644	ls->ls_recover_seq++;
645	spin_unlock(&ls->ls_recover_lock);
 
 
 
 
 
646
647	/*
648	 * Let dlm_recv run again, now any normal messages will be saved on the
649	 * requestqueue for later.
650	 */
651
652	up_write(&ls->ls_recv_active);
653
654	/*
655	 * This in_recovery lock does two things:
656	 * 1) Keeps this function from returning until all threads are out
657	 *    of locking routines and locking is truly stopped.
658	 * 2) Keeps any new requests from being processed until it's unlocked
659	 *    when recovery is complete.
660	 */
661
662	if (new) {
663		set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
664		wake_up_process(ls->ls_recoverd_task);
665		wait_event(ls->ls_recover_lock_wait,
666			   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
667	}
668
669	/*
670	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
671	 * running) has noticed RECOVER_STOP above and quit processing the
672	 * previous recovery.
673	 */
674
675	dlm_recoverd_suspend(ls);
676
677	spin_lock(&ls->ls_recover_lock);
678	kfree(ls->ls_slots);
679	ls->ls_slots = NULL;
680	ls->ls_num_slots = 0;
681	ls->ls_slots_size = 0;
682	ls->ls_recover_status = 0;
683	spin_unlock(&ls->ls_recover_lock);
684
685	dlm_recoverd_resume(ls);
686
687	if (!ls->ls_recover_begin)
688		ls->ls_recover_begin = jiffies;
689
690	/* call recover_prep ops only once and not multiple times
691	 * for each possible dlm_ls_stop() when recovery is already
692	 * stopped.
693	 *
694	 * If we successful was able to clear LSFL_RUNNING bit and
695	 * it was set we know it is the first dlm_ls_stop() call.
696	 */
697	if (new)
698		dlm_lsop_recover_prep(ls);
699
700	return 0;
701}
702
703int dlm_ls_start(struct dlm_ls *ls)
704{
705	struct dlm_recover *rv, *rv_old;
706	struct dlm_config_node *nodes = NULL;
707	int error, count;
708
709	rv = kzalloc(sizeof(*rv), GFP_NOFS);
710	if (!rv)
711		return -ENOMEM;
712
713	error = dlm_config_nodes(ls->ls_name, &nodes, &count);
714	if (error < 0)
715		goto fail_rv;
716
717	spin_lock(&ls->ls_recover_lock);
718
719	/* the lockspace needs to be stopped before it can be started */
720
721	if (!dlm_locking_stopped(ls)) {
722		spin_unlock(&ls->ls_recover_lock);
723		log_error(ls, "start ignored: lockspace running");
724		error = -EINVAL;
725		goto fail;
726	}
727
728	rv->nodes = nodes;
729	rv->nodes_count = count;
730	rv->seq = ++ls->ls_recover_seq;
731	rv_old = ls->ls_recover_args;
732	ls->ls_recover_args = rv;
733	spin_unlock(&ls->ls_recover_lock);
734
735	if (rv_old) {
736		log_error(ls, "unused recovery %llx %d",
737			  (unsigned long long)rv_old->seq, rv_old->nodes_count);
738		kfree(rv_old->nodes);
739		kfree(rv_old);
740	}
741
742	set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
743	wake_up_process(ls->ls_recoverd_task);
744	return 0;
745
746 fail:
747	kfree(nodes);
748 fail_rv:
749	kfree(rv);
750	return error;
751}
752
v6.13.7
  1// SPDX-License-Identifier: GPL-2.0-only
  2/******************************************************************************
  3*******************************************************************************
  4**
  5**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
  6**
  7**
  8*******************************************************************************
  9******************************************************************************/
 10
 11#include "dlm_internal.h"
 12#include "lockspace.h"
 13#include "member.h"
 14#include "recoverd.h"
 15#include "recover.h"
 16#include "rcom.h"
 17#include "config.h"
 18#include "midcomms.h"
 19#include "lowcomms.h"
 20
 21int dlm_slots_version(const struct dlm_header *h)
 22{
 23	if ((le32_to_cpu(h->h_version) & 0x0000FFFF) < DLM_HEADER_SLOTS)
 24		return 0;
 25	return 1;
 26}
 27
 28void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
 29		   struct dlm_member *memb)
 30{
 31	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
 32
 33	if (!dlm_slots_version(&rc->rc_header))
 34		return;
 35
 36	memb->slot = le16_to_cpu(rf->rf_our_slot);
 37	memb->generation = le32_to_cpu(rf->rf_generation);
 38}
 39
 40void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
 41{
 42	struct dlm_slot *slot;
 43	struct rcom_slot *ro;
 44	int i;
 45
 46	ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
 47
 48	/* ls_slots array is sparse, but not rcom_slots */
 49
 50	for (i = 0; i < ls->ls_slots_size; i++) {
 51		slot = &ls->ls_slots[i];
 52		if (!slot->nodeid)
 53			continue;
 54		ro->ro_nodeid = cpu_to_le32(slot->nodeid);
 55		ro->ro_slot = cpu_to_le16(slot->slot);
 56		ro++;
 57	}
 58}
 59
 60#define SLOT_DEBUG_LINE 128
 61
 62static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
 63		      struct rcom_slot *ro0, struct dlm_slot *array,
 64		      int array_size)
 65{
 66	char line[SLOT_DEBUG_LINE];
 67	int len = SLOT_DEBUG_LINE - 1;
 68	int pos = 0;
 69	int ret, i;
 70
 71	memset(line, 0, sizeof(line));
 72
 73	if (array) {
 74		for (i = 0; i < array_size; i++) {
 75			if (!array[i].nodeid)
 76				continue;
 77
 78			ret = snprintf(line + pos, len - pos, " %d:%d",
 79				       array[i].slot, array[i].nodeid);
 80			if (ret >= len - pos)
 81				break;
 82			pos += ret;
 83		}
 84	} else if (ro0) {
 85		for (i = 0; i < num_slots; i++) {
 86			ret = snprintf(line + pos, len - pos, " %d:%d",
 87				       ro0[i].ro_slot, ro0[i].ro_nodeid);
 88			if (ret >= len - pos)
 89				break;
 90			pos += ret;
 91		}
 92	}
 93
 94	log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line);
 95}
 96
 97int dlm_slots_copy_in(struct dlm_ls *ls)
 98{
 99	struct dlm_member *memb;
100	struct dlm_rcom *rc = ls->ls_recover_buf;
101	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
102	struct rcom_slot *ro0, *ro;
103	int our_nodeid = dlm_our_nodeid();
104	int i, num_slots;
105	uint32_t gen;
106
107	if (!dlm_slots_version(&rc->rc_header))
108		return -1;
109
110	gen = le32_to_cpu(rf->rf_generation);
111	if (gen <= ls->ls_generation) {
112		log_error(ls, "dlm_slots_copy_in gen %u old %u",
113			  gen, ls->ls_generation);
114	}
115	ls->ls_generation = gen;
116
117	num_slots = le16_to_cpu(rf->rf_num_slots);
118	if (!num_slots)
119		return -1;
120
121	ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
122
123	log_slots(ls, gen, num_slots, ro0, NULL, 0);
124
125	list_for_each_entry(memb, &ls->ls_nodes, list) {
126		for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
127			if (le32_to_cpu(ro->ro_nodeid) != memb->nodeid)
128				continue;
129			memb->slot = le16_to_cpu(ro->ro_slot);
130			memb->slot_prev = memb->slot;
131			break;
132		}
133
134		if (memb->nodeid == our_nodeid) {
135			if (ls->ls_slot && ls->ls_slot != memb->slot) {
136				log_error(ls, "dlm_slots_copy_in our slot "
137					  "changed %d %d", ls->ls_slot,
138					  memb->slot);
139				return -1;
140			}
141
142			if (!ls->ls_slot)
143				ls->ls_slot = memb->slot;
144		}
145
146		if (!memb->slot) {
147			log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
148				   memb->nodeid);
149			return -1;
150		}
151	}
152
153	return 0;
154}
155
156/* for any nodes that do not support slots, we will not have set memb->slot
157   in wait_status_all(), so memb->slot will remain -1, and we will not
158   assign slots or set ls_num_slots here */
159
160int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
161		     struct dlm_slot **slots_out, uint32_t *gen_out)
162{
163	struct dlm_member *memb;
164	struct dlm_slot *array;
165	int our_nodeid = dlm_our_nodeid();
166	int array_size, max_slots, i;
167	int need = 0;
168	int max = 0;
169	int num = 0;
170	uint32_t gen = 0;
171
172	/* our own memb struct will have slot -1 gen 0 */
173
174	list_for_each_entry(memb, &ls->ls_nodes, list) {
175		if (memb->nodeid == our_nodeid) {
176			memb->slot = ls->ls_slot;
177			memb->generation = ls->ls_generation;
178			break;
179		}
180	}
181
182	list_for_each_entry(memb, &ls->ls_nodes, list) {
183		if (memb->generation > gen)
184			gen = memb->generation;
185
186		/* node doesn't support slots */
187
188		if (memb->slot == -1)
189			return -1;
190
191		/* node needs a slot assigned */
192
193		if (!memb->slot)
194			need++;
195
196		/* node has a slot assigned */
197
198		num++;
199
200		if (!max || max < memb->slot)
201			max = memb->slot;
202
203		/* sanity check, once slot is assigned it shouldn't change */
204
205		if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
206			log_error(ls, "nodeid %d slot changed %d %d",
207				  memb->nodeid, memb->slot_prev, memb->slot);
208			return -1;
209		}
210		memb->slot_prev = memb->slot;
211	}
212
213	array_size = max + need;
214	array = kcalloc(array_size, sizeof(*array), GFP_NOFS);
215	if (!array)
216		return -ENOMEM;
217
218	num = 0;
219
220	/* fill in slots (offsets) that are used */
221
222	list_for_each_entry(memb, &ls->ls_nodes, list) {
223		if (!memb->slot)
224			continue;
225
226		if (memb->slot > array_size) {
227			log_error(ls, "invalid slot number %d", memb->slot);
228			kfree(array);
229			return -1;
230		}
231
232		array[memb->slot - 1].nodeid = memb->nodeid;
233		array[memb->slot - 1].slot = memb->slot;
234		num++;
235	}
236
237	/* assign new slots from unused offsets */
238
239	list_for_each_entry(memb, &ls->ls_nodes, list) {
240		if (memb->slot)
241			continue;
242
243		for (i = 0; i < array_size; i++) {
244			if (array[i].nodeid)
245				continue;
246
247			memb->slot = i + 1;
248			memb->slot_prev = memb->slot;
249			array[i].nodeid = memb->nodeid;
250			array[i].slot = memb->slot;
251			num++;
252
253			if (!ls->ls_slot && memb->nodeid == our_nodeid)
254				ls->ls_slot = memb->slot;
255			break;
256		}
257
258		if (!memb->slot) {
259			log_error(ls, "no free slot found");
260			kfree(array);
261			return -1;
262		}
263	}
264
265	gen++;
266
267	log_slots(ls, gen, num, NULL, array, array_size);
268
269	max_slots = (DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom) -
270		     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);
271
272	if (num > max_slots) {
273		log_error(ls, "num_slots %d exceeds max_slots %d",
274			  num, max_slots);
275		kfree(array);
276		return -1;
277	}
278
279	*gen_out = gen;
280	*slots_out = array;
281	*slots_size = array_size;
282	*num_slots = num;
283	return 0;
284}
285
286static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
287{
288	struct dlm_member *memb = NULL;
289	struct list_head *tmp;
290	struct list_head *newlist = &new->list;
291	struct list_head *head = &ls->ls_nodes;
292
293	list_for_each(tmp, head) {
294		memb = list_entry(tmp, struct dlm_member, list);
295		if (new->nodeid < memb->nodeid)
296			break;
297	}
298
299	if (!memb)
300		list_add_tail(newlist, head);
301	else {
302		/* FIXME: can use list macro here */
303		newlist->prev = tmp->prev;
304		newlist->next = tmp;
305		tmp->prev->next = newlist;
306		tmp->prev = newlist;
307	}
308}
309
310static int add_remote_member(int nodeid)
311{
312	int error;
313
314	if (nodeid == dlm_our_nodeid())
315		return 0;
316
317	error = dlm_lowcomms_connect_node(nodeid);
318	if (error < 0)
319		return error;
320
321	dlm_midcomms_add_member(nodeid);
322	return 0;
323}
324
325static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
326{
327	struct dlm_member *memb;
328	int error;
329
330	memb = kzalloc(sizeof(*memb), GFP_NOFS);
331	if (!memb)
332		return -ENOMEM;
333
334	memb->nodeid = node->nodeid;
335	memb->weight = node->weight;
336	memb->comm_seq = node->comm_seq;
337
338	error = add_remote_member(node->nodeid);
339	if (error < 0) {
340		kfree(memb);
341		return error;
342	}
343
344	add_ordered_member(ls, memb);
345	ls->ls_num_nodes++;
346	return 0;
347}
348
349static struct dlm_member *find_memb(struct list_head *head, int nodeid)
350{
351	struct dlm_member *memb;
352
353	list_for_each_entry(memb, head, list) {
354		if (memb->nodeid == nodeid)
355			return memb;
356	}
357	return NULL;
358}
359
360int dlm_is_member(struct dlm_ls *ls, int nodeid)
361{
362	if (find_memb(&ls->ls_nodes, nodeid))
363		return 1;
364	return 0;
365}
366
367int dlm_is_removed(struct dlm_ls *ls, int nodeid)
368{
369	WARN_ON_ONCE(!nodeid || nodeid == -1);
370
371	if (find_memb(&ls->ls_nodes_gone, nodeid))
372		return 1;
373	return 0;
374}
375
376static void clear_memb_list(struct list_head *head,
377			    void (*after_del)(int nodeid))
378{
379	struct dlm_member *memb;
380
381	while (!list_empty(head)) {
382		memb = list_entry(head->next, struct dlm_member, list);
383		list_del(&memb->list);
384		if (after_del)
385			after_del(memb->nodeid);
386		kfree(memb);
387	}
388}
389
390static void remove_remote_member(int nodeid)
391{
392	if (nodeid == dlm_our_nodeid())
393		return;
394
395	dlm_midcomms_remove_member(nodeid);
396}
397
398void dlm_clear_members(struct dlm_ls *ls)
399{
400	clear_memb_list(&ls->ls_nodes, remove_remote_member);
401	ls->ls_num_nodes = 0;
402}
403
404void dlm_clear_members_gone(struct dlm_ls *ls)
405{
406	clear_memb_list(&ls->ls_nodes_gone, NULL);
407}
408
409static void make_member_array(struct dlm_ls *ls)
410{
411	struct dlm_member *memb;
412	int i, w, x = 0, total = 0, all_zero = 0, *array;
413
414	kfree(ls->ls_node_array);
415	ls->ls_node_array = NULL;
416
417	list_for_each_entry(memb, &ls->ls_nodes, list) {
418		if (memb->weight)
419			total += memb->weight;
420	}
421
422	/* all nodes revert to weight of 1 if all have weight 0 */
423
424	if (!total) {
425		total = ls->ls_num_nodes;
426		all_zero = 1;
427	}
428
429	ls->ls_total_weight = total;
430	array = kmalloc_array(total, sizeof(*array), GFP_NOFS);
431	if (!array)
432		return;
433
434	list_for_each_entry(memb, &ls->ls_nodes, list) {
435		if (!all_zero && !memb->weight)
436			continue;
437
438		if (all_zero)
439			w = 1;
440		else
441			w = memb->weight;
442
443		DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
444
445		for (i = 0; i < w; i++)
446			array[x++] = memb->nodeid;
447	}
448
449	ls->ls_node_array = array;
450}
451
452/* send a status request to all members just to establish comms connections */
453
454static int ping_members(struct dlm_ls *ls, uint64_t seq)
455{
456	struct dlm_member *memb;
457	int error = 0;
458
459	list_for_each_entry(memb, &ls->ls_nodes, list) {
460		if (dlm_recovery_stopped(ls)) {
461			error = -EINTR;
462			break;
463		}
464		error = dlm_rcom_status(ls, memb->nodeid, 0, seq);
465		if (error)
466			break;
467	}
468	if (error)
469		log_rinfo(ls, "ping_members aborted %d last nodeid %d",
470			  error, ls->ls_recover_nodeid);
471	return error;
472}
473
474static void dlm_lsop_recover_prep(struct dlm_ls *ls)
475{
476	if (!ls->ls_ops || !ls->ls_ops->recover_prep)
477		return;
478	ls->ls_ops->recover_prep(ls->ls_ops_arg);
479}
480
481static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
482{
483	struct dlm_slot slot;
484	uint32_t seq;
485	int error;
486
487	if (!ls->ls_ops || !ls->ls_ops->recover_slot)
488		return;
489
490	/* if there is no comms connection with this node
491	   or the present comms connection is newer
492	   than the one when this member was added, then
493	   we consider the node to have failed (versus
494	   being removed due to dlm_release_lockspace) */
495
496	error = dlm_comm_seq(memb->nodeid, &seq, false);
497
498	if (!error && seq == memb->comm_seq)
499		return;
500
501	slot.nodeid = memb->nodeid;
502	slot.slot = memb->slot;
503
504	ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
505}
506
507void dlm_lsop_recover_done(struct dlm_ls *ls)
508{
509	struct dlm_member *memb;
510	struct dlm_slot *slots;
511	int i, num;
512
513	if (!ls->ls_ops || !ls->ls_ops->recover_done)
514		return;
515
516	num = ls->ls_num_nodes;
517	slots = kcalloc(num, sizeof(*slots), GFP_KERNEL);
518	if (!slots)
519		return;
520
521	i = 0;
522	list_for_each_entry(memb, &ls->ls_nodes, list) {
523		if (i == num) {
524			log_error(ls, "dlm_lsop_recover_done bad num %d", num);
525			goto out;
526		}
527		slots[i].nodeid = memb->nodeid;
528		slots[i].slot = memb->slot;
529		i++;
530	}
531
532	ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
533				 ls->ls_slot, ls->ls_generation);
534 out:
535	kfree(slots);
536}
537
538static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
539						int nodeid)
540{
541	int i;
542
543	for (i = 0; i < rv->nodes_count; i++) {
544		if (rv->nodes[i].nodeid == nodeid)
545			return &rv->nodes[i];
546	}
547	return NULL;
548}
549
550int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
551{
552	struct dlm_member *memb, *safe;
553	struct dlm_config_node *node;
554	int i, error, neg = 0, low = -1;
555
556	/* previously removed members that we've not finished removing need to
557	 * count as a negative change so the "neg" recovery steps will happen
558	 *
559	 * This functionality must report all member changes to lsops or
560	 * midcomms layer and must never return before.
561	 */
562
563	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
564		log_rinfo(ls, "prev removed member %d", memb->nodeid);
565		neg++;
566	}
567
568	/* move departed members from ls_nodes to ls_nodes_gone */
569
570	list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
571		node = find_config_node(rv, memb->nodeid);
572		if (node && !node->new)
573			continue;
574
575		if (!node) {
576			log_rinfo(ls, "remove member %d", memb->nodeid);
577		} else {
578			/* removed and re-added */
579			log_rinfo(ls, "remove member %d comm_seq %u %u",
580				  memb->nodeid, memb->comm_seq, node->comm_seq);
581		}
582
583		neg++;
584		list_move(&memb->list, &ls->ls_nodes_gone);
585		remove_remote_member(memb->nodeid);
586		ls->ls_num_nodes--;
587		dlm_lsop_recover_slot(ls, memb);
588	}
589
590	/* add new members to ls_nodes */
591
592	for (i = 0; i < rv->nodes_count; i++) {
593		node = &rv->nodes[i];
594		if (dlm_is_member(ls, node->nodeid))
595			continue;
596		error = dlm_add_member(ls, node);
597		if (error)
598			return error;
599
600		log_rinfo(ls, "add member %d", node->nodeid);
601	}
602
603	list_for_each_entry(memb, &ls->ls_nodes, list) {
604		if (low == -1 || memb->nodeid < low)
605			low = memb->nodeid;
606	}
607	ls->ls_low_nodeid = low;
608
609	make_member_array(ls);
610	*neg_out = neg;
611
612	error = ping_members(ls, rv->seq);
613	log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
614	return error;
615}
616
617/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
618   dlm_ls_start() is called on any of them to start the new recovery. */
619
620int dlm_ls_stop(struct dlm_ls *ls)
621{
622	int new;
623
624	/*
625	 * Prevent dlm_recv from being in the middle of something when we do
626	 * the stop.  This includes ensuring dlm_recv isn't processing a
627	 * recovery message (rcom), while dlm_recoverd is aborting and
628	 * resetting things from an in-progress recovery.  i.e. we want
629	 * dlm_recoverd to abort its recovery without worrying about dlm_recv
630	 * processing an rcom at the same time.  Stopping dlm_recv also makes
631	 * it easy for dlm_receive_message() to check locking stopped and add a
632	 * message to the requestqueue without races.
633	 */
634
635	write_lock_bh(&ls->ls_recv_active);
636
637	/*
638	 * Abort any recovery that's in progress (see RECOVER_STOP,
639	 * dlm_recovery_stopped()) and tell any other threads running in the
640	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
641	 */
642
643	spin_lock_bh(&ls->ls_recover_lock);
644	set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
645	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
646	if (new)
647		timer_delete_sync(&ls->ls_scan_timer);
648	ls->ls_recover_seq++;
649
650	/* activate requestqueue and stop processing */
651	write_lock_bh(&ls->ls_requestqueue_lock);
652	set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
653	write_unlock_bh(&ls->ls_requestqueue_lock);
654	spin_unlock_bh(&ls->ls_recover_lock);
655
656	/*
657	 * Let dlm_recv run again, now any normal messages will be saved on the
658	 * requestqueue for later.
659	 */
660
661	write_unlock_bh(&ls->ls_recv_active);
662
663	/*
664	 * This in_recovery lock does two things:
665	 * 1) Keeps this function from returning until all threads are out
666	 *    of locking routines and locking is truly stopped.
667	 * 2) Keeps any new requests from being processed until it's unlocked
668	 *    when recovery is complete.
669	 */
670
671	if (new) {
672		set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
673		wake_up_process(ls->ls_recoverd_task);
674		wait_event(ls->ls_recover_lock_wait,
675			   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
676	}
677
678	/*
679	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
680	 * running) has noticed RECOVER_STOP above and quit processing the
681	 * previous recovery.
682	 */
683
684	dlm_recoverd_suspend(ls);
685
686	spin_lock_bh(&ls->ls_recover_lock);
687	kfree(ls->ls_slots);
688	ls->ls_slots = NULL;
689	ls->ls_num_slots = 0;
690	ls->ls_slots_size = 0;
691	ls->ls_recover_status = 0;
692	spin_unlock_bh(&ls->ls_recover_lock);
693
694	dlm_recoverd_resume(ls);
695
696	if (!ls->ls_recover_begin)
697		ls->ls_recover_begin = jiffies;
698
699	/* call recover_prep ops only once and not multiple times
700	 * for each possible dlm_ls_stop() when recovery is already
701	 * stopped.
702	 *
703	 * If we successful was able to clear LSFL_RUNNING bit and
704	 * it was set we know it is the first dlm_ls_stop() call.
705	 */
706	if (new)
707		dlm_lsop_recover_prep(ls);
708
709	return 0;
710}
711
712int dlm_ls_start(struct dlm_ls *ls)
713{
714	struct dlm_recover *rv, *rv_old;
715	struct dlm_config_node *nodes = NULL;
716	int error, count;
717
718	rv = kzalloc(sizeof(*rv), GFP_NOFS);
719	if (!rv)
720		return -ENOMEM;
721
722	error = dlm_config_nodes(ls->ls_name, &nodes, &count);
723	if (error < 0)
724		goto fail_rv;
725
726	spin_lock_bh(&ls->ls_recover_lock);
727
728	/* the lockspace needs to be stopped before it can be started */
729
730	if (!dlm_locking_stopped(ls)) {
731		spin_unlock_bh(&ls->ls_recover_lock);
732		log_error(ls, "start ignored: lockspace running");
733		error = -EINVAL;
734		goto fail;
735	}
736
737	rv->nodes = nodes;
738	rv->nodes_count = count;
739	rv->seq = ++ls->ls_recover_seq;
740	rv_old = ls->ls_recover_args;
741	ls->ls_recover_args = rv;
742	spin_unlock_bh(&ls->ls_recover_lock);
743
744	if (rv_old) {
745		log_error(ls, "unused recovery %llx %d",
746			  (unsigned long long)rv_old->seq, rv_old->nodes_count);
747		kfree(rv_old->nodes);
748		kfree(rv_old);
749	}
750
751	set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
752	wake_up_process(ls->ls_recoverd_task);
753	return 0;
754
755 fail:
756	kfree(nodes);
757 fail_rv:
758	kfree(rv);
759	return error;
760}
761