Linux Audio

Check our new training course

Loading...
v3.1
  1/******************************************************************************
  2*******************************************************************************
  3**
  4**  Copyright (C) 2005-2009 Red Hat, Inc.  All rights reserved.
  5**
  6**  This copyrighted material is made available to anyone wishing to use,
  7**  modify, copy, or redistribute it subject to the terms and conditions
  8**  of the GNU General Public License v.2.
  9**
 10*******************************************************************************
 11******************************************************************************/
 12
 13#include "dlm_internal.h"
 14#include "lockspace.h"
 15#include "member.h"
 16#include "recoverd.h"
 17#include "recover.h"
 18#include "rcom.h"
 19#include "config.h"
 20#include "lowcomms.h"
 21
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 22static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
 23{
 24	struct dlm_member *memb = NULL;
 25	struct list_head *tmp;
 26	struct list_head *newlist = &new->list;
 27	struct list_head *head = &ls->ls_nodes;
 28
 29	list_for_each(tmp, head) {
 30		memb = list_entry(tmp, struct dlm_member, list);
 31		if (new->nodeid < memb->nodeid)
 32			break;
 33	}
 34
 35	if (!memb)
 36		list_add_tail(newlist, head);
 37	else {
 38		/* FIXME: can use list macro here */
 39		newlist->prev = tmp->prev;
 40		newlist->next = tmp;
 41		tmp->prev->next = newlist;
 42		tmp->prev = newlist;
 43	}
 44}
 45
 46static int dlm_add_member(struct dlm_ls *ls, int nodeid)
 47{
 48	struct dlm_member *memb;
 49	int w, error;
 50
 51	memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
 52	if (!memb)
 53		return -ENOMEM;
 54
 55	w = dlm_node_weight(ls->ls_name, nodeid);
 56	if (w < 0) {
 57		kfree(memb);
 58		return w;
 59	}
 60
 61	error = dlm_lowcomms_connect_node(nodeid);
 62	if (error < 0) {
 63		kfree(memb);
 64		return error;
 65	}
 66
 67	memb->nodeid = nodeid;
 68	memb->weight = w;
 
 69	add_ordered_member(ls, memb);
 70	ls->ls_num_nodes++;
 71	return 0;
 72}
 73
/* Move a departed member from ls_nodes onto ls_nodes_gone (the struct is
   reused there rather than freed; ls_nodes_gone is cleared separately by
   dlm_clear_members_gone()) and drop the member count. */
static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb)
{
	list_move(&memb->list, &ls->ls_nodes_gone);
	ls->ls_num_nodes--;
}
 79
 80int dlm_is_member(struct dlm_ls *ls, int nodeid)
 81{
 82	struct dlm_member *memb;
 83
 84	list_for_each_entry(memb, &ls->ls_nodes, list) {
 85		if (memb->nodeid == nodeid)
 86			return 1;
 87	}
 
 
 
 
 
 
 
 88	return 0;
 89}
 90
 91int dlm_is_removed(struct dlm_ls *ls, int nodeid)
 92{
 93	struct dlm_member *memb;
 94
 95	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
 96		if (memb->nodeid == nodeid)
 97			return 1;
 98	}
 99	return 0;
100}
101
102static void clear_memb_list(struct list_head *head)
103{
104	struct dlm_member *memb;
105
106	while (!list_empty(head)) {
107		memb = list_entry(head->next, struct dlm_member, list);
108		list_del(&memb->list);
109		kfree(memb);
110	}
111}
112
/* Free the entire current member list and reset the member count. */
void dlm_clear_members(struct dlm_ls *ls)
{
	clear_memb_list(&ls->ls_nodes);
	ls->ls_num_nodes = 0;
}
118
/* Free the departed-members list once recovery no longer needs it. */
void dlm_clear_members_gone(struct dlm_ls *ls)
{
	clear_memb_list(&ls->ls_nodes_gone);
}
123
124static void make_member_array(struct dlm_ls *ls)
125{
126	struct dlm_member *memb;
127	int i, w, x = 0, total = 0, all_zero = 0, *array;
128
129	kfree(ls->ls_node_array);
130	ls->ls_node_array = NULL;
131
132	list_for_each_entry(memb, &ls->ls_nodes, list) {
133		if (memb->weight)
134			total += memb->weight;
135	}
136
137	/* all nodes revert to weight of 1 if all have weight 0 */
138
139	if (!total) {
140		total = ls->ls_num_nodes;
141		all_zero = 1;
142	}
143
144	ls->ls_total_weight = total;
145
146	array = kmalloc(sizeof(int) * total, GFP_NOFS);
147	if (!array)
148		return;
149
150	list_for_each_entry(memb, &ls->ls_nodes, list) {
151		if (!all_zero && !memb->weight)
152			continue;
153
154		if (all_zero)
155			w = 1;
156		else
157			w = memb->weight;
158
159		DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
160
161		for (i = 0; i < w; i++)
162			array[x++] = memb->nodeid;
163	}
164
165	ls->ls_node_array = array;
166}
167
/* send a status request to all members just to establish comms connections */

static int ping_members(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	int error = 0;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		/* bail out early if this recovery has been aborted by a
		   newer recovery event */
		error = dlm_recovery_stopped(ls);
		if (error)
			break;
		error = dlm_rcom_status(ls, memb->nodeid);
		if (error)
			break;
	}
	if (error)
		log_debug(ls, "ping_members aborted %d last nodeid %d",
			  error, ls->ls_recover_nodeid);
	return error;
}
188
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
/*
 * Reconcile the lockspace member list with the new node set in @rv:
 * remove departed members, record removed-then-re-added members on
 * ls_nodes_gone, add new members, and recompute ls_low_nodeid and the
 * weighted node array.  *neg_out receives the number of "negative"
 * changes (departures), which drives the heavier recovery steps.
 */
int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
{
	struct dlm_member *memb, *safe;
	int i, error, found, pos = 0, neg = 0, low = -1;

	/* previously removed members that we've not finished removing need to
	   count as a negative change so the "neg" recovery steps will happen */

	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
		log_debug(ls, "prev removed member %d", memb->nodeid);
		neg++;
	}

	/* move departed members from ls_nodes to ls_nodes_gone */

	list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
		found = 0;
		for (i = 0; i < rv->node_count; i++) {
			if (memb->nodeid == rv->nodeids[i]) {
				found = 1;
				break;
			}
		}

		if (!found) {
			neg++;
			dlm_remove_member(ls, memb);
			log_debug(ls, "remove member %d", memb->nodeid);
		}
	}

	/* Add an entry to ls_nodes_gone for members that were removed and
	   then added again, so that previous state for these nodes will be
	   cleared during recovery. */

	for (i = 0; i < rv->new_count; i++) {
		if (!dlm_is_member(ls, rv->new[i]))
			continue;
		log_debug(ls, "new nodeid %d is a re-added member", rv->new[i]);

		/* NOTE(review): returning -ENOMEM here leaves ls_members_done
		   uncompleted, so a new_lockspace() waiter may not be woken —
		   confirm how callers handle this path */
		memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
		if (!memb)
			return -ENOMEM;
		memb->nodeid = rv->new[i];
		list_add_tail(&memb->list, &ls->ls_nodes_gone);
		neg++;
	}

	/* add new members to ls_nodes */

	for (i = 0; i < rv->node_count; i++) {
		if (dlm_is_member(ls, rv->nodeids[i]))
			continue;
		/* NOTE(review): dlm_add_member() can fail (-ENOMEM or comms
		   error) but its return value is ignored here */
		dlm_add_member(ls, rv->nodeids[i]);
		pos++;
		log_debug(ls, "add member %d", rv->nodeids[i]);
	}

	/* recompute the lowest member nodeid, used elsewhere to pick a
	   distinguished node */
	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (low == -1 || memb->nodeid < low)
			low = memb->nodeid;
	}
	ls->ls_low_nodeid = low;

	make_member_array(ls);
	dlm_set_recover_status(ls, DLM_RS_NODES);
	*neg_out = neg;

	error = ping_members(ls);
	if (!error || error == -EPROTO) {
		/* new_lockspace() may be waiting to know if the config
		   is good or bad */
		ls->ls_members_result = error;
		complete(&ls->ls_members_done);
	}
	if (error)
		goto out;

	error = dlm_recover_members_wait(ls);
 out:
	log_debug(ls, "total members %d error %d", ls->ls_num_nodes, error);
	return error;
}
272
/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
   dlm_ls_start() is called on any of them to start the new recovery. */

/*
 * Stop locking activity in this lockspace ahead of a new recovery:
 * quiesce dlm_recv, flag the abort of any in-progress recovery, block
 * new lock requests, and reset the recovery status.  Returns 0.
 */
int dlm_ls_stop(struct dlm_ls *ls)
{
	int new;

	/*
	 * Prevent dlm_recv from being in the middle of something when we do
	 * the stop.  This includes ensuring dlm_recv isn't processing a
	 * recovery message (rcom), while dlm_recoverd is aborting and
	 * resetting things from an in-progress recovery.  i.e. we want
	 * dlm_recoverd to abort its recovery without worrying about dlm_recv
	 * processing an rcom at the same time.  Stopping dlm_recv also makes
	 * it easy for dlm_receive_message() to check locking stopped and add a
	 * message to the requestqueue without races.
	 */

	down_write(&ls->ls_recv_active);

	/*
	 * Abort any recovery that's in progress (see RECOVERY_STOP,
	 * dlm_recovery_stopped()) and tell any other threads running in the
	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
	 */

	spin_lock(&ls->ls_recover_lock);
	set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
	ls->ls_recover_seq++;
	spin_unlock(&ls->ls_recover_lock);

	/*
	 * Let dlm_recv run again, now any normal messages will be saved on the
	 * requestqueue for later.
	 */

	up_write(&ls->ls_recv_active);

	/*
	 * This in_recovery lock does two things:
	 * 1) Keeps this function from returning until all threads are out
	 *    of locking routines and locking is truly stopped.
	 * 2) Keeps any new requests from being processed until it's unlocked
	 *    when recovery is complete.
	 */

	if (new)
		down_write(&ls->ls_in_recovery);

	/*
	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
	 * running) has noticed RECOVERY_STOP above and quit processing the
	 * previous recovery.
	 */

	dlm_recoverd_suspend(ls);
	ls->ls_recover_status = 0;
	dlm_recoverd_resume(ls);

	/* remember when this recovery epoch began, for reporting */
	if (!ls->ls_recover_begin)
		ls->ls_recover_begin = jiffies;
	return 0;
}
337
/*
 * Start a new recovery cycle: fetch the current node list from config,
 * stash it in ls_recover_args for dlm_recoverd, and kick the recovery
 * thread.  The lockspace must already be stopped (dlm_ls_stop).
 * Returns 0 or a negative errno.
 */
int dlm_ls_start(struct dlm_ls *ls)
{
	struct dlm_recover *rv = NULL, *rv_old;
	int *ids = NULL, *new = NULL;
	int error, ids_count = 0, new_count = 0;

	rv = kzalloc(sizeof(struct dlm_recover), GFP_NOFS);
	if (!rv)
		return -ENOMEM;

	error = dlm_nodeid_list(ls->ls_name, &ids, &ids_count,
				&new, &new_count);
	if (error < 0)
		goto fail;

	spin_lock(&ls->ls_recover_lock);

	/* the lockspace needs to be stopped before it can be started */

	if (!dlm_locking_stopped(ls)) {
		spin_unlock(&ls->ls_recover_lock);
		log_error(ls, "start ignored: lockspace running");
		error = -EINVAL;
		goto fail;
	}

	rv->nodeids = ids;
	rv->node_count = ids_count;
	rv->new = new;
	rv->new_count = new_count;
	rv->seq = ++ls->ls_recover_seq;
	rv_old = ls->ls_recover_args;
	ls->ls_recover_args = rv;
	spin_unlock(&ls->ls_recover_lock);

	/* a previous start was never consumed by recoverd; drop its args */
	if (rv_old) {
		log_error(ls, "unused recovery %llx %d",
			  (unsigned long long)rv_old->seq, rv_old->node_count);
		kfree(rv_old->nodeids);
		kfree(rv_old->new);
		kfree(rv_old);
	}

	dlm_recoverd_kick(ls);
	return 0;

 fail:
	kfree(rv);
	kfree(ids);
	kfree(new);
	return error;
}
390
v4.17
  1/******************************************************************************
  2*******************************************************************************
  3**
  4**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
  5**
  6**  This copyrighted material is made available to anyone wishing to use,
  7**  modify, copy, or redistribute it subject to the terms and conditions
  8**  of the GNU General Public License v.2.
  9**
 10*******************************************************************************
 11******************************************************************************/
 12
 13#include "dlm_internal.h"
 14#include "lockspace.h"
 15#include "member.h"
 16#include "recoverd.h"
 17#include "recover.h"
 18#include "rcom.h"
 19#include "config.h"
 20#include "lowcomms.h"
 21
 22int dlm_slots_version(struct dlm_header *h)
 23{
 24	if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS)
 25		return 0;
 26	return 1;
 27}
 28
/*
 * Record the slot number and generation assigned to us, taken from the
 * rcom_config payload of a status reply.  No-op when the sender's header
 * version does not include slot information.
 */
void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
		   struct dlm_member *memb)
{
	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;

	if (!dlm_slots_version(&rc->rc_header))
		return;

	/* wire format is little-endian */
	memb->slot = le16_to_cpu(rf->rf_our_slot);
	memb->generation = le32_to_cpu(rf->rf_generation);
}
 40
 41void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
 42{
 43	struct dlm_slot *slot;
 44	struct rcom_slot *ro;
 45	int i;
 46
 47	ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
 48
 49	/* ls_slots array is sparse, but not rcom_slots */
 50
 51	for (i = 0; i < ls->ls_slots_size; i++) {
 52		slot = &ls->ls_slots[i];
 53		if (!slot->nodeid)
 54			continue;
 55		ro->ro_nodeid = cpu_to_le32(slot->nodeid);
 56		ro->ro_slot = cpu_to_le16(slot->slot);
 57		ro++;
 58	}
 59}
 60
 61#define SLOT_DEBUG_LINE 128
 62
 63static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
 64		      struct rcom_slot *ro0, struct dlm_slot *array,
 65		      int array_size)
 66{
 67	char line[SLOT_DEBUG_LINE];
 68	int len = SLOT_DEBUG_LINE - 1;
 69	int pos = 0;
 70	int ret, i;
 71
 72	memset(line, 0, sizeof(line));
 73
 74	if (array) {
 75		for (i = 0; i < array_size; i++) {
 76			if (!array[i].nodeid)
 77				continue;
 78
 79			ret = snprintf(line + pos, len - pos, " %d:%d",
 80				       array[i].slot, array[i].nodeid);
 81			if (ret >= len - pos)
 82				break;
 83			pos += ret;
 84		}
 85	} else if (ro0) {
 86		for (i = 0; i < num_slots; i++) {
 87			ret = snprintf(line + pos, len - pos, " %d:%d",
 88				       ro0[i].ro_slot, ro0[i].ro_nodeid);
 89			if (ret >= len - pos)
 90				break;
 91			pos += ret;
 92		}
 93	}
 94
 95	log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line);
 96}
 97
/*
 * Import the slot table received in ls_recover_buf: convert the
 * rcom_slot entries to host byte order in place, copy each node's slot
 * into its dlm_member, and adopt our own slot/generation.  Returns 0 on
 * success, -1 if the sender lacks slot support or the table is invalid.
 */
int dlm_slots_copy_in(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	struct dlm_rcom *rc = ls->ls_recover_buf;
	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
	struct rcom_slot *ro0, *ro;
	int our_nodeid = dlm_our_nodeid();
	int i, num_slots;
	uint32_t gen;

	if (!dlm_slots_version(&rc->rc_header))
		return -1;

	/* a non-increasing generation is logged but still adopted */
	gen = le32_to_cpu(rf->rf_generation);
	if (gen <= ls->ls_generation) {
		log_error(ls, "dlm_slots_copy_in gen %u old %u",
			  gen, ls->ls_generation);
	}
	ls->ls_generation = gen;

	num_slots = le16_to_cpu(rf->rf_num_slots);
	if (!num_slots)
		return -1;

	ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));

	/* convert the buffer to host byte order in place, once, before use */
	for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
		ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid);
		ro->ro_slot = le16_to_cpu(ro->ro_slot);
	}

	log_slots(ls, gen, num_slots, ro0, NULL, 0);

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
			if (ro->ro_nodeid != memb->nodeid)
				continue;
			memb->slot = ro->ro_slot;
			memb->slot_prev = memb->slot;
			break;
		}

		if (memb->nodeid == our_nodeid) {
			/* our slot must never change once assigned */
			if (ls->ls_slot && ls->ls_slot != memb->slot) {
				log_error(ls, "dlm_slots_copy_in our slot "
					  "changed %d %d", ls->ls_slot,
					  memb->slot);
				return -1;
			}

			if (!ls->ls_slot)
				ls->ls_slot = memb->slot;
		}

		/* every current member must appear in the received table */
		if (!memb->slot) {
			log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
				   memb->nodeid);
			return -1;
		}
	}

	return 0;
}
161
/* for any nodes that do not support slots, we will not have set memb->slot
   in wait_status_all(), so memb->slot will remain -1, and we will not
   assign slots or set ls_num_slots here */

/*
 * Build the authoritative slot table for this recovery: keep each
 * member's previously assigned slot, hand out unused slots to members
 * that have none, and bump the generation.  On success the caller owns
 * the returned *slots_out array (kfree).  Returns 0, -1 if any member
 * lacks slot support or the table is inconsistent, or -ENOMEM.
 */
int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
		     struct dlm_slot **slots_out, uint32_t *gen_out)
{
	struct dlm_member *memb;
	struct dlm_slot *array;
	int our_nodeid = dlm_our_nodeid();
	int array_size, max_slots, i;
	int need = 0;
	int max = 0;
	int num = 0;
	uint32_t gen = 0;

	/* our own memb struct will have slot -1 gen 0 */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->nodeid == our_nodeid) {
			memb->slot = ls->ls_slot;
			memb->generation = ls->ls_generation;
			break;
		}
	}

	/* first pass: find the highest generation and slot in use, and
	   count members still needing a slot */
	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->generation > gen)
			gen = memb->generation;

		/* node doesn't support slots */

		if (memb->slot == -1)
			return -1;

		/* node needs a slot assigned */

		if (!memb->slot)
			need++;

		/* node has a slot assigned */

		num++;

		if (!max || max < memb->slot)
			max = memb->slot;

		/* sanity check, once slot is assigned it shouldn't change */

		if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
			log_error(ls, "nodeid %d slot changed %d %d",
				  memb->nodeid, memb->slot_prev, memb->slot);
			return -1;
		}
		memb->slot_prev = memb->slot;
	}

	/* large enough for all existing slots plus one per unassigned node */
	array_size = max + need;
	array = kcalloc(array_size, sizeof(*array), GFP_NOFS);
	if (!array)
		return -ENOMEM;

	num = 0;

	/* fill in slots (offsets) that are used */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (!memb->slot)
			continue;

		if (memb->slot > array_size) {
			log_error(ls, "invalid slot number %d", memb->slot);
			kfree(array);
			return -1;
		}

		/* slot numbers are 1-based; array index is slot - 1 */
		array[memb->slot - 1].nodeid = memb->nodeid;
		array[memb->slot - 1].slot = memb->slot;
		num++;
	}

	/* assign new slots from unused offsets */

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->slot)
			continue;

		for (i = 0; i < array_size; i++) {
			if (array[i].nodeid)
				continue;

			memb->slot = i + 1;
			memb->slot_prev = memb->slot;
			array[i].nodeid = memb->nodeid;
			array[i].slot = memb->slot;
			num++;

			if (!ls->ls_slot && memb->nodeid == our_nodeid)
				ls->ls_slot = memb->slot;
			break;
		}

		if (!memb->slot) {
			log_error(ls, "no free slot found");
			kfree(array);
			return -1;
		}
	}

	gen++;

	log_slots(ls, gen, num, NULL, array, array_size);

	/* the table must fit in one rcom message buffer */
	max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) -
		     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);

	if (num > max_slots) {
		log_error(ls, "num_slots %d exceeds max_slots %d",
			  num, max_slots);
		kfree(array);
		return -1;
	}

	*gen_out = gen;
	*slots_out = array;
	*slots_size = array_size;
	*num_slots = num;
	return 0;
}
291
292static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
293{
294	struct dlm_member *memb = NULL;
295	struct list_head *tmp;
296	struct list_head *newlist = &new->list;
297	struct list_head *head = &ls->ls_nodes;
298
299	list_for_each(tmp, head) {
300		memb = list_entry(tmp, struct dlm_member, list);
301		if (new->nodeid < memb->nodeid)
302			break;
303	}
304
305	if (!memb)
306		list_add_tail(newlist, head);
307	else {
308		/* FIXME: can use list macro here */
309		newlist->prev = tmp->prev;
310		newlist->next = tmp;
311		tmp->prev->next = newlist;
312		tmp->prev = newlist;
313	}
314}
315
316static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
317{
318	struct dlm_member *memb;
319	int error;
320
321	memb = kzalloc(sizeof(*memb), GFP_NOFS);
322	if (!memb)
323		return -ENOMEM;
324
325	error = dlm_lowcomms_connect_node(node->nodeid);
 
 
 
 
 
 
326	if (error < 0) {
327		kfree(memb);
328		return error;
329	}
330
331	memb->nodeid = node->nodeid;
332	memb->weight = node->weight;
333	memb->comm_seq = node->comm_seq;
334	add_ordered_member(ls, memb);
335	ls->ls_num_nodes++;
336	return 0;
337}
338
339static struct dlm_member *find_memb(struct list_head *head, int nodeid)
 
 
 
 
 
 
340{
341	struct dlm_member *memb;
342
343	list_for_each_entry(memb, head, list) {
344		if (memb->nodeid == nodeid)
345			return memb;
346	}
347	return NULL;
348}
349
350int dlm_is_member(struct dlm_ls *ls, int nodeid)
351{
352	if (find_memb(&ls->ls_nodes, nodeid))
353		return 1;
354	return 0;
355}
356
357int dlm_is_removed(struct dlm_ls *ls, int nodeid)
358{
359	if (find_memb(&ls->ls_nodes_gone, nodeid))
360		return 1;
 
 
 
 
361	return 0;
362}
363
364static void clear_memb_list(struct list_head *head)
365{
366	struct dlm_member *memb;
367
368	while (!list_empty(head)) {
369		memb = list_entry(head->next, struct dlm_member, list);
370		list_del(&memb->list);
371		kfree(memb);
372	}
373}
374
/* Free the entire current member list and reset the member count. */
void dlm_clear_members(struct dlm_ls *ls)
{
	clear_memb_list(&ls->ls_nodes);
	ls->ls_num_nodes = 0;
}
380
/* Free the departed-members list once recovery no longer needs it. */
void dlm_clear_members_gone(struct dlm_ls *ls)
{
	clear_memb_list(&ls->ls_nodes_gone);
}
385
/*
 * Rebuild ls->ls_node_array: a flat array of nodeids in which each member
 * appears once per unit of weight (ls_total_weight entries total).  If
 * every member has weight 0, all members are treated as weight 1.  On
 * allocation failure ls_node_array is left NULL; callers must tolerate
 * that.
 */
static void make_member_array(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	int i, w, x = 0, total = 0, all_zero = 0, *array;

	kfree(ls->ls_node_array);
	ls->ls_node_array = NULL;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (memb->weight)
			total += memb->weight;
	}

	/* all nodes revert to weight of 1 if all have weight 0 */

	if (!total) {
		total = ls->ls_num_nodes;
		all_zero = 1;
	}

	ls->ls_total_weight = total;
	/* kmalloc_array checks total * sizeof(*array) for overflow */
	array = kmalloc_array(total, sizeof(*array), GFP_NOFS);
	if (!array)
		return;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (!all_zero && !memb->weight)
			continue;

		if (all_zero)
			w = 1;
		else
			w = memb->weight;

		DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););

		for (i = 0; i < w; i++)
			array[x++] = memb->nodeid;
	}

	ls->ls_node_array = array;
}
428
/* send a status request to all members just to establish comms connections */

static int ping_members(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	int error = 0;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		/* bail out early if this recovery has been aborted by a
		   newer recovery event */
		error = dlm_recovery_stopped(ls);
		if (error)
			break;
		error = dlm_rcom_status(ls, memb->nodeid, 0);
		if (error)
			break;
	}
	if (error)
		log_rinfo(ls, "ping_members aborted %d last nodeid %d",
			  error, ls->ls_recover_nodeid);
	return error;
}
449
450static void dlm_lsop_recover_prep(struct dlm_ls *ls)
451{
452	if (!ls->ls_ops || !ls->ls_ops->recover_prep)
453		return;
454	ls->ls_ops->recover_prep(ls->ls_ops_arg);
455}
456
/*
 * Invoke the user's recover_slot callback for a departed member, but only
 * when the node appears to have failed rather than having left cleanly.
 */
static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
{
	struct dlm_slot slot;
	uint32_t seq;
	int error;

	if (!ls->ls_ops || !ls->ls_ops->recover_slot)
		return;

	/* if there is no comms connection with this node
	   or the present comms connection is newer
	   than the one when this member was added, then
	   we consider the node to have failed (versus
	   being removed due to dlm_release_lockspace) */

	error = dlm_comm_seq(memb->nodeid, &seq);

	if (!error && seq == memb->comm_seq)
		return;

	slot.nodeid = memb->nodeid;
	slot.slot = memb->slot;

	ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
}
482
/*
 * Invoke the user's recover_done callback with a snapshot of the final
 * member/slot table.  Silently does nothing if no callback is registered
 * or the snapshot allocation fails; the snapshot is freed before return
 * (callback must not keep the pointer).
 */
void dlm_lsop_recover_done(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	struct dlm_slot *slots;
	int i, num;

	if (!ls->ls_ops || !ls->ls_ops->recover_done)
		return;

	num = ls->ls_num_nodes;
	slots = kcalloc(num, sizeof(*slots), GFP_KERNEL);
	if (!slots)
		return;

	i = 0;
	list_for_each_entry(memb, &ls->ls_nodes, list) {
		/* defensive: list longer than ls_num_nodes would overrun */
		if (i == num) {
			log_error(ls, "dlm_lsop_recover_done bad num %d", num);
			goto out;
		}
		slots[i].nodeid = memb->nodeid;
		slots[i].slot = memb->slot;
		i++;
	}

	ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
				 ls->ls_slot, ls->ls_generation);
 out:
	kfree(slots);
}
513
514static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
515						int nodeid)
516{
517	int i;
518
519	for (i = 0; i < rv->nodes_count; i++) {
520		if (rv->nodes[i].nodeid == nodeid)
521			return &rv->nodes[i];
522	}
523	return NULL;
524}
525
/*
 * Reconcile the lockspace member list with the new node set in @rv:
 * move departed (or removed-and-re-added) members to ls_nodes_gone and
 * notify the recover_slot callback, add new members, and recompute
 * ls_low_nodeid and the weighted node array.  *neg_out receives the
 * number of "negative" changes, which drives the heavier recovery steps.
 */
int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
{
	struct dlm_member *memb, *safe;
	struct dlm_config_node *node;
	int i, error, neg = 0, low = -1;

	/* previously removed members that we've not finished removing need to
	   count as a negative change so the "neg" recovery steps will happen */

	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
		log_rinfo(ls, "prev removed member %d", memb->nodeid);
		neg++;
	}

	/* move departed members from ls_nodes to ls_nodes_gone */

	list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
		node = find_config_node(rv, memb->nodeid);
		/* still present and not flagged new: member is unchanged */
		if (node && !node->new)
			continue;

		if (!node) {
			log_rinfo(ls, "remove member %d", memb->nodeid);
		} else {
			/* removed and re-added */
			log_rinfo(ls, "remove member %d comm_seq %u %u",
				  memb->nodeid, memb->comm_seq, node->comm_seq);
		}

		neg++;
		list_move(&memb->list, &ls->ls_nodes_gone);
		ls->ls_num_nodes--;
		dlm_lsop_recover_slot(ls, memb);
	}

	/* add new members to ls_nodes */

	for (i = 0; i < rv->nodes_count; i++) {
		node = &rv->nodes[i];
		if (dlm_is_member(ls, node->nodeid))
			continue;
		/* NOTE(review): dlm_add_member() can fail (-ENOMEM or comms
		   error) but its return value is ignored here */
		dlm_add_member(ls, node);
		log_rinfo(ls, "add member %d", node->nodeid);
	}

	/* recompute the lowest member nodeid, used elsewhere to pick a
	   distinguished node */
	list_for_each_entry(memb, &ls->ls_nodes, list) {
		if (low == -1 || memb->nodeid < low)
			low = memb->nodeid;
	}
	ls->ls_low_nodeid = low;

	make_member_array(ls);
	*neg_out = neg;

	error = ping_members(ls);
	if (!error || error == -EPROTO) {
		/* new_lockspace() may be waiting to know if the config
		   is good or bad */
		ls->ls_members_result = error;
		complete(&ls->ls_members_done);
	}

	log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
	return error;
}
591
/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
   dlm_ls_start() is called on any of them to start the new recovery. */

/*
 * Stop locking activity in this lockspace ahead of a new recovery:
 * quiesce dlm_recv, flag the abort of any in-progress recovery, hand
 * the in_recovery lock to dlm_recoverd, and reset slot/recovery state.
 * Returns 0.
 */
int dlm_ls_stop(struct dlm_ls *ls)
{
	int new;

	/*
	 * Prevent dlm_recv from being in the middle of something when we do
	 * the stop.  This includes ensuring dlm_recv isn't processing a
	 * recovery message (rcom), while dlm_recoverd is aborting and
	 * resetting things from an in-progress recovery.  i.e. we want
	 * dlm_recoverd to abort its recovery without worrying about dlm_recv
	 * processing an rcom at the same time.  Stopping dlm_recv also makes
	 * it easy for dlm_receive_message() to check locking stopped and add a
	 * message to the requestqueue without races.
	 */

	down_write(&ls->ls_recv_active);

	/*
	 * Abort any recovery that's in progress (see RECOVER_STOP,
	 * dlm_recovery_stopped()) and tell any other threads running in the
	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
	 */

	spin_lock(&ls->ls_recover_lock);
	set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
	ls->ls_recover_seq++;
	spin_unlock(&ls->ls_recover_lock);

	/*
	 * Let dlm_recv run again, now any normal messages will be saved on the
	 * requestqueue for later.
	 */

	up_write(&ls->ls_recv_active);

	/*
	 * This in_recovery lock does two things:
	 * 1) Keeps this function from returning until all threads are out
	 *    of locking routines and locking is truly stopped.
	 * 2) Keeps any new requests from being processed until it's unlocked
	 *    when recovery is complete.
	 */

	if (new) {
		/* ask dlm_recoverd to take ls_in_recovery on our behalf and
		   wait until it has done so */
		set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
		wake_up_process(ls->ls_recoverd_task);
		wait_event(ls->ls_recover_lock_wait,
			   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
	}

	/*
	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
	 * running) has noticed RECOVER_STOP above and quit processing the
	 * previous recovery.
	 */

	dlm_recoverd_suspend(ls);

	/* discard the old slot table; a new one is built next recovery */
	spin_lock(&ls->ls_recover_lock);
	kfree(ls->ls_slots);
	ls->ls_slots = NULL;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_recover_status = 0;
	spin_unlock(&ls->ls_recover_lock);

	dlm_recoverd_resume(ls);

	/* remember when this recovery epoch began, for reporting */
	if (!ls->ls_recover_begin)
		ls->ls_recover_begin = jiffies;

	dlm_lsop_recover_prep(ls);
	return 0;
}
670
/*
 * Start a new recovery cycle: fetch the current node set from config,
 * stash it in ls_recover_args for dlm_recoverd, and wake the recovery
 * thread.  The lockspace must already be stopped (dlm_ls_stop).
 * Returns 0 or a negative errno.
 */
int dlm_ls_start(struct dlm_ls *ls)
{
	struct dlm_recover *rv, *rv_old;
	struct dlm_config_node *nodes;
	int error, count;

	rv = kzalloc(sizeof(*rv), GFP_NOFS);
	if (!rv)
		return -ENOMEM;

	/* NOTE(review): on failure the fail path kfree()s 'nodes' — this
	   assumes dlm_config_nodes() always sets *nodes (e.g. to NULL) even
	   when it returns an error; confirm against its definition */
	error = dlm_config_nodes(ls->ls_name, &nodes, &count);
	if (error < 0)
		goto fail;

	spin_lock(&ls->ls_recover_lock);

	/* the lockspace needs to be stopped before it can be started */

	if (!dlm_locking_stopped(ls)) {
		spin_unlock(&ls->ls_recover_lock);
		log_error(ls, "start ignored: lockspace running");
		error = -EINVAL;
		goto fail;
	}

	rv->nodes = nodes;
	rv->nodes_count = count;
	rv->seq = ++ls->ls_recover_seq;
	rv_old = ls->ls_recover_args;
	ls->ls_recover_args = rv;
	spin_unlock(&ls->ls_recover_lock);

	/* a previous start was never consumed by recoverd; drop its args */
	if (rv_old) {
		log_error(ls, "unused recovery %llx %d",
			  (unsigned long long)rv_old->seq, rv_old->nodes_count);
		kfree(rv_old->nodes);
		kfree(rv_old);
	}

	set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
	wake_up_process(ls->ls_recoverd_task);
	return 0;

 fail:
	kfree(rv);
	kfree(nodes);
	return error;
}
719