Linux Audio

Check our new training course

Loading...
v3.1
 
  1/******************************************************************************
  2*******************************************************************************
  3**
  4**  Copyright (C) 2005-2009 Red Hat, Inc.  All rights reserved.
  5**
  6**  This copyrighted material is made available to anyone wishing to use,
  7**  modify, copy, or redistribute it subject to the terms and conditions
  8**  of the GNU General Public License v.2.
  9**
 10*******************************************************************************
 11******************************************************************************/
 12
 13#include "dlm_internal.h"
 14#include "lockspace.h"
 15#include "member.h"
 16#include "recoverd.h"
 17#include "recover.h"
 18#include "rcom.h"
 19#include "config.h"
 
 20#include "lowcomms.h"
 21
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 22static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
 23{
 24	struct dlm_member *memb = NULL;
 25	struct list_head *tmp;
 26	struct list_head *newlist = &new->list;
 27	struct list_head *head = &ls->ls_nodes;
 28
 29	list_for_each(tmp, head) {
 30		memb = list_entry(tmp, struct dlm_member, list);
 31		if (new->nodeid < memb->nodeid)
 32			break;
 33	}
 34
 35	if (!memb)
 36		list_add_tail(newlist, head);
 37	else {
 38		/* FIXME: can use list macro here */
 39		newlist->prev = tmp->prev;
 40		newlist->next = tmp;
 41		tmp->prev->next = newlist;
 42		tmp->prev = newlist;
 43	}
 44}
 45
 46static int dlm_add_member(struct dlm_ls *ls, int nodeid)
 47{
 48	struct dlm_member *memb;
 49	int w, error;
 50
 51	memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
 52	if (!memb)
 53		return -ENOMEM;
 54
 55	w = dlm_node_weight(ls->ls_name, nodeid);
 56	if (w < 0) {
 57		kfree(memb);
 58		return w;
 59	}
 60
 61	error = dlm_lowcomms_connect_node(nodeid);
 62	if (error < 0) {
 63		kfree(memb);
 64		return error;
 65	}
 66
 67	memb->nodeid = nodeid;
 68	memb->weight = w;
 
 
 69	add_ordered_member(ls, memb);
 70	ls->ls_num_nodes++;
 71	return 0;
 72}
 73
 74static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb)
 75{
 76	list_move(&memb->list, &ls->ls_nodes_gone);
 77	ls->ls_num_nodes--;
 78}
 79
 80int dlm_is_member(struct dlm_ls *ls, int nodeid)
 81{
 82	struct dlm_member *memb;
 83
 84	list_for_each_entry(memb, &ls->ls_nodes, list) {
 85		if (memb->nodeid == nodeid)
 86			return 1;
 87	}
 
 
 
 
 
 
 
 88	return 0;
 89}
 90
 91int dlm_is_removed(struct dlm_ls *ls, int nodeid)
 92{
 93	struct dlm_member *memb;
 94
 95	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
 96		if (memb->nodeid == nodeid)
 97			return 1;
 98	}
 99	return 0;
100}
101
102static void clear_memb_list(struct list_head *head)
 
103{
104	struct dlm_member *memb;
105
106	while (!list_empty(head)) {
107		memb = list_entry(head->next, struct dlm_member, list);
108		list_del(&memb->list);
 
 
109		kfree(memb);
110	}
111}
112
 
 
 
 
 
113void dlm_clear_members(struct dlm_ls *ls)
114{
115	clear_memb_list(&ls->ls_nodes);
116	ls->ls_num_nodes = 0;
117}
118
119void dlm_clear_members_gone(struct dlm_ls *ls)
120{
121	clear_memb_list(&ls->ls_nodes_gone);
122}
123
124static void make_member_array(struct dlm_ls *ls)
125{
126	struct dlm_member *memb;
127	int i, w, x = 0, total = 0, all_zero = 0, *array;
128
129	kfree(ls->ls_node_array);
130	ls->ls_node_array = NULL;
131
132	list_for_each_entry(memb, &ls->ls_nodes, list) {
133		if (memb->weight)
134			total += memb->weight;
135	}
136
137	/* all nodes revert to weight of 1 if all have weight 0 */
138
139	if (!total) {
140		total = ls->ls_num_nodes;
141		all_zero = 1;
142	}
143
144	ls->ls_total_weight = total;
145
146	array = kmalloc(sizeof(int) * total, GFP_NOFS);
147	if (!array)
148		return;
149
150	list_for_each_entry(memb, &ls->ls_nodes, list) {
151		if (!all_zero && !memb->weight)
152			continue;
153
154		if (all_zero)
155			w = 1;
156		else
157			w = memb->weight;
158
159		DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
160
161		for (i = 0; i < w; i++)
162			array[x++] = memb->nodeid;
163	}
164
165	ls->ls_node_array = array;
166}
167
168/* send a status request to all members just to establish comms connections */
169
170static int ping_members(struct dlm_ls *ls)
171{
172	struct dlm_member *memb;
173	int error = 0;
174
175	list_for_each_entry(memb, &ls->ls_nodes, list) {
176		error = dlm_recovery_stopped(ls);
177		if (error)
178			break;
179		error = dlm_rcom_status(ls, memb->nodeid);
 
180		if (error)
181			break;
182	}
183	if (error)
184		log_debug(ls, "ping_members aborted %d last nodeid %d",
185			  error, ls->ls_recover_nodeid);
186	return error;
187}
188
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
189int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
190{
191	struct dlm_member *memb, *safe;
192	int i, error, found, pos = 0, neg = 0, low = -1;
 
193
194	/* previously removed members that we've not finished removing need to
195	   count as a negative change so the "neg" recovery steps will happen */
 
 
 
 
196
197	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
198		log_debug(ls, "prev removed member %d", memb->nodeid);
199		neg++;
200	}
201
202	/* move departed members from ls_nodes to ls_nodes_gone */
203
204	list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
205		found = 0;
206		for (i = 0; i < rv->node_count; i++) {
207			if (memb->nodeid == rv->nodeids[i]) {
208				found = 1;
209				break;
210			}
211		}
212
213		if (!found) {
214			neg++;
215			dlm_remove_member(ls, memb);
216			log_debug(ls, "remove member %d", memb->nodeid);
 
 
217		}
218	}
219
220	/* Add an entry to ls_nodes_gone for members that were removed and
221	   then added again, so that previous state for these nodes will be
222	   cleared during recovery. */
223
224	for (i = 0; i < rv->new_count; i++) {
225		if (!dlm_is_member(ls, rv->new[i]))
226			continue;
227		log_debug(ls, "new nodeid %d is a re-added member", rv->new[i]);
228
229		memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
230		if (!memb)
231			return -ENOMEM;
232		memb->nodeid = rv->new[i];
233		list_add_tail(&memb->list, &ls->ls_nodes_gone);
234		neg++;
 
 
 
 
235	}
236
237	/* add new members to ls_nodes */
238
239	for (i = 0; i < rv->node_count; i++) {
240		if (dlm_is_member(ls, rv->nodeids[i]))
 
241			continue;
242		dlm_add_member(ls, rv->nodeids[i]);
243		pos++;
244		log_debug(ls, "add member %d", rv->nodeids[i]);
 
 
245	}
246
247	list_for_each_entry(memb, &ls->ls_nodes, list) {
248		if (low == -1 || memb->nodeid < low)
249			low = memb->nodeid;
250	}
251	ls->ls_low_nodeid = low;
252
253	make_member_array(ls);
254	dlm_set_recover_status(ls, DLM_RS_NODES);
255	*neg_out = neg;
256
257	error = ping_members(ls);
258	if (!error || error == -EPROTO) {
259		/* new_lockspace() may be waiting to know if the config
260		   is good or bad */
261		ls->ls_members_result = error;
262		complete(&ls->ls_members_done);
263	}
264	if (error)
265		goto out;
266
267	error = dlm_recover_members_wait(ls);
268 out:
269	log_debug(ls, "total members %d error %d", ls->ls_num_nodes, error);
270	return error;
271}
272
273/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
274   dlm_ls_start() is called on any of them to start the new recovery. */
275
276int dlm_ls_stop(struct dlm_ls *ls)
277{
278	int new;
279
280	/*
281	 * Prevent dlm_recv from being in the middle of something when we do
282	 * the stop.  This includes ensuring dlm_recv isn't processing a
283	 * recovery message (rcom), while dlm_recoverd is aborting and
284	 * resetting things from an in-progress recovery.  i.e. we want
285	 * dlm_recoverd to abort its recovery without worrying about dlm_recv
286	 * processing an rcom at the same time.  Stopping dlm_recv also makes
287	 * it easy for dlm_receive_message() to check locking stopped and add a
288	 * message to the requestqueue without races.
289	 */
290
291	down_write(&ls->ls_recv_active);
292
293	/*
294	 * Abort any recovery that's in progress (see RECOVERY_STOP,
295	 * dlm_recovery_stopped()) and tell any other threads running in the
296	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
297	 */
298
299	spin_lock(&ls->ls_recover_lock);
300	set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
301	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
302	ls->ls_recover_seq++;
303	spin_unlock(&ls->ls_recover_lock);
304
305	/*
306	 * Let dlm_recv run again, now any normal messages will be saved on the
307	 * requestqueue for later.
308	 */
309
310	up_write(&ls->ls_recv_active);
311
312	/*
313	 * This in_recovery lock does two things:
314	 * 1) Keeps this function from returning until all threads are out
315	 *    of locking routines and locking is truly stopped.
316	 * 2) Keeps any new requests from being processed until it's unlocked
317	 *    when recovery is complete.
318	 */
319
320	if (new)
321		down_write(&ls->ls_in_recovery);
 
 
 
 
322
323	/*
324	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
325	 * running) has noticed RECOVERY_STOP above and quit processing the
326	 * previous recovery.
327	 */
328
329	dlm_recoverd_suspend(ls);
 
 
 
 
 
 
330	ls->ls_recover_status = 0;
 
 
331	dlm_recoverd_resume(ls);
332
333	if (!ls->ls_recover_begin)
334		ls->ls_recover_begin = jiffies;
 
 
 
 
 
 
 
 
 
 
 
335	return 0;
336}
337
338int dlm_ls_start(struct dlm_ls *ls)
339{
340	struct dlm_recover *rv = NULL, *rv_old;
341	int *ids = NULL, *new = NULL;
342	int error, ids_count = 0, new_count = 0;
343
344	rv = kzalloc(sizeof(struct dlm_recover), GFP_NOFS);
345	if (!rv)
346		return -ENOMEM;
347
348	error = dlm_nodeid_list(ls->ls_name, &ids, &ids_count,
349				&new, &new_count);
350	if (error < 0)
351		goto fail;
352
353	spin_lock(&ls->ls_recover_lock);
354
355	/* the lockspace needs to be stopped before it can be started */
356
357	if (!dlm_locking_stopped(ls)) {
358		spin_unlock(&ls->ls_recover_lock);
359		log_error(ls, "start ignored: lockspace running");
360		error = -EINVAL;
361		goto fail;
362	}
363
364	rv->nodeids = ids;
365	rv->node_count = ids_count;
366	rv->new = new;
367	rv->new_count = new_count;
368	rv->seq = ++ls->ls_recover_seq;
369	rv_old = ls->ls_recover_args;
370	ls->ls_recover_args = rv;
371	spin_unlock(&ls->ls_recover_lock);
372
373	if (rv_old) {
374		log_error(ls, "unused recovery %llx %d",
375			  (unsigned long long)rv_old->seq, rv_old->node_count);
376		kfree(rv_old->nodeids);
377		kfree(rv_old->new);
378		kfree(rv_old);
379	}
380
381	dlm_recoverd_kick(ls);
 
382	return 0;
383
384 fail:
 
 
385	kfree(rv);
386	kfree(ids);
387	kfree(new);
388	return error;
389}
390
v6.2
  1// SPDX-License-Identifier: GPL-2.0-only
  2/******************************************************************************
  3*******************************************************************************
  4**
  5**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
  6**
 
 
 
  7**
  8*******************************************************************************
  9******************************************************************************/
 10
 11#include "dlm_internal.h"
 12#include "lockspace.h"
 13#include "member.h"
 14#include "recoverd.h"
 15#include "recover.h"
 16#include "rcom.h"
 17#include "config.h"
 18#include "midcomms.h"
 19#include "lowcomms.h"
 20
 21int dlm_slots_version(struct dlm_header *h)
 22{
 23	if ((le32_to_cpu(h->h_version) & 0x0000FFFF) < DLM_HEADER_SLOTS)
 24		return 0;
 25	return 1;
 26}
 27
 28void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
 29		   struct dlm_member *memb)
 30{
 31	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
 32
 33	if (!dlm_slots_version(&rc->rc_header))
 34		return;
 35
 36	memb->slot = le16_to_cpu(rf->rf_our_slot);
 37	memb->generation = le32_to_cpu(rf->rf_generation);
 38}
 39
 40void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
 41{
 42	struct dlm_slot *slot;
 43	struct rcom_slot *ro;
 44	int i;
 45
 46	ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
 47
 48	/* ls_slots array is sparse, but not rcom_slots */
 49
 50	for (i = 0; i < ls->ls_slots_size; i++) {
 51		slot = &ls->ls_slots[i];
 52		if (!slot->nodeid)
 53			continue;
 54		ro->ro_nodeid = cpu_to_le32(slot->nodeid);
 55		ro->ro_slot = cpu_to_le16(slot->slot);
 56		ro++;
 57	}
 58}
 59
 60#define SLOT_DEBUG_LINE 128
 61
 62static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
 63		      struct rcom_slot *ro0, struct dlm_slot *array,
 64		      int array_size)
 65{
 66	char line[SLOT_DEBUG_LINE];
 67	int len = SLOT_DEBUG_LINE - 1;
 68	int pos = 0;
 69	int ret, i;
 70
 71	memset(line, 0, sizeof(line));
 72
 73	if (array) {
 74		for (i = 0; i < array_size; i++) {
 75			if (!array[i].nodeid)
 76				continue;
 77
 78			ret = snprintf(line + pos, len - pos, " %d:%d",
 79				       array[i].slot, array[i].nodeid);
 80			if (ret >= len - pos)
 81				break;
 82			pos += ret;
 83		}
 84	} else if (ro0) {
 85		for (i = 0; i < num_slots; i++) {
 86			ret = snprintf(line + pos, len - pos, " %d:%d",
 87				       ro0[i].ro_slot, ro0[i].ro_nodeid);
 88			if (ret >= len - pos)
 89				break;
 90			pos += ret;
 91		}
 92	}
 93
 94	log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line);
 95}
 96
 97int dlm_slots_copy_in(struct dlm_ls *ls)
 98{
 99	struct dlm_member *memb;
100	struct dlm_rcom *rc = ls->ls_recover_buf;
101	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
102	struct rcom_slot *ro0, *ro;
103	int our_nodeid = dlm_our_nodeid();
104	int i, num_slots;
105	uint32_t gen;
106
107	if (!dlm_slots_version(&rc->rc_header))
108		return -1;
109
110	gen = le32_to_cpu(rf->rf_generation);
111	if (gen <= ls->ls_generation) {
112		log_error(ls, "dlm_slots_copy_in gen %u old %u",
113			  gen, ls->ls_generation);
114	}
115	ls->ls_generation = gen;
116
117	num_slots = le16_to_cpu(rf->rf_num_slots);
118	if (!num_slots)
119		return -1;
120
121	ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
122
123	log_slots(ls, gen, num_slots, ro0, NULL, 0);
124
125	list_for_each_entry(memb, &ls->ls_nodes, list) {
126		for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
127			if (le32_to_cpu(ro->ro_nodeid) != memb->nodeid)
128				continue;
129			memb->slot = le16_to_cpu(ro->ro_slot);
130			memb->slot_prev = memb->slot;
131			break;
132		}
133
134		if (memb->nodeid == our_nodeid) {
135			if (ls->ls_slot && ls->ls_slot != memb->slot) {
136				log_error(ls, "dlm_slots_copy_in our slot "
137					  "changed %d %d", ls->ls_slot,
138					  memb->slot);
139				return -1;
140			}
141
142			if (!ls->ls_slot)
143				ls->ls_slot = memb->slot;
144		}
145
146		if (!memb->slot) {
147			log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
148				   memb->nodeid);
149			return -1;
150		}
151	}
152
153	return 0;
154}
155
156/* for any nodes that do not support slots, we will not have set memb->slot
157   in wait_status_all(), so memb->slot will remain -1, and we will not
158   assign slots or set ls_num_slots here */
159
160int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
161		     struct dlm_slot **slots_out, uint32_t *gen_out)
162{
163	struct dlm_member *memb;
164	struct dlm_slot *array;
165	int our_nodeid = dlm_our_nodeid();
166	int array_size, max_slots, i;
167	int need = 0;
168	int max = 0;
169	int num = 0;
170	uint32_t gen = 0;
171
172	/* our own memb struct will have slot -1 gen 0 */
173
174	list_for_each_entry(memb, &ls->ls_nodes, list) {
175		if (memb->nodeid == our_nodeid) {
176			memb->slot = ls->ls_slot;
177			memb->generation = ls->ls_generation;
178			break;
179		}
180	}
181
182	list_for_each_entry(memb, &ls->ls_nodes, list) {
183		if (memb->generation > gen)
184			gen = memb->generation;
185
186		/* node doesn't support slots */
187
188		if (memb->slot == -1)
189			return -1;
190
191		/* node needs a slot assigned */
192
193		if (!memb->slot)
194			need++;
195
196		/* node has a slot assigned */
197
198		num++;
199
200		if (!max || max < memb->slot)
201			max = memb->slot;
202
203		/* sanity check, once slot is assigned it shouldn't change */
204
205		if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
206			log_error(ls, "nodeid %d slot changed %d %d",
207				  memb->nodeid, memb->slot_prev, memb->slot);
208			return -1;
209		}
210		memb->slot_prev = memb->slot;
211	}
212
213	array_size = max + need;
214	array = kcalloc(array_size, sizeof(*array), GFP_NOFS);
215	if (!array)
216		return -ENOMEM;
217
218	num = 0;
219
220	/* fill in slots (offsets) that are used */
221
222	list_for_each_entry(memb, &ls->ls_nodes, list) {
223		if (!memb->slot)
224			continue;
225
226		if (memb->slot > array_size) {
227			log_error(ls, "invalid slot number %d", memb->slot);
228			kfree(array);
229			return -1;
230		}
231
232		array[memb->slot - 1].nodeid = memb->nodeid;
233		array[memb->slot - 1].slot = memb->slot;
234		num++;
235	}
236
237	/* assign new slots from unused offsets */
238
239	list_for_each_entry(memb, &ls->ls_nodes, list) {
240		if (memb->slot)
241			continue;
242
243		for (i = 0; i < array_size; i++) {
244			if (array[i].nodeid)
245				continue;
246
247			memb->slot = i + 1;
248			memb->slot_prev = memb->slot;
249			array[i].nodeid = memb->nodeid;
250			array[i].slot = memb->slot;
251			num++;
252
253			if (!ls->ls_slot && memb->nodeid == our_nodeid)
254				ls->ls_slot = memb->slot;
255			break;
256		}
257
258		if (!memb->slot) {
259			log_error(ls, "no free slot found");
260			kfree(array);
261			return -1;
262		}
263	}
264
265	gen++;
266
267	log_slots(ls, gen, num, NULL, array, array_size);
268
269	max_slots = (DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom) -
270		     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);
271
272	if (num > max_slots) {
273		log_error(ls, "num_slots %d exceeds max_slots %d",
274			  num, max_slots);
275		kfree(array);
276		return -1;
277	}
278
279	*gen_out = gen;
280	*slots_out = array;
281	*slots_size = array_size;
282	*num_slots = num;
283	return 0;
284}
285
286static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
287{
288	struct dlm_member *memb = NULL;
289	struct list_head *tmp;
290	struct list_head *newlist = &new->list;
291	struct list_head *head = &ls->ls_nodes;
292
293	list_for_each(tmp, head) {
294		memb = list_entry(tmp, struct dlm_member, list);
295		if (new->nodeid < memb->nodeid)
296			break;
297	}
298
299	if (!memb)
300		list_add_tail(newlist, head);
301	else {
302		/* FIXME: can use list macro here */
303		newlist->prev = tmp->prev;
304		newlist->next = tmp;
305		tmp->prev->next = newlist;
306		tmp->prev = newlist;
307	}
308}
309
310static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
311{
312	struct dlm_member *memb;
313	int error;
314
315	memb = kzalloc(sizeof(*memb), GFP_NOFS);
316	if (!memb)
317		return -ENOMEM;
318
319	error = dlm_lowcomms_connect_node(node->nodeid);
 
 
 
 
 
 
320	if (error < 0) {
321		kfree(memb);
322		return error;
323	}
324
325	memb->nodeid = node->nodeid;
326	memb->weight = node->weight;
327	memb->comm_seq = node->comm_seq;
328	dlm_midcomms_add_member(node->nodeid);
329	add_ordered_member(ls, memb);
330	ls->ls_num_nodes++;
331	return 0;
332}
333
334static struct dlm_member *find_memb(struct list_head *head, int nodeid)
 
 
 
 
 
 
335{
336	struct dlm_member *memb;
337
338	list_for_each_entry(memb, head, list) {
339		if (memb->nodeid == nodeid)
340			return memb;
341	}
342	return NULL;
343}
344
345int dlm_is_member(struct dlm_ls *ls, int nodeid)
346{
347	if (find_memb(&ls->ls_nodes, nodeid))
348		return 1;
349	return 0;
350}
351
352int dlm_is_removed(struct dlm_ls *ls, int nodeid)
353{
354	if (find_memb(&ls->ls_nodes_gone, nodeid))
355		return 1;
 
 
 
 
356	return 0;
357}
358
359static void clear_memb_list(struct list_head *head,
360			    void (*after_del)(int nodeid))
361{
362	struct dlm_member *memb;
363
364	while (!list_empty(head)) {
365		memb = list_entry(head->next, struct dlm_member, list);
366		list_del(&memb->list);
367		if (after_del)
368			after_del(memb->nodeid);
369		kfree(memb);
370	}
371}
372
373static void clear_members_cb(int nodeid)
374{
375	dlm_midcomms_remove_member(nodeid);
376}
377
378void dlm_clear_members(struct dlm_ls *ls)
379{
380	clear_memb_list(&ls->ls_nodes, clear_members_cb);
381	ls->ls_num_nodes = 0;
382}
383
384void dlm_clear_members_gone(struct dlm_ls *ls)
385{
386	clear_memb_list(&ls->ls_nodes_gone, NULL);
387}
388
389static void make_member_array(struct dlm_ls *ls)
390{
391	struct dlm_member *memb;
392	int i, w, x = 0, total = 0, all_zero = 0, *array;
393
394	kfree(ls->ls_node_array);
395	ls->ls_node_array = NULL;
396
397	list_for_each_entry(memb, &ls->ls_nodes, list) {
398		if (memb->weight)
399			total += memb->weight;
400	}
401
402	/* all nodes revert to weight of 1 if all have weight 0 */
403
404	if (!total) {
405		total = ls->ls_num_nodes;
406		all_zero = 1;
407	}
408
409	ls->ls_total_weight = total;
410	array = kmalloc_array(total, sizeof(*array), GFP_NOFS);
 
411	if (!array)
412		return;
413
414	list_for_each_entry(memb, &ls->ls_nodes, list) {
415		if (!all_zero && !memb->weight)
416			continue;
417
418		if (all_zero)
419			w = 1;
420		else
421			w = memb->weight;
422
423		DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
424
425		for (i = 0; i < w; i++)
426			array[x++] = memb->nodeid;
427	}
428
429	ls->ls_node_array = array;
430}
431
432/* send a status request to all members just to establish comms connections */
433
434static int ping_members(struct dlm_ls *ls)
435{
436	struct dlm_member *memb;
437	int error = 0;
438
439	list_for_each_entry(memb, &ls->ls_nodes, list) {
440		if (dlm_recovery_stopped(ls)) {
441			error = -EINTR;
442			break;
443		}
444		error = dlm_rcom_status(ls, memb->nodeid, 0);
445		if (error)
446			break;
447	}
448	if (error)
449		log_rinfo(ls, "ping_members aborted %d last nodeid %d",
450			  error, ls->ls_recover_nodeid);
451	return error;
452}
453
454static void dlm_lsop_recover_prep(struct dlm_ls *ls)
455{
456	if (!ls->ls_ops || !ls->ls_ops->recover_prep)
457		return;
458	ls->ls_ops->recover_prep(ls->ls_ops_arg);
459}
460
461static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
462{
463	struct dlm_slot slot;
464	uint32_t seq;
465	int error;
466
467	if (!ls->ls_ops || !ls->ls_ops->recover_slot)
468		return;
469
470	/* if there is no comms connection with this node
471	   or the present comms connection is newer
472	   than the one when this member was added, then
473	   we consider the node to have failed (versus
474	   being removed due to dlm_release_lockspace) */
475
476	error = dlm_comm_seq(memb->nodeid, &seq);
477
478	if (!error && seq == memb->comm_seq)
479		return;
480
481	slot.nodeid = memb->nodeid;
482	slot.slot = memb->slot;
483
484	ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
485}
486
487void dlm_lsop_recover_done(struct dlm_ls *ls)
488{
489	struct dlm_member *memb;
490	struct dlm_slot *slots;
491	int i, num;
492
493	if (!ls->ls_ops || !ls->ls_ops->recover_done)
494		return;
495
496	num = ls->ls_num_nodes;
497	slots = kcalloc(num, sizeof(*slots), GFP_KERNEL);
498	if (!slots)
499		return;
500
501	i = 0;
502	list_for_each_entry(memb, &ls->ls_nodes, list) {
503		if (i == num) {
504			log_error(ls, "dlm_lsop_recover_done bad num %d", num);
505			goto out;
506		}
507		slots[i].nodeid = memb->nodeid;
508		slots[i].slot = memb->slot;
509		i++;
510	}
511
512	ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
513				 ls->ls_slot, ls->ls_generation);
514 out:
515	kfree(slots);
516}
517
518static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
519						int nodeid)
520{
521	int i;
522
523	for (i = 0; i < rv->nodes_count; i++) {
524		if (rv->nodes[i].nodeid == nodeid)
525			return &rv->nodes[i];
526	}
527	return NULL;
528}
529
530int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
531{
532	struct dlm_member *memb, *safe;
533	struct dlm_config_node *node;
534	int i, error, neg = 0, low = -1;
535
536	/* previously removed members that we've not finished removing need to
537	 * count as a negative change so the "neg" recovery steps will happen
538	 *
539	 * This functionality must report all member changes to lsops or
540	 * midcomms layer and must never return before.
541	 */
542
543	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
544		log_rinfo(ls, "prev removed member %d", memb->nodeid);
545		neg++;
546	}
547
548	/* move departed members from ls_nodes to ls_nodes_gone */
549
550	list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
551		node = find_config_node(rv, memb->nodeid);
552		if (node && !node->new)
553			continue;
 
 
 
 
554
555		if (!node) {
556			log_rinfo(ls, "remove member %d", memb->nodeid);
557		} else {
558			/* removed and re-added */
559			log_rinfo(ls, "remove member %d comm_seq %u %u",
560				  memb->nodeid, memb->comm_seq, node->comm_seq);
561		}
 
 
 
 
 
 
 
 
 
 
562
 
 
 
 
 
563		neg++;
564		list_move(&memb->list, &ls->ls_nodes_gone);
565		dlm_midcomms_remove_member(memb->nodeid);
566		ls->ls_num_nodes--;
567		dlm_lsop_recover_slot(ls, memb);
568	}
569
570	/* add new members to ls_nodes */
571
572	for (i = 0; i < rv->nodes_count; i++) {
573		node = &rv->nodes[i];
574		if (dlm_is_member(ls, node->nodeid))
575			continue;
576		error = dlm_add_member(ls, node);
577		if (error)
578			return error;
579
580		log_rinfo(ls, "add member %d", node->nodeid);
581	}
582
583	list_for_each_entry(memb, &ls->ls_nodes, list) {
584		if (low == -1 || memb->nodeid < low)
585			low = memb->nodeid;
586	}
587	ls->ls_low_nodeid = low;
588
589	make_member_array(ls);
 
590	*neg_out = neg;
591
592	error = ping_members(ls);
593	log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
 
 
 
 
 
 
 
 
 
 
 
594	return error;
595}
596
597/* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
598   dlm_ls_start() is called on any of them to start the new recovery. */
599
600int dlm_ls_stop(struct dlm_ls *ls)
601{
602	int new;
603
604	/*
605	 * Prevent dlm_recv from being in the middle of something when we do
606	 * the stop.  This includes ensuring dlm_recv isn't processing a
607	 * recovery message (rcom), while dlm_recoverd is aborting and
608	 * resetting things from an in-progress recovery.  i.e. we want
609	 * dlm_recoverd to abort its recovery without worrying about dlm_recv
610	 * processing an rcom at the same time.  Stopping dlm_recv also makes
611	 * it easy for dlm_receive_message() to check locking stopped and add a
612	 * message to the requestqueue without races.
613	 */
614
615	down_write(&ls->ls_recv_active);
616
617	/*
618	 * Abort any recovery that's in progress (see RECOVER_STOP,
619	 * dlm_recovery_stopped()) and tell any other threads running in the
620	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
621	 */
622
623	spin_lock(&ls->ls_recover_lock);
624	set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
625	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
626	ls->ls_recover_seq++;
627	spin_unlock(&ls->ls_recover_lock);
628
629	/*
630	 * Let dlm_recv run again, now any normal messages will be saved on the
631	 * requestqueue for later.
632	 */
633
634	up_write(&ls->ls_recv_active);
635
636	/*
637	 * This in_recovery lock does two things:
638	 * 1) Keeps this function from returning until all threads are out
639	 *    of locking routines and locking is truly stopped.
640	 * 2) Keeps any new requests from being processed until it's unlocked
641	 *    when recovery is complete.
642	 */
643
644	if (new) {
645		set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
646		wake_up_process(ls->ls_recoverd_task);
647		wait_event(ls->ls_recover_lock_wait,
648			   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
649	}
650
651	/*
652	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
653	 * running) has noticed RECOVER_STOP above and quit processing the
654	 * previous recovery.
655	 */
656
657	dlm_recoverd_suspend(ls);
658
659	spin_lock(&ls->ls_recover_lock);
660	kfree(ls->ls_slots);
661	ls->ls_slots = NULL;
662	ls->ls_num_slots = 0;
663	ls->ls_slots_size = 0;
664	ls->ls_recover_status = 0;
665	spin_unlock(&ls->ls_recover_lock);
666
667	dlm_recoverd_resume(ls);
668
669	if (!ls->ls_recover_begin)
670		ls->ls_recover_begin = jiffies;
671
672	/* call recover_prep ops only once and not multiple times
673	 * for each possible dlm_ls_stop() when recovery is already
674	 * stopped.
675	 *
676	 * If we successful was able to clear LSFL_RUNNING bit and
677	 * it was set we know it is the first dlm_ls_stop() call.
678	 */
679	if (new)
680		dlm_lsop_recover_prep(ls);
681
682	return 0;
683}
684
685int dlm_ls_start(struct dlm_ls *ls)
686{
687	struct dlm_recover *rv, *rv_old;
688	struct dlm_config_node *nodes = NULL;
689	int error, count;
690
691	rv = kzalloc(sizeof(*rv), GFP_NOFS);
692	if (!rv)
693		return -ENOMEM;
694
695	error = dlm_config_nodes(ls->ls_name, &nodes, &count);
 
696	if (error < 0)
697		goto fail_rv;
698
699	spin_lock(&ls->ls_recover_lock);
700
701	/* the lockspace needs to be stopped before it can be started */
702
703	if (!dlm_locking_stopped(ls)) {
704		spin_unlock(&ls->ls_recover_lock);
705		log_error(ls, "start ignored: lockspace running");
706		error = -EINVAL;
707		goto fail;
708	}
709
710	rv->nodes = nodes;
711	rv->nodes_count = count;
 
 
712	rv->seq = ++ls->ls_recover_seq;
713	rv_old = ls->ls_recover_args;
714	ls->ls_recover_args = rv;
715	spin_unlock(&ls->ls_recover_lock);
716
717	if (rv_old) {
718		log_error(ls, "unused recovery %llx %d",
719			  (unsigned long long)rv_old->seq, rv_old->nodes_count);
720		kfree(rv_old->nodes);
 
721		kfree(rv_old);
722	}
723
724	set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
725	wake_up_process(ls->ls_recoverd_task);
726	return 0;
727
728 fail:
729	kfree(nodes);
730 fail_rv:
731	kfree(rv);
 
 
732	return error;
733}
734