fs/dlm/member.c (v4.6)
  1/******************************************************************************
  2*******************************************************************************
  3**
  4**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
  5**
  6**  This copyrighted material is made available to anyone wishing to use,
  7**  modify, copy, or redistribute it subject to the terms and conditions
  8**  of the GNU General Public License v.2.
  9**
 10*******************************************************************************
 11******************************************************************************/
 12
 13#include "dlm_internal.h"
 14#include "lockspace.h"
 15#include "member.h"
 16#include "recoverd.h"
 17#include "recover.h"
 18#include "rcom.h"
 19#include "config.h"
 20#include "lowcomms.h"
 21
 22int dlm_slots_version(struct dlm_header *h)
 23{
 24	if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS)
 25		return 0;
 26	return 1;
 27}
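
dlm_slots_version() gates all slot handling below: the low 16 bits of h_version carry the minor protocol revision, and DLM_HEADER_SLOTS names the revision that added slot fields to the rcom payload. For illustration only (these are not the kernel's actual constants): if DLM_HEADER_SLOTS were 2, a peer advertising h_version 0x00030001 would fail the check and be treated as slotless, while 0x00030002 would pass. That is why dlm_slot_save() returns early for old peers and why dlm_slots_assign() bails out when it sees a member stuck at slot -1.
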
 28
 29void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
 30		   struct dlm_member *memb)
 31{
 32	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
 33
 34	if (!dlm_slots_version(&rc->rc_header))
 35		return;
 36
 37	memb->slot = le16_to_cpu(rf->rf_our_slot);
 38	memb->generation = le32_to_cpu(rf->rf_generation);
 39}
 40
 41void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
 42{
 43	struct dlm_slot *slot;
 44	struct rcom_slot *ro;
 45	int i;
 46
 47	ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
 48
 49	/* ls_slots array is sparse, but not rcom_slots */
 50
 51	for (i = 0; i < ls->ls_slots_size; i++) {
 52		slot = &ls->ls_slots[i];
 53		if (!slot->nodeid)
 54			continue;
 55		ro->ro_nodeid = cpu_to_le32(slot->nodeid);
 56		ro->ro_slot = cpu_to_le16(slot->slot);
 57		ro++;
 58	}
 59}
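
dlm_slots_copy_out() and dlm_slots_copy_in() below assume the same wire layout inside rc->rc_buf: a struct rcom_config header followed by a packed array of struct rcom_slot entries, all little-endian. A sketch of the layout as this file uses it (field offsets are per those structures, not spelled out here):

	rc->rc_buf: [ struct rcom_config (rf_*) ][ rcom_slot 0 ][ rcom_slot 1 ] ...
	                                          ro_nodeid     ro_nodeid
	                                          ro_slot       ro_slot

Note the asymmetry: ls_slots is indexed by slot number and may contain holes (nodeid 0), while the wire array packs only the occupied entries.
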
 60
 61#define SLOT_DEBUG_LINE 128
 62
 63static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
 64		      struct rcom_slot *ro0, struct dlm_slot *array,
 65		      int array_size)
 66{
 67	char line[SLOT_DEBUG_LINE];
 68	int len = SLOT_DEBUG_LINE - 1;
 69	int pos = 0;
 70	int ret, i;
 71
 72	memset(line, 0, sizeof(line));
 73
 74	if (array) {
 75		for (i = 0; i < array_size; i++) {
 76			if (!array[i].nodeid)
 77				continue;
 78
 79			ret = snprintf(line + pos, len - pos, " %d:%d",
 80				       array[i].slot, array[i].nodeid);
 81			if (ret >= len - pos)
 82				break;
 83			pos += ret;
 84		}
 85	} else if (ro0) {
 86		for (i = 0; i < num_slots; i++) {
 87			ret = snprintf(line + pos, len - pos, " %d:%d",
 88				       ro0[i].ro_slot, ro0[i].ro_nodeid);
 89			if (ret >= len - pos)
 90				break;
 91			pos += ret;
 92		}
 93	}
 94
 95	log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line);
 96}
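
With an illustrative membership of nodeids 101-103 in slots 1-3 at generation 3, the format strings above produce a line like:

	generation 3 slots 3 1:101 2:102 3:103

where each pair is slot:nodeid. The snprintf return-value check stops appending once the 128-byte line fills up.
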
 97
 98int dlm_slots_copy_in(struct dlm_ls *ls)
 99{
100	struct dlm_member *memb;
101	struct dlm_rcom *rc = ls->ls_recover_buf;
102	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
103	struct rcom_slot *ro0, *ro;
104	int our_nodeid = dlm_our_nodeid();
105	int i, num_slots;
106	uint32_t gen;
107
108	if (!dlm_slots_version(&rc->rc_header))
109		return -1;
110
111	gen = le32_to_cpu(rf->rf_generation);
112	if (gen <= ls->ls_generation) {
113		log_error(ls, "dlm_slots_copy_in gen %u old %u",
114			  gen, ls->ls_generation);
115	}
116	ls->ls_generation = gen;
117
118	num_slots = le16_to_cpu(rf->rf_num_slots);
119	if (!num_slots)
120		return -1;
121
122	ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
123
124	for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
125		ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid);
126		ro->ro_slot = le16_to_cpu(ro->ro_slot);
127	}
128
129	log_slots(ls, gen, num_slots, ro0, NULL, 0);
130
131	list_for_each_entry(memb, &ls->ls_nodes, list) {
132		for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
133			if (ro->ro_nodeid != memb->nodeid)
134				continue;
135			memb->slot = ro->ro_slot;
136			memb->slot_prev = memb->slot;
137			break;
138		}
139
140		if (memb->nodeid == our_nodeid) {
141			if (ls->ls_slot && ls->ls_slot != memb->slot) {
142				log_error(ls, "dlm_slots_copy_in our slot "
143					  "changed %d %d", ls->ls_slot,
144					  memb->slot);
145				return -1;
146			}
147
148			if (!ls->ls_slot)
149				ls->ls_slot = memb->slot;
150		}
151
152		if (!memb->slot) {
153			log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
154				   memb->nodeid);
155			return -1;
156		}
157	}
158
159	return 0;
160}
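
Two details worth noting in dlm_slots_copy_in(): the ro_* fields are byte-swapped in place, so every later comparison (including the memb->nodeid match above) runs in host byte order; and a stale generation (gen <= ls_generation) is logged as an error but still adopted rather than rejected.
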
161
162/* for any nodes that do not support slots, we will not have set memb->slot
163   in wait_status_all(), so memb->slot will remain -1, and we will not
164   assign slots or set ls_num_slots here */
165
166int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
167		     struct dlm_slot **slots_out, uint32_t *gen_out)
168{
169	struct dlm_member *memb;
170	struct dlm_slot *array;
171	int our_nodeid = dlm_our_nodeid();
172	int array_size, max_slots, i;
173	int need = 0;
174	int max = 0;
175	int num = 0;
176	uint32_t gen = 0;
177
178	/* our own memb struct will have slot -1 gen 0 */
179
180	list_for_each_entry(memb, &ls->ls_nodes, list) {
181		if (memb->nodeid == our_nodeid) {
182			memb->slot = ls->ls_slot;
183			memb->generation = ls->ls_generation;
184			break;
185		}
186	}
187
188	list_for_each_entry(memb, &ls->ls_nodes, list) {
189		if (memb->generation > gen)
190			gen = memb->generation;
191
192		/* node doesn't support slots */
193
194		if (memb->slot == -1)
195			return -1;
196
197		/* node needs a slot assigned */
198
199		if (!memb->slot)
200			need++;
201
202		/* node has a slot assigned */
203
204		num++;
205
206		if (!max || max < memb->slot)
207			max = memb->slot;
208
209		/* sanity check, once slot is assigned it shouldn't change */
210
211		if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
212			log_error(ls, "nodeid %d slot changed %d %d",
213				  memb->nodeid, memb->slot_prev, memb->slot);
214			return -1;
215		}
216		memb->slot_prev = memb->slot;
217	}
218
219	array_size = max + need;
220
221	array = kzalloc(array_size * sizeof(struct dlm_slot), GFP_NOFS);
222	if (!array)
223		return -ENOMEM;
224
225	num = 0;
226
227	/* fill in slots (offsets) that are used */
228
229	list_for_each_entry(memb, &ls->ls_nodes, list) {
230		if (!memb->slot)
231			continue;
232
233		if (memb->slot > array_size) {
234			log_error(ls, "invalid slot number %d", memb->slot);
235			kfree(array);
236			return -1;
237		}
238
239		array[memb->slot - 1].nodeid = memb->nodeid;
240		array[memb->slot - 1].slot = memb->slot;
241		num++;
242	}
243
244	/* assign new slots from unused offsets */
245
246	list_for_each_entry(memb, &ls->ls_nodes, list) {
247		if (memb->slot)
248			continue;
249
250		for (i = 0; i < array_size; i++) {
251			if (array[i].nodeid)
252				continue;
253
254			memb->slot = i + 1;
255			memb->slot_prev = memb->slot;
256			array[i].nodeid = memb->nodeid;
257			array[i].slot = memb->slot;
258			num++;
259
260			if (!ls->ls_slot && memb->nodeid == our_nodeid)
261				ls->ls_slot = memb->slot;
262			break;
263		}
264
265		if (!memb->slot) {
266			log_error(ls, "no free slot found");
267			kfree(array);
268			return -1;
269		}
270	}
271
272	gen++;
273
274	log_slots(ls, gen, num, NULL, array, array_size);
275
276	max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) -
277		     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);
278
279	if (num > max_slots) {
280		log_error(ls, "num_slots %d exceeds max_slots %d",
281			  num, max_slots);
282		kfree(array);
283		return -1;
284	}
285
286	*gen_out = gen;
287	*slots_out = array;
288	*slots_size = array_size;
289	*num_slots = num;
290	return 0;
291}
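
The assignment policy above: keep every previously assigned slot, give each new member the lowest unused 1-based offset, and bound the result by how many rcom_slot entries fit in one rcom buffer after the dlm_rcom and rcom_config headers (max_slots). A hedged user-space model of just the offset logic; the struct, names, and sample membership are illustrative, not the kernel's:

	/* Hedged user-space model of the slot-assignment policy above.
	 * Struct and variable names are illustrative, not the kernel's. */
	#include <stdio.h>
	#include <stdlib.h>

	struct node {
		int nodeid;
		int slot;	/* 0 means "needs a slot", matching memb->slot */
	};

	static void assign_slots(struct node *nodes, int n)
	{
		int max = 0, need = 0, size, i, j;
		int *used;

		for (i = 0; i < n; i++) {
			if (!nodes[i].slot)
				need++;
			if (nodes[i].slot > max)
				max = nodes[i].slot;
		}

		size = max + need;	/* same sizing as array_size above */
		used = calloc(size, sizeof(int));
		if (!used)
			return;

		for (i = 0; i < n; i++)	/* keep previously assigned slots */
			if (nodes[i].slot)
				used[nodes[i].slot - 1] = nodes[i].nodeid;

		for (i = 0; i < n; i++) {	/* new members take lowest free offset */
			if (nodes[i].slot)
				continue;
			for (j = 0; j < size; j++) {
				if (used[j])
					continue;
				used[j] = nodes[i].nodeid;
				nodes[i].slot = j + 1;
				break;
			}
		}

		for (i = 0; i < n; i++)
			printf("node %d -> slot %d\n", nodes[i].nodeid, nodes[i].slot);
		free(used);
	}

	int main(void)
	{
		/* 102 kept slot 2 from the previous generation; 101 and 103 are new */
		struct node nodes[] = { {101, 0}, {102, 2}, {103, 0} };

		assign_slots(nodes, 3);
		return 0;
	}

Running it prints 101 -> 1, 102 -> 2, 103 -> 3: node 102 keeps its old slot and the new nodes fill the gaps. As in the kernel code, size = max + need can leave a trailing offset unused, which is harmless.
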
292
293static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
294{
295	struct dlm_member *memb = NULL;
296	struct list_head *tmp;
297	struct list_head *newlist = &new->list;
298	struct list_head *head = &ls->ls_nodes;
299
300	list_for_each(tmp, head) {
301		memb = list_entry(tmp, struct dlm_member, list);
302		if (new->nodeid < memb->nodeid)
303			break;
304	}
305
306	if (!memb)
307		list_add_tail(newlist, head);
308	else {
309		/* FIXME: can use list macro here */
310		newlist->prev = tmp->prev;
311		newlist->next = tmp;
312		tmp->prev->next = newlist;
313		tmp->prev = newlist;
314	}
315}
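
The FIXME above can likely be resolved with list_add_tail(), which inserts before the entry it is given; when the loop runs off the end, tmp is back at head, so the same call appends, covering the empty-list case too. An untested sketch of that cleanup:

	list_for_each(tmp, head) {
		memb = list_entry(tmp, struct dlm_member, list);
		if (new->nodeid < memb->nodeid)
			break;
	}
	list_add_tail(newlist, tmp);

Sorted insertion matters here: make_member_array() below walks this list to build ls_node_array, which plausibly must come out identical on every node for hash-based lookups over it to agree cluster-wide.
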
316
317static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
318{
319	struct dlm_member *memb;
320	int error;
321
322	memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
323	if (!memb)
324		return -ENOMEM;
325
326	error = dlm_lowcomms_connect_node(node->nodeid);
327	if (error < 0) {
328		kfree(memb);
329		return error;
330	}
331
332	memb->nodeid = node->nodeid;
333	memb->weight = node->weight;
334	memb->comm_seq = node->comm_seq;
335	add_ordered_member(ls, memb);
336	ls->ls_num_nodes++;
337	return 0;
338}
339
340static struct dlm_member *find_memb(struct list_head *head, int nodeid)
341{
342	struct dlm_member *memb;
343
344	list_for_each_entry(memb, head, list) {
345		if (memb->nodeid == nodeid)
346			return memb;
347	}
348	return NULL;
349}
350
351int dlm_is_member(struct dlm_ls *ls, int nodeid)
352{
353	if (find_memb(&ls->ls_nodes, nodeid))
354		return 1;
355	return 0;
356}
357
358int dlm_is_removed(struct dlm_ls *ls, int nodeid)
359{
360	if (find_memb(&ls->ls_nodes_gone, nodeid))
361		return 1;
362	return 0;
363}
364
365static void clear_memb_list(struct list_head *head)
366{
367	struct dlm_member *memb;
368
369	while (!list_empty(head)) {
370		memb = list_entry(head->next, struct dlm_member, list);
371		list_del(&memb->list);
372		kfree(memb);
373	}
374}
375
376void dlm_clear_members(struct dlm_ls *ls)
377{
378	clear_memb_list(&ls->ls_nodes);
379	ls->ls_num_nodes = 0;
380}
381
382void dlm_clear_members_gone(struct dlm_ls *ls)
383{
384	clear_memb_list(&ls->ls_nodes_gone);
385}
386
387static void make_member_array(struct dlm_ls *ls)
388{
389	struct dlm_member *memb;
390	int i, w, x = 0, total = 0, all_zero = 0, *array;
391
392	kfree(ls->ls_node_array);
393	ls->ls_node_array = NULL;
394
395	list_for_each_entry(memb, &ls->ls_nodes, list) {
396		if (memb->weight)
397			total += memb->weight;
398	}
399
400	/* all nodes revert to weight of 1 if all have weight 0 */
401
402	if (!total) {
403		total = ls->ls_num_nodes;
404		all_zero = 1;
405	}
406
407	ls->ls_total_weight = total;
408
409	array = kmalloc(sizeof(int) * total, GFP_NOFS);
410	if (!array)
411		return;
412
413	list_for_each_entry(memb, &ls->ls_nodes, list) {
414		if (!all_zero && !memb->weight)
415			continue;
416
417		if (all_zero)
418			w = 1;
419		else
420			w = memb->weight;
421
422		DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
423
424		for (i = 0; i < w; i++)
425			array[x++] = memb->nodeid;
426	}
427
428	ls->ls_node_array = array;
429}
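
make_member_array() expands weights into repeated nodeid entries. For example (illustrative values): members 101 with weight 2 and 102 with weight 1 yield ls_node_array = {101, 101, 102} and ls_total_weight = 3, so an index chosen in [0, total_weight) lands on 101 twice as often; the all_zero fallback gives every node one entry when no weights are configured. Note that the allocation-failure path leaves ls_node_array NULL without reporting an error.
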
430
431/* send a status request to all members just to establish comms connections */
432
433static int ping_members(struct dlm_ls *ls)
434{
435	struct dlm_member *memb;
436	int error = 0;
437
438	list_for_each_entry(memb, &ls->ls_nodes, list) {
439		error = dlm_recovery_stopped(ls);
440		if (error)
441			break;
442		error = dlm_rcom_status(ls, memb->nodeid, 0);
443		if (error)
444			break;
445	}
446	if (error)
447		log_rinfo(ls, "ping_members aborted %d last nodeid %d",
448			  error, ls->ls_recover_nodeid);
449	return error;
450}
451
452static void dlm_lsop_recover_prep(struct dlm_ls *ls)
453{
454	if (!ls->ls_ops || !ls->ls_ops->recover_prep)
455		return;
456	ls->ls_ops->recover_prep(ls->ls_ops_arg);
457}
458
459static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
460{
461	struct dlm_slot slot;
462	uint32_t seq;
463	int error;
464
465	if (!ls->ls_ops || !ls->ls_ops->recover_slot)
466		return;
467
468	/* if there is no comms connection with this node
469	   or the present comms connection is newer
470	   than the one when this member was added, then
471	   we consider the node to have failed (versus
472	   being removed due to dlm_release_lockspace) */
473
474	error = dlm_comm_seq(memb->nodeid, &seq);
475
476	if (!error && seq == memb->comm_seq)
477		return;
478
479	slot.nodeid = memb->nodeid;
480	slot.slot = memb->slot;
481
482	ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
483}
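
The decision implemented above, summarized:

	dlm_comm_seq() fails    -> no current connection: node failed
	seq != memb->comm_seq   -> connection re-established after this
	                           member joined: node failed
	seq == memb->comm_seq   -> same connection as at join time:
	                           clean leave, no callback

Only the failure cases reach the recover_slot callback, letting a lockspace user (such as a cluster filesystem) fence or replay state for the lost slot.
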
484
485void dlm_lsop_recover_done(struct dlm_ls *ls)
486{
487	struct dlm_member *memb;
488	struct dlm_slot *slots;
489	int i, num;
490
491	if (!ls->ls_ops || !ls->ls_ops->recover_done)
492		return;
493
494	num = ls->ls_num_nodes;
495
496	slots = kzalloc(num * sizeof(struct dlm_slot), GFP_KERNEL);
497	if (!slots)
498		return;
499
500	i = 0;
501	list_for_each_entry(memb, &ls->ls_nodes, list) {
502		if (i == num) {
503			log_error(ls, "dlm_lsop_recover_done bad num %d", num);
504			goto out;
505		}
506		slots[i].nodeid = memb->nodeid;
507		slots[i].slot = memb->slot;
508		i++;
509	}
510
511	ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
512				 ls->ls_slot, ls->ls_generation);
513 out:
514	kfree(slots);
515}
516
517static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
518						int nodeid)
519{
520	int i;
521
522	for (i = 0; i < rv->nodes_count; i++) {
523		if (rv->nodes[i].nodeid == nodeid)
524			return &rv->nodes[i];
525	}
526	return NULL;
527}
528
529int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
530{
531	struct dlm_member *memb, *safe;
532	struct dlm_config_node *node;
533	int i, error, neg = 0, low = -1;
534
535	/* previously removed members that we've not finished removing need to
536	   count as a negative change so the "neg" recovery steps will happen */
537
538	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
539		log_rinfo(ls, "prev removed member %d", memb->nodeid);
540		neg++;
541	}
542
543	/* move departed members from ls_nodes to ls_nodes_gone */
544
545	list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
546		node = find_config_node(rv, memb->nodeid);
547		if (node && !node->new)
548			continue;
549
550		if (!node) {
551			log_rinfo(ls, "remove member %d", memb->nodeid);
552		} else {
553			/* removed and re-added */
554			log_rinfo(ls, "remove member %d comm_seq %u %u",
555				  memb->nodeid, memb->comm_seq, node->comm_seq);
556		}
557
558		neg++;
559		list_move(&memb->list, &ls->ls_nodes_gone);
560		ls->ls_num_nodes--;
561		dlm_lsop_recover_slot(ls, memb);
562	}
563
564	/* add new members to ls_nodes */
565
566	for (i = 0; i < rv->nodes_count; i++) {
567		node = &rv->nodes[i];
568		if (dlm_is_member(ls, node->nodeid))
569			continue;
570		dlm_add_member(ls, node);
571		log_rinfo(ls, "add member %d", node->nodeid);
572	}
573
574	list_for_each_entry(memb, &ls->ls_nodes, list) {
575		if (low == -1 || memb->nodeid < low)
576			low = memb->nodeid;
577	}
578	ls->ls_low_nodeid = low;
579
580	make_member_array(ls);
581	*neg_out = neg;
582
583	error = ping_members(ls);
584	if (!error || error == -EPROTO) {
585		/* new_lockspace() may be waiting to know if the config
586		   is good or bad */
587		ls->ls_members_result = error;
588		complete(&ls->ls_members_done);
589	}
590
591	log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
592	return error;
593}
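
A nonzero *neg_out tells dlm_recoverd that members departed, which enables the additional "neg" recovery steps (such as re-mastering resources the departed nodes owned); ping_members() doubles as both an early comms check and an abort point, since dlm_recovery_stopped() is consulted before each status rcom.
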
594
595/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
596   dlm_ls_start() is called on any of them to start the new recovery. */
597
598int dlm_ls_stop(struct dlm_ls *ls)
599{
600	int new;
601
602	/*
603	 * Prevent dlm_recv from being in the middle of something when we do
604	 * the stop.  This includes ensuring dlm_recv isn't processing a
605	 * recovery message (rcom), while dlm_recoverd is aborting and
606	 * resetting things from an in-progress recovery.  i.e. we want
607	 * dlm_recoverd to abort its recovery without worrying about dlm_recv
608	 * processing an rcom at the same time.  Stopping dlm_recv also makes
609	 * it easy for dlm_receive_message() to check locking stopped and add a
610	 * message to the requestqueue without races.
611	 */
612
613	down_write(&ls->ls_recv_active);
614
615	/*
616	 * Abort any recovery that's in progress (see RECOVER_STOP,
617	 * dlm_recovery_stopped()) and tell any other threads running in the
618	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
619	 */
620
621	spin_lock(&ls->ls_recover_lock);
622	set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
623	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
624	ls->ls_recover_seq++;
625	spin_unlock(&ls->ls_recover_lock);
626
627	/*
628	 * Let dlm_recv run again, now any normal messages will be saved on the
629	 * requestqueue for later.
630	 */
631
632	up_write(&ls->ls_recv_active);
633
634	/*
635	 * This in_recovery lock does two things:
636	 * 1) Keeps this function from returning until all threads are out
637	 *    of locking routines and locking is truly stopped.
638	 * 2) Keeps any new requests from being processed until it's unlocked
639	 *    when recovery is complete.
640	 */
641
642	if (new) {
643		set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
644		wake_up_process(ls->ls_recoverd_task);
645		wait_event(ls->ls_recover_lock_wait,
646			   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
647	}
648
649	/*
650	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
651	 * running) has noticed RECOVER_STOP above and quit processing the
652	 * previous recovery.
653	 */
654
655	dlm_recoverd_suspend(ls);
656
657	spin_lock(&ls->ls_recover_lock);
658	kfree(ls->ls_slots);
659	ls->ls_slots = NULL;
660	ls->ls_num_slots = 0;
661	ls->ls_slots_size = 0;
662	ls->ls_recover_status = 0;
663	spin_unlock(&ls->ls_recover_lock);
664
665	dlm_recoverd_resume(ls);
666
667	if (!ls->ls_recover_begin)
668		ls->ls_recover_begin = jiffies;
669
670	dlm_lsop_recover_prep(ls);
671	return 0;
672}
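
Stop and start are driven from userspace (normally dlm_controld), which is what the comment before dlm_ls_stop() relies on: every node must finish stopping before any node starts. In this v4.6 code the stop path parks dlm_recoverd through the LSFL_RECOVER_DOWN / LSFL_RECOVER_LOCK handshake; the v3.5.6 version below takes the ls_in_recovery rwsem directly instead.
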
673
674int dlm_ls_start(struct dlm_ls *ls)
675{
676	struct dlm_recover *rv = NULL, *rv_old;
677	struct dlm_config_node *nodes;
678	int error, count;
679
680	rv = kzalloc(sizeof(struct dlm_recover), GFP_NOFS);
681	if (!rv)
682		return -ENOMEM;
683
684	error = dlm_config_nodes(ls->ls_name, &nodes, &count);
685	if (error < 0)
686		goto fail;
687
688	spin_lock(&ls->ls_recover_lock);
689
690	/* the lockspace needs to be stopped before it can be started */
691
692	if (!dlm_locking_stopped(ls)) {
693		spin_unlock(&ls->ls_recover_lock);
694		log_error(ls, "start ignored: lockspace running");
695		error = -EINVAL;
696		goto fail;
697	}
698
699	rv->nodes = nodes;
700	rv->nodes_count = count;
701	rv->seq = ++ls->ls_recover_seq;
702	rv_old = ls->ls_recover_args;
703	ls->ls_recover_args = rv;
704	spin_unlock(&ls->ls_recover_lock);
705
706	if (rv_old) {
707		log_error(ls, "unused recovery %llx %d",
708			  (unsigned long long)rv_old->seq, rv_old->nodes_count);
709		kfree(rv_old->nodes);
710		kfree(rv_old);
711	}
712
713	set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
714	wake_up_process(ls->ls_recoverd_task);
715	return 0;
716
717 fail:
718	kfree(rv);
719	kfree(nodes);
720	return error;
721}
722
fs/dlm/member.c (v3.5.6)
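
The v3.5.6 version of the file follows for comparison. The differences from v4.6 above are small: log_slots()/log_rinfo() were still log_debug_slots()/log_debug() behind the ci_log_debug config check, the stop flag was named LSFL_RECOVERY_STOP rather than LSFL_RECOVER_STOP, dlm_ls_stop() took the ls_in_recovery rwsem instead of the RECOVER_DOWN/RECOVER_LOCK handshake, and dlm_ls_start() woke the recovery thread via dlm_recoverd_kick() rather than LSFL_RECOVER_WORK plus wake_up_process().
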
  1/******************************************************************************
  2*******************************************************************************
  3**
  4**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
  5**
  6**  This copyrighted material is made available to anyone wishing to use,
  7**  modify, copy, or redistribute it subject to the terms and conditions
  8**  of the GNU General Public License v.2.
  9**
 10*******************************************************************************
 11******************************************************************************/
 12
 13#include "dlm_internal.h"
 14#include "lockspace.h"
 15#include "member.h"
 16#include "recoverd.h"
 17#include "recover.h"
 18#include "rcom.h"
 19#include "config.h"
 20#include "lowcomms.h"
 21
 22int dlm_slots_version(struct dlm_header *h)
 23{
 24	if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS)
 25		return 0;
 26	return 1;
 27}
 28
 29void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
 30		   struct dlm_member *memb)
 31{
 32	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
 33
 34	if (!dlm_slots_version(&rc->rc_header))
 35		return;
 36
 37	memb->slot = le16_to_cpu(rf->rf_our_slot);
 38	memb->generation = le32_to_cpu(rf->rf_generation);
 39}
 40
 41void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
 42{
 43	struct dlm_slot *slot;
 44	struct rcom_slot *ro;
 45	int i;
 46
 47	ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
 48
 49	/* ls_slots array is sparse, but not rcom_slots */
 50
 51	for (i = 0; i < ls->ls_slots_size; i++) {
 52		slot = &ls->ls_slots[i];
 53		if (!slot->nodeid)
 54			continue;
 55		ro->ro_nodeid = cpu_to_le32(slot->nodeid);
 56		ro->ro_slot = cpu_to_le16(slot->slot);
 57		ro++;
 58	}
 59}
 60
 61#define SLOT_DEBUG_LINE 128
 62
 63static void log_debug_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
 64			    struct rcom_slot *ro0, struct dlm_slot *array,
 65			    int array_size)
 66{
 67	char line[SLOT_DEBUG_LINE];
 68	int len = SLOT_DEBUG_LINE - 1;
 69	int pos = 0;
 70	int ret, i;
 71
 72	if (!dlm_config.ci_log_debug)
 73		return;
 74
 75	memset(line, 0, sizeof(line));
 76
 77	if (array) {
 78		for (i = 0; i < array_size; i++) {
 79			if (!array[i].nodeid)
 80				continue;
 81
 82			ret = snprintf(line + pos, len - pos, " %d:%d",
 83				       array[i].slot, array[i].nodeid);
 84			if (ret >= len - pos)
 85				break;
 86			pos += ret;
 87		}
 88	} else if (ro0) {
 89		for (i = 0; i < num_slots; i++) {
 90			ret = snprintf(line + pos, len - pos, " %d:%d",
 91				       ro0[i].ro_slot, ro0[i].ro_nodeid);
 92			if (ret >= len - pos)
 93				break;
 94			pos += ret;
 95		}
 96	}
 97
 98	log_debug(ls, "generation %u slots %d%s", gen, num_slots, line);
 99}
100
101int dlm_slots_copy_in(struct dlm_ls *ls)
102{
103	struct dlm_member *memb;
104	struct dlm_rcom *rc = ls->ls_recover_buf;
105	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
106	struct rcom_slot *ro0, *ro;
107	int our_nodeid = dlm_our_nodeid();
108	int i, num_slots;
109	uint32_t gen;
110
111	if (!dlm_slots_version(&rc->rc_header))
112		return -1;
113
114	gen = le32_to_cpu(rf->rf_generation);
115	if (gen <= ls->ls_generation) {
116		log_error(ls, "dlm_slots_copy_in gen %u old %u",
117			  gen, ls->ls_generation);
118	}
119	ls->ls_generation = gen;
120
121	num_slots = le16_to_cpu(rf->rf_num_slots);
122	if (!num_slots)
123		return -1;
124
125	ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
126
127	for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
128		ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid);
129		ro->ro_slot = le16_to_cpu(ro->ro_slot);
130	}
131
132	log_debug_slots(ls, gen, num_slots, ro0, NULL, 0);
133
134	list_for_each_entry(memb, &ls->ls_nodes, list) {
135		for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
136			if (ro->ro_nodeid != memb->nodeid)
137				continue;
138			memb->slot = ro->ro_slot;
139			memb->slot_prev = memb->slot;
140			break;
141		}
142
143		if (memb->nodeid == our_nodeid) {
144			if (ls->ls_slot && ls->ls_slot != memb->slot) {
145				log_error(ls, "dlm_slots_copy_in our slot "
146					  "changed %d %d", ls->ls_slot,
147					  memb->slot);
148				return -1;
149			}
150
151			if (!ls->ls_slot)
152				ls->ls_slot = memb->slot;
153		}
154
155		if (!memb->slot) {
156			log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
157				   memb->nodeid);
158			return -1;
159		}
160	}
161
162	return 0;
163}
164
165/* for any nodes that do not support slots, we will not have set memb->slot
166   in wait_status_all(), so memb->slot will remain -1, and we will not
167   assign slots or set ls_num_slots here */
168
169int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
170		     struct dlm_slot **slots_out, uint32_t *gen_out)
171{
172	struct dlm_member *memb;
173	struct dlm_slot *array;
174	int our_nodeid = dlm_our_nodeid();
175	int array_size, max_slots, i;
176	int need = 0;
177	int max = 0;
178	int num = 0;
179	uint32_t gen = 0;
180
181	/* our own memb struct will have slot -1 gen 0 */
182
183	list_for_each_entry(memb, &ls->ls_nodes, list) {
184		if (memb->nodeid == our_nodeid) {
185			memb->slot = ls->ls_slot;
186			memb->generation = ls->ls_generation;
187			break;
188		}
189	}
190
191	list_for_each_entry(memb, &ls->ls_nodes, list) {
192		if (memb->generation > gen)
193			gen = memb->generation;
194
195		/* node doesn't support slots */
196
197		if (memb->slot == -1)
198			return -1;
199
200		/* node needs a slot assigned */
201
202		if (!memb->slot)
203			need++;
204
205		/* node has a slot assigned */
206
207		num++;
208
209		if (!max || max < memb->slot)
210			max = memb->slot;
211
212		/* sanity check, once slot is assigned it shouldn't change */
213
214		if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
215			log_error(ls, "nodeid %d slot changed %d %d",
216				  memb->nodeid, memb->slot_prev, memb->slot);
217			return -1;
218		}
219		memb->slot_prev = memb->slot;
220	}
221
222	array_size = max + need;
223
224	array = kzalloc(array_size * sizeof(struct dlm_slot), GFP_NOFS);
225	if (!array)
226		return -ENOMEM;
227
228	num = 0;
229
230	/* fill in slots (offsets) that are used */
231
232	list_for_each_entry(memb, &ls->ls_nodes, list) {
233		if (!memb->slot)
234			continue;
235
236		if (memb->slot > array_size) {
237			log_error(ls, "invalid slot number %d", memb->slot);
238			kfree(array);
239			return -1;
240		}
241
242		array[memb->slot - 1].nodeid = memb->nodeid;
243		array[memb->slot - 1].slot = memb->slot;
244		num++;
245	}
246
247	/* assign new slots from unused offsets */
248
249	list_for_each_entry(memb, &ls->ls_nodes, list) {
250		if (memb->slot)
251			continue;
252
253		for (i = 0; i < array_size; i++) {
254			if (array[i].nodeid)
255				continue;
256
257			memb->slot = i + 1;
258			memb->slot_prev = memb->slot;
259			array[i].nodeid = memb->nodeid;
260			array[i].slot = memb->slot;
261			num++;
262
263			if (!ls->ls_slot && memb->nodeid == our_nodeid)
264				ls->ls_slot = memb->slot;
265			break;
266		}
267
268		if (!memb->slot) {
269			log_error(ls, "no free slot found");
270			kfree(array);
271			return -1;
272		}
273	}
274
275	gen++;
276
277	log_debug_slots(ls, gen, num, NULL, array, array_size);
278
279	max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) -
280		     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);
281
282	if (num > max_slots) {
283		log_error(ls, "num_slots %d exceeds max_slots %d",
284			  num, max_slots);
285		kfree(array);
286		return -1;
287	}
288
289	*gen_out = gen;
290	*slots_out = array;
291	*slots_size = array_size;
292	*num_slots = num;
293	return 0;
294}
295
296static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
297{
298	struct dlm_member *memb = NULL;
299	struct list_head *tmp;
300	struct list_head *newlist = &new->list;
301	struct list_head *head = &ls->ls_nodes;
302
303	list_for_each(tmp, head) {
304		memb = list_entry(tmp, struct dlm_member, list);
305		if (new->nodeid < memb->nodeid)
306			break;
307	}
308
309	if (!memb)
310		list_add_tail(newlist, head);
311	else {
312		/* FIXME: can use list macro here */
313		newlist->prev = tmp->prev;
314		newlist->next = tmp;
315		tmp->prev->next = newlist;
316		tmp->prev = newlist;
317	}
318}
319
320static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
321{
322	struct dlm_member *memb;
323	int error;
324
325	memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
326	if (!memb)
327		return -ENOMEM;
328
329	error = dlm_lowcomms_connect_node(node->nodeid);
330	if (error < 0) {
331		kfree(memb);
332		return error;
333	}
334
335	memb->nodeid = node->nodeid;
336	memb->weight = node->weight;
337	memb->comm_seq = node->comm_seq;
338	add_ordered_member(ls, memb);
339	ls->ls_num_nodes++;
340	return 0;
341}
342
343static struct dlm_member *find_memb(struct list_head *head, int nodeid)
344{
345	struct dlm_member *memb;
346
347	list_for_each_entry(memb, head, list) {
348		if (memb->nodeid == nodeid)
349			return memb;
350	}
351	return NULL;
352}
353
354int dlm_is_member(struct dlm_ls *ls, int nodeid)
355{
356	if (find_memb(&ls->ls_nodes, nodeid))
357		return 1;
358	return 0;
359}
360
361int dlm_is_removed(struct dlm_ls *ls, int nodeid)
362{
363	if (find_memb(&ls->ls_nodes_gone, nodeid))
364		return 1;
365	return 0;
366}
367
368static void clear_memb_list(struct list_head *head)
369{
370	struct dlm_member *memb;
371
372	while (!list_empty(head)) {
373		memb = list_entry(head->next, struct dlm_member, list);
374		list_del(&memb->list);
375		kfree(memb);
376	}
377}
378
379void dlm_clear_members(struct dlm_ls *ls)
380{
381	clear_memb_list(&ls->ls_nodes);
382	ls->ls_num_nodes = 0;
383}
384
385void dlm_clear_members_gone(struct dlm_ls *ls)
386{
387	clear_memb_list(&ls->ls_nodes_gone);
388}
389
390static void make_member_array(struct dlm_ls *ls)
391{
392	struct dlm_member *memb;
393	int i, w, x = 0, total = 0, all_zero = 0, *array;
394
395	kfree(ls->ls_node_array);
396	ls->ls_node_array = NULL;
397
398	list_for_each_entry(memb, &ls->ls_nodes, list) {
399		if (memb->weight)
400			total += memb->weight;
401	}
402
403	/* all nodes revert to weight of 1 if all have weight 0 */
404
405	if (!total) {
406		total = ls->ls_num_nodes;
407		all_zero = 1;
408	}
409
410	ls->ls_total_weight = total;
411
412	array = kmalloc(sizeof(int) * total, GFP_NOFS);
413	if (!array)
414		return;
415
416	list_for_each_entry(memb, &ls->ls_nodes, list) {
417		if (!all_zero && !memb->weight)
418			continue;
419
420		if (all_zero)
421			w = 1;
422		else
423			w = memb->weight;
424
425		DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
426
427		for (i = 0; i < w; i++)
428			array[x++] = memb->nodeid;
429	}
430
431	ls->ls_node_array = array;
432}
433
434/* send a status request to all members just to establish comms connections */
435
436static int ping_members(struct dlm_ls *ls)
437{
438	struct dlm_member *memb;
439	int error = 0;
440
441	list_for_each_entry(memb, &ls->ls_nodes, list) {
442		error = dlm_recovery_stopped(ls);
443		if (error)
444			break;
445		error = dlm_rcom_status(ls, memb->nodeid, 0);
446		if (error)
447			break;
448	}
449	if (error)
450		log_debug(ls, "ping_members aborted %d last nodeid %d",
451			  error, ls->ls_recover_nodeid);
452	return error;
453}
454
455static void dlm_lsop_recover_prep(struct dlm_ls *ls)
456{
457	if (!ls->ls_ops || !ls->ls_ops->recover_prep)
458		return;
459	ls->ls_ops->recover_prep(ls->ls_ops_arg);
460}
461
462static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
463{
464	struct dlm_slot slot;
465	uint32_t seq;
466	int error;
467
468	if (!ls->ls_ops || !ls->ls_ops->recover_slot)
469		return;
470
471	/* if there is no comms connection with this node
472	   or the present comms connection is newer
473	   than the one when this member was added, then
474	   we consider the node to have failed (versus
475	   being removed due to dlm_release_lockspace) */
476
477	error = dlm_comm_seq(memb->nodeid, &seq);
478
479	if (!error && seq == memb->comm_seq)
480		return;
481
482	slot.nodeid = memb->nodeid;
483	slot.slot = memb->slot;
484
485	ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
486}
487
488void dlm_lsop_recover_done(struct dlm_ls *ls)
489{
490	struct dlm_member *memb;
491	struct dlm_slot *slots;
492	int i, num;
493
494	if (!ls->ls_ops || !ls->ls_ops->recover_done)
495		return;
496
497	num = ls->ls_num_nodes;
498
499	slots = kzalloc(num * sizeof(struct dlm_slot), GFP_KERNEL);
500	if (!slots)
501		return;
502
503	i = 0;
504	list_for_each_entry(memb, &ls->ls_nodes, list) {
505		if (i == num) {
506			log_error(ls, "dlm_lsop_recover_done bad num %d", num);
507			goto out;
508		}
509		slots[i].nodeid = memb->nodeid;
510		slots[i].slot = memb->slot;
511		i++;
512	}
513
514	ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
515				 ls->ls_slot, ls->ls_generation);
516 out:
517	kfree(slots);
518}
519
520static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
521						int nodeid)
522{
523	int i;
524
525	for (i = 0; i < rv->nodes_count; i++) {
526		if (rv->nodes[i].nodeid == nodeid)
527			return &rv->nodes[i];
528	}
529	return NULL;
530}
531
532int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
533{
534	struct dlm_member *memb, *safe;
535	struct dlm_config_node *node;
536	int i, error, neg = 0, low = -1;
537
538	/* previously removed members that we've not finished removing need to
539	   count as a negative change so the "neg" recovery steps will happen */
540
541	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
542		log_debug(ls, "prev removed member %d", memb->nodeid);
543		neg++;
544	}
545
546	/* move departed members from ls_nodes to ls_nodes_gone */
547
548	list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
549		node = find_config_node(rv, memb->nodeid);
550		if (node && !node->new)
551			continue;
552
553		if (!node) {
554			log_debug(ls, "remove member %d", memb->nodeid);
555		} else {
556			/* removed and re-added */
557			log_debug(ls, "remove member %d comm_seq %u %u",
558				  memb->nodeid, memb->comm_seq, node->comm_seq);
559		}
560
561		neg++;
562		list_move(&memb->list, &ls->ls_nodes_gone);
563		ls->ls_num_nodes--;
564		dlm_lsop_recover_slot(ls, memb);
565	}
566
567	/* add new members to ls_nodes */
568
569	for (i = 0; i < rv->nodes_count; i++) {
570		node = &rv->nodes[i];
571		if (dlm_is_member(ls, node->nodeid))
572			continue;
573		dlm_add_member(ls, node);
574		log_debug(ls, "add member %d", node->nodeid);
575	}
576
577	list_for_each_entry(memb, &ls->ls_nodes, list) {
578		if (low == -1 || memb->nodeid < low)
579			low = memb->nodeid;
580	}
581	ls->ls_low_nodeid = low;
582
583	make_member_array(ls);
584	*neg_out = neg;
585
586	error = ping_members(ls);
587	if (!error || error == -EPROTO) {
588		/* new_lockspace() may be waiting to know if the config
589		   is good or bad */
590		ls->ls_members_result = error;
591		complete(&ls->ls_members_done);
592	}
593
594	log_debug(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
595	return error;
596}
597
598/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
599   dlm_ls_start() is called on any of them to start the new recovery. */
600
601int dlm_ls_stop(struct dlm_ls *ls)
602{
603	int new;
604
605	/*
606	 * Prevent dlm_recv from being in the middle of something when we do
607	 * the stop.  This includes ensuring dlm_recv isn't processing a
608	 * recovery message (rcom), while dlm_recoverd is aborting and
609	 * resetting things from an in-progress recovery.  i.e. we want
610	 * dlm_recoverd to abort its recovery without worrying about dlm_recv
611	 * processing an rcom at the same time.  Stopping dlm_recv also makes
612	 * it easy for dlm_receive_message() to check locking stopped and add a
613	 * message to the requestqueue without races.
614	 */
615
616	down_write(&ls->ls_recv_active);
617
618	/*
619	 * Abort any recovery that's in progress (see RECOVERY_STOP,
620	 * dlm_recovery_stopped()) and tell any other threads running in the
621	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
622	 */
623
624	spin_lock(&ls->ls_recover_lock);
625	set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
626	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
627	ls->ls_recover_seq++;
628	spin_unlock(&ls->ls_recover_lock);
629
630	/*
631	 * Let dlm_recv run again, now any normal messages will be saved on the
632	 * requestqueue for later.
633	 */
634
635	up_write(&ls->ls_recv_active);
636
637	/*
638	 * This in_recovery lock does two things:
639	 * 1) Keeps this function from returning until all threads are out
640	 *    of locking routines and locking is truly stopped.
641	 * 2) Keeps any new requests from being processed until it's unlocked
642	 *    when recovery is complete.
643	 */
644
645	if (new)
646		down_write(&ls->ls_in_recovery);
647
648	/*
649	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
650	 * running) has noticed RECOVERY_STOP above and quit processing the
651	 * previous recovery.
652	 */
653
654	dlm_recoverd_suspend(ls);
655
656	spin_lock(&ls->ls_recover_lock);
657	kfree(ls->ls_slots);
658	ls->ls_slots = NULL;
659	ls->ls_num_slots = 0;
660	ls->ls_slots_size = 0;
661	ls->ls_recover_status = 0;
662	spin_unlock(&ls->ls_recover_lock);
663
664	dlm_recoverd_resume(ls);
665
666	if (!ls->ls_recover_begin)
667		ls->ls_recover_begin = jiffies;
668
669	dlm_lsop_recover_prep(ls);
670	return 0;
671}
672
673int dlm_ls_start(struct dlm_ls *ls)
674{
675	struct dlm_recover *rv = NULL, *rv_old;
676	struct dlm_config_node *nodes;
677	int error, count;
678
679	rv = kzalloc(sizeof(struct dlm_recover), GFP_NOFS);
680	if (!rv)
681		return -ENOMEM;
682
683	error = dlm_config_nodes(ls->ls_name, &nodes, &count);
684	if (error < 0)
685		goto fail;
686
687	spin_lock(&ls->ls_recover_lock);
688
689	/* the lockspace needs to be stopped before it can be started */
690
691	if (!dlm_locking_stopped(ls)) {
692		spin_unlock(&ls->ls_recover_lock);
693		log_error(ls, "start ignored: lockspace running");
694		error = -EINVAL;
695		goto fail;
696	}
697
698	rv->nodes = nodes;
699	rv->nodes_count = count;
700	rv->seq = ++ls->ls_recover_seq;
701	rv_old = ls->ls_recover_args;
702	ls->ls_recover_args = rv;
703	spin_unlock(&ls->ls_recover_lock);
704
705	if (rv_old) {
706		log_error(ls, "unused recovery %llx %d",
707			  (unsigned long long)rv_old->seq, rv_old->nodes_count);
708		kfree(rv_old->nodes);
709		kfree(rv_old);
710	}
711
712	dlm_recoverd_kick(ls);
713	return 0;
714
715 fail:
716	kfree(rv);
717	kfree(nodes);
718	return error;
719}
720