fs/dlm/lockspace.c (v6.13.7)
  1// SPDX-License-Identifier: GPL-2.0-only
  2/******************************************************************************
  3*******************************************************************************
  4**
  5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
  6**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
  7**
  8**
  9*******************************************************************************
 10******************************************************************************/
 11
 12#include <linux/module.h>
 13
 14#include "dlm_internal.h"
 15#include "lockspace.h"
 16#include "member.h"
 17#include "recoverd.h"
 18#include "dir.h"
 19#include "midcomms.h"
 20#include "config.h"
 21#include "memory.h"
 22#include "lock.h"
 23#include "recover.h"
 24#include "requestqueue.h"
 25#include "user.h"
 26#include "ast.h"
 27
 28static int			ls_count;
 29static struct mutex		ls_lock;
 30static struct list_head		lslist;
 31static spinlock_t		lslist_lock;
 32
 33static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
 34{
 35	ssize_t ret = len;
 36	int n;
 37	int rc = kstrtoint(buf, 0, &n);
 38
 39	if (rc)
 40		return rc;
 41	ls = dlm_find_lockspace_local(ls);
 42	if (!ls)
 43		return -EINVAL;
 44
 45	switch (n) {
 46	case 0:
 47		dlm_ls_stop(ls);
 48		break;
 49	case 1:
 50		dlm_ls_start(ls);
 51		break;
 52	default:
 53		ret = -EINVAL;
 54	}
 55	dlm_put_lockspace(ls);
 56	return ret;
 57}
 58
 59static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
 60{
 61	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
 62
 63	if (rc)
 64		return rc;
 65	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
 66	wake_up(&ls->ls_uevent_wait);
 67	return len;
 68}
 69
 70static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
 71{
 72	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
 73}
 74
 75static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
 76{
 77	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
 78
 79	if (rc)
 80		return rc;
 81	return len;
 82}
 83
 84static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
 85{
 86	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
 87}
 88
 89static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
 90{
 91	int val;
 92	int rc = kstrtoint(buf, 0, &val);
 93
 94	if (rc)
 95		return rc;
 96	if (val == 1)
 97		set_bit(LSFL_NODIR, &ls->ls_flags);
 98	return len;
 99}
100
101static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
102{
103	uint32_t status = dlm_recover_status(ls);
104	return snprintf(buf, PAGE_SIZE, "%x\n", status);
105}
106
107static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
108{
109	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
110}
111
112struct dlm_attr {
113	struct attribute attr;
114	ssize_t (*show)(struct dlm_ls *, char *);
115	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
116};
117
118static struct dlm_attr dlm_attr_control = {
119	.attr  = {.name = "control", .mode = S_IWUSR},
120	.store = dlm_control_store
121};
122
123static struct dlm_attr dlm_attr_event = {
124	.attr  = {.name = "event_done", .mode = S_IWUSR},
125	.store = dlm_event_store
126};
127
128static struct dlm_attr dlm_attr_id = {
129	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
130	.show  = dlm_id_show,
131	.store = dlm_id_store
132};
133
134static struct dlm_attr dlm_attr_nodir = {
135	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
136	.show  = dlm_nodir_show,
137	.store = dlm_nodir_store
138};
139
140static struct dlm_attr dlm_attr_recover_status = {
141	.attr  = {.name = "recover_status", .mode = S_IRUGO},
142	.show  = dlm_recover_status_show
143};
144
145static struct dlm_attr dlm_attr_recover_nodeid = {
146	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
147	.show  = dlm_recover_nodeid_show
148};
149
150static struct attribute *dlm_attrs[] = {
151	&dlm_attr_control.attr,
152	&dlm_attr_event.attr,
153	&dlm_attr_id.attr,
154	&dlm_attr_nodir.attr,
155	&dlm_attr_recover_status.attr,
156	&dlm_attr_recover_nodeid.attr,
157	NULL,
158};
159ATTRIBUTE_GROUPS(dlm);
160
161static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
162			     char *buf)
163{
164	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
165	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
166	return a->show ? a->show(ls, buf) : 0;
167}
168
169static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
170			      const char *buf, size_t len)
171{
172	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
173	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
174	return a->store ? a->store(ls, buf, len) : len;
175}
176
177static const struct sysfs_ops dlm_attr_ops = {
178	.show  = dlm_attr_show,
179	.store = dlm_attr_store,
180};
181
182static struct kobj_type dlm_ktype = {
183	.default_groups = dlm_groups,
184	.sysfs_ops     = &dlm_attr_ops,
185};
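/*
 * Illustrative sketch, not part of the original file: how a userspace
 * manager such as dlm_controld might drive the "control" attribute
 * defined above. A write of "0" reaches dlm_control_store() through
 * dlm_attr_store() and calls dlm_ls_stop(); "1" calls dlm_ls_start().
 * The lockspace name "example" is hypothetical; the /sys/kernel/dlm/
 * prefix follows from the kset created in dlm_lockspace_init() below.
 */
#include <fcntl.h>
#include <unistd.h>

static int example_ls_control(int start)
{
	int fd = open("/sys/kernel/dlm/example/control", O_WRONLY);
	int rv;

	if (fd < 0)
		return -1;
	/* "1" -> dlm_ls_start(), "0" -> dlm_ls_stop() */
	rv = (write(fd, start ? "1" : "0", 1) == 1) ? 0 : -1;
	close(fd);
	return rv;
}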
186
187static struct kset *dlm_kset;
188
189static int do_uevent(struct dlm_ls *ls, int in)
190{
191	if (in)
192		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
193	else
194		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
195
196	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
197
198	/* dlm_controld will see the uevent, do the necessary group management
199	   and then write to sysfs to wake us */
200
201	wait_event(ls->ls_uevent_wait,
202		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
203
204	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
205
206	return ls->ls_uevent_result;
207}
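/*
 * Illustrative sketch, not part of the original file: the userspace half
 * of the handshake in do_uevent() above. After finishing the group join
 * or leave, the daemon writes a status code to "event_done";
 * dlm_event_store() parses it into ls_uevent_result, sets
 * LSFL_UEVENT_WAIT and wakes the wait_event(). The lockspace name
 * "example" is hypothetical.
 */
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

static int example_event_done(int result)
{
	char buf[16];
	int fd, n, rv;

	fd = open("/sys/kernel/dlm/example/event_done", O_WRONLY);
	if (fd < 0)
		return -1;
	n = snprintf(buf, sizeof(buf), "%d", result);
	rv = (write(fd, buf, n) == n) ? 0 : -1;
	close(fd);
	return rv;
}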
208
209static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
210{
211	const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
212
213	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
214	return 0;
215}
216
217static const struct kset_uevent_ops dlm_uevent_ops = {
218	.uevent = dlm_uevent,
219};
220
221int __init dlm_lockspace_init(void)
222{
223	ls_count = 0;
224	mutex_init(&ls_lock);
225	INIT_LIST_HEAD(&lslist);
226	spin_lock_init(&lslist_lock);
227
228	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
229	if (!dlm_kset) {
230		printk(KERN_WARNING "%s: can not create kset\n", __func__);
231		return -ENOMEM;
232	}
233	return 0;
234}
235
236void dlm_lockspace_exit(void)
237{
238	kset_unregister(dlm_kset);
239}
240
241struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
242{
243	struct dlm_ls *ls;
244
245	spin_lock_bh(&lslist_lock);
246
247	list_for_each_entry(ls, &lslist, ls_list) {
248		if (ls->ls_global_id == id) {
249			atomic_inc(&ls->ls_count);
250			goto out;
251		}
252	}
253	ls = NULL;
254 out:
255	spin_unlock_bh(&lslist_lock);
256	return ls;
257}
258
259struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
260{
261	struct dlm_ls *ls = lockspace;
262
263	atomic_inc(&ls->ls_count);
264	return ls;
265}
266
267struct dlm_ls *dlm_find_lockspace_device(int minor)
268{
269	struct dlm_ls *ls;
270
271	spin_lock_bh(&lslist_lock);
272	list_for_each_entry(ls, &lslist, ls_list) {
273		if (ls->ls_device.minor == minor) {
274			atomic_inc(&ls->ls_count);
275			goto out;
276		}
277	}
278	ls = NULL;
279 out:
280	spin_unlock_bh(&lslist_lock);
281	return ls;
282}
283
284void dlm_put_lockspace(struct dlm_ls *ls)
285{
286	if (atomic_dec_and_test(&ls->ls_count))
287		wake_up(&ls->ls_count_wait);
288}
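/*
 * Illustrative sketch, not part of the original file: the reference
 * counting contract of the lookup helpers above. Every successful
 * dlm_find_lockspace_*() call takes a reference on ls_count and must be
 * paired with dlm_put_lockspace(), whose final put wakes the
 * wait_event() in remove_lockspace() below.
 */
static int example_use_lockspace(uint32_t id)
{
	struct dlm_ls *ls = dlm_find_lockspace_global(id);

	if (!ls)
		return -EINVAL;
	/* ... safe to use ls while the reference is held ... */
	dlm_put_lockspace(ls);
	return 0;
}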
289
290static void remove_lockspace(struct dlm_ls *ls)
291{
292retry:
293	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
294
295	spin_lock_bh(&lslist_lock);
296	if (atomic_read(&ls->ls_count) != 0) {
297		spin_unlock_bh(&lslist_lock);
298		goto retry;
299	}
300
301	WARN_ON(ls->ls_create_count != 0);
302	list_del(&ls->ls_list);
303	spin_unlock_bh(&lslist_lock);
304}
305
306static int threads_start(void)
307{
308	int error;
309
310	/* Thread for sending/receiving messages for all lockspaces */
311	error = dlm_midcomms_start();
312	if (error)
313		log_print("cannot start dlm midcomms %d", error);
314
315	return error;
316}
317
318static int lkb_idr_free(struct dlm_lkb *lkb)
319{
320	if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
321		dlm_free_lvb(lkb->lkb_lvbptr);
322
323	dlm_free_lkb(lkb);
324	return 0;
325}
326
327static void rhash_free_rsb(void *ptr, void *arg)
328{
329	struct dlm_rsb *rsb = ptr;
330
331	dlm_free_rsb(rsb);
332}
333
334static void free_lockspace(struct work_struct *work)
335{
336	struct dlm_ls *ls  = container_of(work, struct dlm_ls, ls_free_work);
337	struct dlm_lkb *lkb;
338	unsigned long id;
339
340	/*
341	 * Free all lkb's in xa
342	 */
343	xa_for_each(&ls->ls_lkbxa, id, lkb) {
344		lkb_idr_free(lkb);
345	}
346	xa_destroy(&ls->ls_lkbxa);
347
348	/*
349	 * Free all rsb's on rsbtbl
350	 */
351	rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);
352
353	kfree(ls);
354}
355
356static int new_lockspace(const char *name, const char *cluster,
357			 uint32_t flags, int lvblen,
358			 const struct dlm_lockspace_ops *ops, void *ops_arg,
359			 int *ops_result, dlm_lockspace_t **lockspace)
360{
361	struct dlm_ls *ls;
362	int namelen = strlen(name);
363	int error;
364
365	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
366		return -EINVAL;
367
368	if (lvblen % 8)
369		return -EINVAL;
370
371	if (!try_module_get(THIS_MODULE))
372		return -EINVAL;
373
374	if (!dlm_user_daemon_available()) {
375		log_print("dlm user daemon not available");
376		error = -EUNATCH;
377		goto out;
378	}
379
380	if (ops && ops_result) {
381	       	if (!dlm_config.ci_recover_callbacks)
382			*ops_result = -EOPNOTSUPP;
383		else
384			*ops_result = 0;
385	}
386
387	if (!cluster)
388		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
389			  dlm_config.ci_cluster_name);
390
391	if (dlm_config.ci_recover_callbacks && cluster &&
392	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
393		log_print("dlm cluster name '%s' does not match "
394			  "the application cluster name '%s'",
395			  dlm_config.ci_cluster_name, cluster);
396		error = -EBADR;
397		goto out;
398	}
399
400	error = 0;
401
402	spin_lock_bh(&lslist_lock);
403	list_for_each_entry(ls, &lslist, ls_list) {
404		WARN_ON(ls->ls_create_count <= 0);
405		if (ls->ls_namelen != namelen)
406			continue;
407		if (memcmp(ls->ls_name, name, namelen))
408			continue;
409		if (flags & DLM_LSFL_NEWEXCL) {
410			error = -EEXIST;
411			break;
412		}
413		ls->ls_create_count++;
414		*lockspace = ls;
415		error = 1;
416		break;
417	}
418	spin_unlock_bh(&lslist_lock);
419
420	if (error)
421		goto out;
422
423	error = -ENOMEM;
424
425	ls = kzalloc(sizeof(*ls), GFP_NOFS);
426	if (!ls)
427		goto out;
428	memcpy(ls->ls_name, name, namelen);
429	ls->ls_namelen = namelen;
430	ls->ls_lvblen = lvblen;
431	atomic_set(&ls->ls_count, 0);
432	init_waitqueue_head(&ls->ls_count_wait);
433	ls->ls_flags = 0;
434
435	if (ops && dlm_config.ci_recover_callbacks) {
436		ls->ls_ops = ops;
437		ls->ls_ops_arg = ops_arg;
438	}
439
440	if (flags & DLM_LSFL_SOFTIRQ)
441		set_bit(LSFL_SOFTIRQ, &ls->ls_flags);
442
443	/* ls_exflags are forced to match among nodes, and we don't
444	 * need to require all nodes to have some flags set
445	 */
446	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL |
447				    DLM_LSFL_SOFTIRQ));
448
449	INIT_LIST_HEAD(&ls->ls_slow_inactive);
450	INIT_LIST_HEAD(&ls->ls_slow_active);
451	rwlock_init(&ls->ls_rsbtbl_lock);
452
453	error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
454	if (error)
455		goto out_lsfree;
456
457	xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
458	rwlock_init(&ls->ls_lkbxa_lock);
459
460	INIT_LIST_HEAD(&ls->ls_waiters);
461	spin_lock_init(&ls->ls_waiters_lock);
462	INIT_LIST_HEAD(&ls->ls_orphans);
463	spin_lock_init(&ls->ls_orphans_lock);
464
465	INIT_LIST_HEAD(&ls->ls_nodes);
466	INIT_LIST_HEAD(&ls->ls_nodes_gone);
467	ls->ls_num_nodes = 0;
468	ls->ls_low_nodeid = 0;
469	ls->ls_total_weight = 0;
470	ls->ls_node_array = NULL;
471
472	memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
473	ls->ls_local_rsb.res_ls = ls;
474
475	ls->ls_debug_rsb_dentry = NULL;
476	ls->ls_debug_waiters_dentry = NULL;
477
478	init_waitqueue_head(&ls->ls_uevent_wait);
479	ls->ls_uevent_result = 0;
480	init_completion(&ls->ls_recovery_done);
481	ls->ls_recovery_result = -1;
482
483	spin_lock_init(&ls->ls_cb_lock);
484	INIT_LIST_HEAD(&ls->ls_cb_delay);
485
486	INIT_WORK(&ls->ls_free_work, free_lockspace);
487
488	ls->ls_recoverd_task = NULL;
489	mutex_init(&ls->ls_recoverd_active);
490	spin_lock_init(&ls->ls_recover_lock);
491	spin_lock_init(&ls->ls_rcom_spin);
492	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
493	ls->ls_recover_status = 0;
494	ls->ls_recover_seq = get_random_u64();
495	ls->ls_recover_args = NULL;
496	init_rwsem(&ls->ls_in_recovery);
497	rwlock_init(&ls->ls_recv_active);
498	INIT_LIST_HEAD(&ls->ls_requestqueue);
499	rwlock_init(&ls->ls_requestqueue_lock);
500	spin_lock_init(&ls->ls_clear_proc_locks);
501
502	/* Due to backwards compatibility with 3.1 we need to use the
503	 * maximum possible dlm message size to be sure the message will
504	 * fit and we avoid out of bounds issues. However, on the sending
505	 * side 3.2 might send less.
506	 */
507	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
508	if (!ls->ls_recover_buf) {
509		error = -ENOMEM;
510		goto out_lkbxa;
511	}
512
513	ls->ls_slot = 0;
514	ls->ls_num_slots = 0;
515	ls->ls_slots_size = 0;
516	ls->ls_slots = NULL;
517
518	INIT_LIST_HEAD(&ls->ls_recover_list);
519	spin_lock_init(&ls->ls_recover_list_lock);
520	xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
521	spin_lock_init(&ls->ls_recover_xa_lock);
522	ls->ls_recover_list_count = 0;
523	init_waitqueue_head(&ls->ls_wait_general);
524	INIT_LIST_HEAD(&ls->ls_masters_list);
525	rwlock_init(&ls->ls_masters_lock);
526	INIT_LIST_HEAD(&ls->ls_dir_dump_list);
527	rwlock_init(&ls->ls_dir_dump_lock);
528
529	INIT_LIST_HEAD(&ls->ls_scan_list);
530	spin_lock_init(&ls->ls_scan_lock);
531	timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE);
532
533	spin_lock_bh(&lslist_lock);
534	ls->ls_create_count = 1;
535	list_add(&ls->ls_list, &lslist);
536	spin_unlock_bh(&lslist_lock);
537
538	if (flags & DLM_LSFL_FS)
539		set_bit(LSFL_FS, &ls->ls_flags);
540
541	error = dlm_callback_start(ls);
542	if (error) {
543		log_error(ls, "can't start dlm_callback %d", error);
544		goto out_delist;
545	}
546
547	init_waitqueue_head(&ls->ls_recover_lock_wait);
548
549	/*
550	 * Once started, dlm_recoverd first looks for ls in lslist, then
551	 * initializes ls_in_recovery as locked in "down" mode.  We need
552	 * to wait for the wakeup from dlm_recoverd because in_recovery
553	 * has to start out in down mode.
554	 */
555
556	error = dlm_recoverd_start(ls);
557	if (error) {
558		log_error(ls, "can't start dlm_recoverd %d", error);
559		goto out_callback;
560	}
561
562	wait_event(ls->ls_recover_lock_wait,
563		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
564
565	ls->ls_kobj.kset = dlm_kset;
566	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
567				     "%s", ls->ls_name);
568	if (error)
569		goto out_recoverd;
570	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
571
572	/* This uevent triggers dlm_controld in userspace to add us to the
573	   group of nodes that are members of this lockspace (managed by the
574	   cluster infrastructure.)  Once it's done that, it tells us who the
575	   current lockspace members are (via configfs) and then tells the
576	   lockspace to start running (via sysfs) in dlm_ls_start(). */
577
578	error = do_uevent(ls, 1);
579	if (error)
580		goto out_recoverd;
581
582	/* wait until recovery is successful or failed */
583	wait_for_completion(&ls->ls_recovery_done);
584	error = ls->ls_recovery_result;
585	if (error)
586		goto out_members;
587
588	dlm_create_debug_file(ls);
589
590	log_rinfo(ls, "join complete");
591	*lockspace = ls;
592	return 0;
593
594 out_members:
595	do_uevent(ls, 0);
596	dlm_clear_members(ls);
597	kfree(ls->ls_node_array);
598 out_recoverd:
599	dlm_recoverd_stop(ls);
600 out_callback:
601	dlm_callback_stop(ls);
602 out_delist:
603	spin_lock_bh(&lslist_lock);
604	list_del(&ls->ls_list);
605	spin_unlock_bh(&lslist_lock);
606	xa_destroy(&ls->ls_recover_xa);
607	kfree(ls->ls_recover_buf);
608 out_lkbxa:
609	xa_destroy(&ls->ls_lkbxa);
610	rhashtable_destroy(&ls->ls_rsbtbl);
611 out_lsfree:
612	kobject_put(&ls->ls_kobj);
613	kfree(ls);
614 out:
615	module_put(THIS_MODULE);
616	return error;
617}
618
619static int __dlm_new_lockspace(const char *name, const char *cluster,
620			       uint32_t flags, int lvblen,
621			       const struct dlm_lockspace_ops *ops,
622			       void *ops_arg, int *ops_result,
623			       dlm_lockspace_t **lockspace)
624{
625	int error = 0;
626
627	mutex_lock(&ls_lock);
628	if (!ls_count)
629		error = threads_start();
630	if (error)
631		goto out;
632
633	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
634			      ops_result, lockspace);
635	if (!error)
636		ls_count++;
637	if (error > 0)
638		error = 0;
639	if (!ls_count) {
640		dlm_midcomms_shutdown();
641		dlm_midcomms_stop();
642	}
643 out:
644	mutex_unlock(&ls_lock);
645	return error;
646}
647
648int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
649		      int lvblen, const struct dlm_lockspace_ops *ops,
650		      void *ops_arg, int *ops_result,
651		      dlm_lockspace_t **lockspace)
652{
653	return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
654				   ops, ops_arg, ops_result, lockspace);
655}
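/*
 * Illustrative sketch, not part of the original file: how a kernel user
 * such as a cluster filesystem might join a lockspace through the
 * exported API above. The name "example" and the 32-byte LVB length are
 * hypothetical; DLM_LSFL_NEWEXCL makes the call fail with -EEXIST if
 * the lockspace already exists on this node.
 */
static dlm_lockspace_t *example_ls;

static int example_join(void)
{
	return dlm_new_lockspace("example", NULL, DLM_LSFL_NEWEXCL, 32,
				 NULL, NULL, NULL, &example_ls);
}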
656
657int dlm_new_user_lockspace(const char *name, const char *cluster,
658			   uint32_t flags, int lvblen,
659			   const struct dlm_lockspace_ops *ops,
660			   void *ops_arg, int *ops_result,
661			   dlm_lockspace_t **lockspace)
662{
663	if (flags & DLM_LSFL_SOFTIRQ)
664		return -EINVAL;
665
666	return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
667				   ops_arg, ops_result, lockspace);
668}
669
670/* NOTE: We check the lkbxa here rather than the resource table.
671   This is because there may be LKBs queued as ASTs that have been unlinked
672   from their RSBs and are pending deletion once the AST has been delivered */
673
674static int lockspace_busy(struct dlm_ls *ls, int force)
675{
676	struct dlm_lkb *lkb;
677	unsigned long id;
678	int rv = 0;
679
680	read_lock_bh(&ls->ls_lkbxa_lock);
681	if (force == 0) {
682		xa_for_each(&ls->ls_lkbxa, id, lkb) {
683			rv = 1;
684			break;
685		}
686	} else if (force == 1) {
687		xa_for_each(&ls->ls_lkbxa, id, lkb) {
688			if (lkb->lkb_nodeid == 0 &&
689			    lkb->lkb_grmode != DLM_LOCK_IV) {
690				rv = 1;
691				break;
692			}
693		}
694	} else {
695		rv = 0;
696	}
697	read_unlock_bh(&ls->ls_lkbxa_lock);
698	return rv;
699}
700
701static int release_lockspace(struct dlm_ls *ls, int force)
702{
703	int busy, rv;
704
705	busy = lockspace_busy(ls, force);
706
707	spin_lock_bh(&lslist_lock);
708	if (ls->ls_create_count == 1) {
709		if (busy) {
710			rv = -EBUSY;
711		} else {
712			/* remove_lockspace takes ls off lslist */
713			ls->ls_create_count = 0;
714			rv = 0;
715		}
716	} else if (ls->ls_create_count > 1) {
717		rv = --ls->ls_create_count;
718	} else {
719		rv = -EINVAL;
720	}
721	spin_unlock_bh(&lslist_lock);
722
723	if (rv) {
724		log_debug(ls, "release_lockspace no remove %d", rv);
725		return rv;
726	}
727
728	if (ls_count == 1)
729		dlm_midcomms_version_wait();
730
731	dlm_device_deregister(ls);
732
733	if (force < 3 && dlm_user_daemon_available())
734		do_uevent(ls, 0);
735
736	dlm_recoverd_stop(ls);
737
738	/* clear the LSFL_RUNNING flag to speed up
739	 * timer_shutdown_sync(); we don't care anymore
740	 */
741	clear_bit(LSFL_RUNNING, &ls->ls_flags);
742	timer_shutdown_sync(&ls->ls_scan_timer);
743
744	if (ls_count == 1) {
745		dlm_clear_members(ls);
746		dlm_midcomms_shutdown();
747	}
748
749	dlm_callback_stop(ls);
750
751	remove_lockspace(ls);
752
753	dlm_delete_debug_file(ls);
754
755	kobject_put(&ls->ls_kobj);
756
757	xa_destroy(&ls->ls_recover_xa);
758	kfree(ls->ls_recover_buf);
759
760	/*
761	 * Free structures on any other lists
762	 */
763
764	dlm_purge_requestqueue(ls);
765	kfree(ls->ls_recover_args);
766	dlm_clear_members(ls);
767	dlm_clear_members_gone(ls);
768	kfree(ls->ls_node_array);
769
770	log_rinfo(ls, "%s final free", __func__);
771
772	/* delayed free of data structures see free_lockspace() */
773	queue_work(dlm_wq, &ls->ls_free_work);
774	module_put(THIS_MODULE);
775	return 0;
776}
777
778/*
779 * Called when a system has released all its locks and is not going to use the
780 * lockspace any longer.  We free everything we're managing for this lockspace.
781 * Remaining nodes will go through the recovery process as if we'd died.  The
782 * lockspace must continue to function as usual, participating in recoveries,
783 * until this returns.
784 *
785 * Force has 4 possible values:
786 * 0 - don't destroy lockspace if it has any LKBs
787 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
788 * 2 - destroy lockspace regardless of LKBs
789 * 3 - destroy lockspace as part of a forced shutdown
790 */
791
792int dlm_release_lockspace(void *lockspace, int force)
793{
794	struct dlm_ls *ls;
795	int error;
796
797	ls = dlm_find_lockspace_local(lockspace);
798	if (!ls)
799		return -EINVAL;
800	dlm_put_lockspace(ls);
801
802	mutex_lock(&ls_lock);
803	error = release_lockspace(ls, force);
804	if (!error)
805		ls_count--;
806	if (!ls_count)
807		dlm_midcomms_stop();
808	mutex_unlock(&ls_lock);
809
810	return error;
811}
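/*
 * Illustrative sketch, not part of the original file: tearing down the
 * hypothetical lockspace from the earlier sketch. Per the force values
 * documented above, 0 refuses to destroy a lockspace that still holds
 * any LKBs (-EBUSY); a forced shutdown path would pass 3 instead.
 */
static int example_leave(void)
{
	return dlm_release_lockspace(example_ls, 0);
}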
812
813void dlm_stop_lockspaces(void)
814{
815	struct dlm_ls *ls;
816	int count;
817
818 restart:
819	count = 0;
820	spin_lock_bh(&lslist_lock);
821	list_for_each_entry(ls, &lslist, ls_list) {
822		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
823			count++;
824			continue;
825		}
826		spin_unlock_bh(&lslist_lock);
827		log_error(ls, "no userland control daemon, stopping lockspace");
828		dlm_ls_stop(ls);
829		goto restart;
830	}
831	spin_unlock_bh(&lslist_lock);
832
833	if (count)
834		log_print("dlm user daemon left %d lockspaces", count);
835}
fs/dlm/lockspace.c (v3.15)
 
  1/******************************************************************************
  2*******************************************************************************
  3**
  4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
  5**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
  6**
  7**  This copyrighted material is made available to anyone wishing to use,
  8**  modify, copy, or redistribute it subject to the terms and conditions
  9**  of the GNU General Public License v.2.
 10**
 11*******************************************************************************
 12******************************************************************************/
 13
 14#include "dlm_internal.h"
 15#include "lockspace.h"
 16#include "member.h"
 17#include "recoverd.h"
 18#include "dir.h"
 19#include "lowcomms.h"
 20#include "config.h"
 21#include "memory.h"
 22#include "lock.h"
 23#include "recover.h"
 24#include "requestqueue.h"
 25#include "user.h"
 26#include "ast.h"
 27
 28static int			ls_count;
 29static struct mutex		ls_lock;
 30static struct list_head		lslist;
 31static spinlock_t		lslist_lock;
 32static struct task_struct *	scand_task;
 33
 34
 35static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
 36{
 37	ssize_t ret = len;
 38	int n = simple_strtol(buf, NULL, 0);
 39
 40	ls = dlm_find_lockspace_local(ls->ls_local_handle);
 41	if (!ls)
 42		return -EINVAL;
 43
 44	switch (n) {
 45	case 0:
 46		dlm_ls_stop(ls);
 47		break;
 48	case 1:
 49		dlm_ls_start(ls);
 50		break;
 51	default:
 52		ret = -EINVAL;
 53	}
 54	dlm_put_lockspace(ls);
 55	return ret;
 56}
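/*
 * Side note, not part of the original file: simple_strtol() above cannot
 * report a parse failure, so a malformed write such as "abc" is silently
 * treated as 0 and stops the lockspace. The v6.13.7 listing above
 * rejects bad input with kstrtoint() instead; a minimal sketch of that
 * stricter parse ("example_strict_store" is a hypothetical name):
 */
static ssize_t example_strict_store(struct dlm_ls *ls, const char *buf,
				    size_t len)
{
	int n;
	int rc = kstrtoint(buf, 0, &n);

	if (rc)
		return rc;	/* -EINVAL or -ERANGE, nothing was stored */
	/* ... dispatch on n exactly as dlm_control_store() does ... */
	return len;
}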
 57
 58static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
 59{
 60	ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
 61	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
 62	wake_up(&ls->ls_uevent_wait);
 63	return len;
 64}
 65
 66static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
 67{
 68	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
 69}
 70
 71static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
 72{
 73	ls->ls_global_id = simple_strtoul(buf, NULL, 0);
 74	return len;
 75}
 76
 77static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
 78{
 79	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
 80}
 81
 82static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
 83{
 84	int val = simple_strtoul(buf, NULL, 0);
 85	if (val == 1)
 86		set_bit(LSFL_NODIR, &ls->ls_flags);
 87	return len;
 88}
 89
 90static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
 91{
 92	uint32_t status = dlm_recover_status(ls);
 93	return snprintf(buf, PAGE_SIZE, "%x\n", status);
 94}
 95
 96static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
 97{
 98	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
 99}
100
101struct dlm_attr {
102	struct attribute attr;
103	ssize_t (*show)(struct dlm_ls *, char *);
104	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
105};
106
107static struct dlm_attr dlm_attr_control = {
108	.attr  = {.name = "control", .mode = S_IWUSR},
109	.store = dlm_control_store
110};
111
112static struct dlm_attr dlm_attr_event = {
113	.attr  = {.name = "event_done", .mode = S_IWUSR},
114	.store = dlm_event_store
115};
116
117static struct dlm_attr dlm_attr_id = {
118	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
119	.show  = dlm_id_show,
120	.store = dlm_id_store
121};
122
123static struct dlm_attr dlm_attr_nodir = {
124	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
125	.show  = dlm_nodir_show,
126	.store = dlm_nodir_store
127};
128
129static struct dlm_attr dlm_attr_recover_status = {
130	.attr  = {.name = "recover_status", .mode = S_IRUGO},
131	.show  = dlm_recover_status_show
132};
133
134static struct dlm_attr dlm_attr_recover_nodeid = {
135	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
136	.show  = dlm_recover_nodeid_show
137};
138
139static struct attribute *dlm_attrs[] = {
140	&dlm_attr_control.attr,
141	&dlm_attr_event.attr,
142	&dlm_attr_id.attr,
143	&dlm_attr_nodir.attr,
144	&dlm_attr_recover_status.attr,
145	&dlm_attr_recover_nodeid.attr,
146	NULL,
147};
148
149static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
150			     char *buf)
151{
152	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
153	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
154	return a->show ? a->show(ls, buf) : 0;
155}
156
157static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
158			      const char *buf, size_t len)
159{
160	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
161	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
162	return a->store ? a->store(ls, buf, len) : len;
163}
164
165static void lockspace_kobj_release(struct kobject *k)
166{
167	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
168	kfree(ls);
169}
170
171static const struct sysfs_ops dlm_attr_ops = {
172	.show  = dlm_attr_show,
173	.store = dlm_attr_store,
174};
175
176static struct kobj_type dlm_ktype = {
177	.default_attrs = dlm_attrs,
178	.sysfs_ops     = &dlm_attr_ops,
179	.release       = lockspace_kobj_release,
180};
181
182static struct kset *dlm_kset;
183
184static int do_uevent(struct dlm_ls *ls, int in)
185{
186	int error;
187
188	if (in)
189		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
190	else
191		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
192
193	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
194
195	/* dlm_controld will see the uevent, do the necessary group management
196	   and then write to sysfs to wake us */
197
198	error = wait_event_interruptible(ls->ls_uevent_wait,
199			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
200
201	log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
202
203	if (error)
204		goto out;
205
206	error = ls->ls_uevent_result;
207 out:
208	if (error)
209		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
210			  error, ls->ls_uevent_result);
211	return error;
212}
213
214static int dlm_uevent(struct kset *kset, struct kobject *kobj,
215		      struct kobj_uevent_env *env)
216{
217	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
218
219	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
220	return 0;
221}
222
223static struct kset_uevent_ops dlm_uevent_ops = {
224	.uevent = dlm_uevent,
225};
226
227int __init dlm_lockspace_init(void)
228{
229	ls_count = 0;
230	mutex_init(&ls_lock);
231	INIT_LIST_HEAD(&lslist);
232	spin_lock_init(&lslist_lock);
233
234	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
235	if (!dlm_kset) {
236		printk(KERN_WARNING "%s: can not create kset\n", __func__);
237		return -ENOMEM;
238	}
239	return 0;
240}
241
242void dlm_lockspace_exit(void)
243{
244	kset_unregister(dlm_kset);
245}
246
247static struct dlm_ls *find_ls_to_scan(void)
248{
249	struct dlm_ls *ls;
250
251	spin_lock(&lslist_lock);
252	list_for_each_entry(ls, &lslist, ls_list) {
253		if (time_after_eq(jiffies, ls->ls_scan_time +
254					    dlm_config.ci_scan_secs * HZ)) {
255			spin_unlock(&lslist_lock);
256			return ls;
257		}
258	}
259	spin_unlock(&lslist_lock);
260	return NULL;
261}
262
263static int dlm_scand(void *data)
264{
265	struct dlm_ls *ls;
266
267	while (!kthread_should_stop()) {
268		ls = find_ls_to_scan();
269		if (ls) {
270			if (dlm_lock_recovery_try(ls)) {
271				ls->ls_scan_time = jiffies;
272				dlm_scan_rsbs(ls);
273				dlm_scan_timeout(ls);
274				dlm_scan_waiters(ls);
275				dlm_unlock_recovery(ls);
276			} else {
277				ls->ls_scan_time += HZ;
278			}
279			continue;
280		}
281		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
282	}
283	return 0;
284}
285
286static int dlm_scand_start(void)
287{
288	struct task_struct *p;
289	int error = 0;
290
291	p = kthread_run(dlm_scand, NULL, "dlm_scand");
292	if (IS_ERR(p))
293		error = PTR_ERR(p);
294	else
295		scand_task = p;
296	return error;
297}
298
299static void dlm_scand_stop(void)
300{
301	kthread_stop(scand_task);
302}
303
304struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
305{
306	struct dlm_ls *ls;
307
308	spin_lock(&lslist_lock);
309
310	list_for_each_entry(ls, &lslist, ls_list) {
311		if (ls->ls_global_id == id) {
312			ls->ls_count++;
313			goto out;
314		}
315	}
316	ls = NULL;
317 out:
318	spin_unlock(&lslist_lock);
319	return ls;
320}
321
322struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
323{
324	struct dlm_ls *ls;
325
326	spin_lock(&lslist_lock);
327	list_for_each_entry(ls, &lslist, ls_list) {
328		if (ls->ls_local_handle == lockspace) {
329			ls->ls_count++;
330			goto out;
331		}
332	}
333	ls = NULL;
334 out:
335	spin_unlock(&lslist_lock);
336	return ls;
337}
338
339struct dlm_ls *dlm_find_lockspace_device(int minor)
340{
341	struct dlm_ls *ls;
342
343	spin_lock(&lslist_lock);
344	list_for_each_entry(ls, &lslist, ls_list) {
345		if (ls->ls_device.minor == minor) {
346			ls->ls_count++;
347			goto out;
348		}
349	}
350	ls = NULL;
351 out:
352	spin_unlock(&lslist_lock);
353	return ls;
354}
355
356void dlm_put_lockspace(struct dlm_ls *ls)
357{
358	spin_lock(&lslist_lock);
359	ls->ls_count--;
360	spin_unlock(&lslist_lock);
361}
362
363static void remove_lockspace(struct dlm_ls *ls)
364{
365	for (;;) {
366		spin_lock(&lslist_lock);
367		if (ls->ls_count == 0) {
368			WARN_ON(ls->ls_create_count != 0);
369			list_del(&ls->ls_list);
370			spin_unlock(&lslist_lock);
371			return;
372		}
373		spin_unlock(&lslist_lock);
374		ssleep(1);
375	}
376}
377
378static int threads_start(void)
379{
380	int error;
381
382	error = dlm_scand_start();
383	if (error) {
384		log_print("cannot start dlm_scand thread %d", error);
385		goto fail;
386	}
387
388	/* Thread for sending/receiving messages for all lockspaces */
389	error = dlm_lowcomms_start();
390	if (error) {
391		log_print("cannot start dlm lowcomms %d", error);
392		goto scand_fail;
393	}
394
395	return 0;
396
397 scand_fail:
398	dlm_scand_stop();
399 fail:
400	return error;
401}
402
403static void threads_stop(void)
404{
405	dlm_scand_stop();
406	dlm_lowcomms_stop();
407}
408
409static int new_lockspace(const char *name, const char *cluster,
410			 uint32_t flags, int lvblen,
411			 const struct dlm_lockspace_ops *ops, void *ops_arg,
412			 int *ops_result, dlm_lockspace_t **lockspace)
413{
414	struct dlm_ls *ls;
415	int i, size, error;
416	int do_unreg = 0;
417	int namelen = strlen(name);
418
419	if (namelen > DLM_LOCKSPACE_LEN)
420		return -EINVAL;
421
422	if (!lvblen || (lvblen % 8))
423		return -EINVAL;
424
425	if (!try_module_get(THIS_MODULE))
426		return -EINVAL;
427
428	if (!dlm_user_daemon_available()) {
429		log_print("dlm user daemon not available");
430		error = -EUNATCH;
431		goto out;
432	}
433
434	if (ops && ops_result) {
435	       	if (!dlm_config.ci_recover_callbacks)
436			*ops_result = -EOPNOTSUPP;
437		else
438			*ops_result = 0;
439	}
440
441	if (dlm_config.ci_recover_callbacks && cluster &&
442	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
443		log_print("dlm cluster name %s mismatch %s",
444			  dlm_config.ci_cluster_name, cluster);
445		error = -EBADR;
446		goto out;
447	}
448
449	error = 0;
450
451	spin_lock(&lslist_lock);
452	list_for_each_entry(ls, &lslist, ls_list) {
453		WARN_ON(ls->ls_create_count <= 0);
454		if (ls->ls_namelen != namelen)
455			continue;
456		if (memcmp(ls->ls_name, name, namelen))
457			continue;
458		if (flags & DLM_LSFL_NEWEXCL) {
459			error = -EEXIST;
460			break;
461		}
462		ls->ls_create_count++;
463		*lockspace = ls;
464		error = 1;
465		break;
466	}
467	spin_unlock(&lslist_lock);
468
469	if (error)
470		goto out;
471
472	error = -ENOMEM;
473
474	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
475	if (!ls)
476		goto out;
477	memcpy(ls->ls_name, name, namelen);
478	ls->ls_namelen = namelen;
479	ls->ls_lvblen = lvblen;
480	ls->ls_count = 0;
481	ls->ls_flags = 0;
482	ls->ls_scan_time = jiffies;
483
484	if (ops && dlm_config.ci_recover_callbacks) {
485		ls->ls_ops = ops;
486		ls->ls_ops_arg = ops_arg;
487	}
488
489	if (flags & DLM_LSFL_TIMEWARN)
490		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
491
492	/* ls_exflags are forced to match among nodes, and we don't
493	   need to require all nodes to have some flags set */
494	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
495				    DLM_LSFL_NEWEXCL));
496
497	size = dlm_config.ci_rsbtbl_size;
498	ls->ls_rsbtbl_size = size;
499
500	ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
501	if (!ls->ls_rsbtbl)
502		goto out_lsfree;
503	for (i = 0; i < size; i++) {
504		ls->ls_rsbtbl[i].keep.rb_node = NULL;
505		ls->ls_rsbtbl[i].toss.rb_node = NULL;
506		spin_lock_init(&ls->ls_rsbtbl[i].lock);
507	}
508
509	spin_lock_init(&ls->ls_remove_spin);
510
511	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
512		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
513						 GFP_KERNEL);
514		if (!ls->ls_remove_names[i])
515			goto out_rsbtbl;
516	}
517
518	idr_init(&ls->ls_lkbidr);
519	spin_lock_init(&ls->ls_lkbidr_spin);
520
521	INIT_LIST_HEAD(&ls->ls_waiters);
522	mutex_init(&ls->ls_waiters_mutex);
523	INIT_LIST_HEAD(&ls->ls_orphans);
524	mutex_init(&ls->ls_orphans_mutex);
525	INIT_LIST_HEAD(&ls->ls_timeout);
526	mutex_init(&ls->ls_timeout_mutex);
527
528	INIT_LIST_HEAD(&ls->ls_new_rsb);
529	spin_lock_init(&ls->ls_new_rsb_spin);
530
531	INIT_LIST_HEAD(&ls->ls_nodes);
532	INIT_LIST_HEAD(&ls->ls_nodes_gone);
533	ls->ls_num_nodes = 0;
534	ls->ls_low_nodeid = 0;
535	ls->ls_total_weight = 0;
536	ls->ls_node_array = NULL;
537
538	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
539	ls->ls_stub_rsb.res_ls = ls;
540
541	ls->ls_debug_rsb_dentry = NULL;
542	ls->ls_debug_waiters_dentry = NULL;
543
544	init_waitqueue_head(&ls->ls_uevent_wait);
545	ls->ls_uevent_result = 0;
546	init_completion(&ls->ls_members_done);
547	ls->ls_members_result = -1;
548
549	mutex_init(&ls->ls_cb_mutex);
550	INIT_LIST_HEAD(&ls->ls_cb_delay);
551
552	ls->ls_recoverd_task = NULL;
553	mutex_init(&ls->ls_recoverd_active);
554	spin_lock_init(&ls->ls_recover_lock);
555	spin_lock_init(&ls->ls_rcom_spin);
556	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
557	ls->ls_recover_status = 0;
558	ls->ls_recover_seq = 0;
559	ls->ls_recover_args = NULL;
560	init_rwsem(&ls->ls_in_recovery);
561	init_rwsem(&ls->ls_recv_active);
562	INIT_LIST_HEAD(&ls->ls_requestqueue);
563	mutex_init(&ls->ls_requestqueue_mutex);
564	mutex_init(&ls->ls_clear_proc_locks);
565
566	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
567	if (!ls->ls_recover_buf)
568		goto out_lkbidr;
569
570	ls->ls_slot = 0;
571	ls->ls_num_slots = 0;
572	ls->ls_slots_size = 0;
573	ls->ls_slots = NULL;
574
575	INIT_LIST_HEAD(&ls->ls_recover_list);
576	spin_lock_init(&ls->ls_recover_list_lock);
577	idr_init(&ls->ls_recover_idr);
578	spin_lock_init(&ls->ls_recover_idr_lock);
579	ls->ls_recover_list_count = 0;
580	ls->ls_local_handle = ls;
581	init_waitqueue_head(&ls->ls_wait_general);
582	INIT_LIST_HEAD(&ls->ls_root_list);
583	init_rwsem(&ls->ls_root_sem);
584
585	spin_lock(&lslist_lock);
586	ls->ls_create_count = 1;
587	list_add(&ls->ls_list, &lslist);
588	spin_unlock(&lslist_lock);
589
590	if (flags & DLM_LSFL_FS) {
591		error = dlm_callback_start(ls);
592		if (error) {
593			log_error(ls, "can't start dlm_callback %d", error);
594			goto out_delist;
595		}
596	}
597
598	init_waitqueue_head(&ls->ls_recover_lock_wait);
599
600	/*
601	 * Once started, dlm_recoverd first looks for ls in lslist, then
602	 * initializes ls_in_recovery as locked in "down" mode.  We need
603	 * to wait for the wakeup from dlm_recoverd because in_recovery
604	 * has to start out in down mode.
605	 */
606
607	error = dlm_recoverd_start(ls);
608	if (error) {
609		log_error(ls, "can't start dlm_recoverd %d", error);
610		goto out_callback;
611	}
612
613	wait_event(ls->ls_recover_lock_wait,
614		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
615
616	ls->ls_kobj.kset = dlm_kset;
617	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
618				     "%s", ls->ls_name);
619	if (error)
620		goto out_recoverd;
621	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
622
623	/* let kobject handle freeing of ls if there's an error */
624	do_unreg = 1;
625
626	/* This uevent triggers dlm_controld in userspace to add us to the
627	   group of nodes that are members of this lockspace (managed by the
628	   cluster infrastructure.)  Once it's done that, it tells us who the
629	   current lockspace members are (via configfs) and then tells the
630	   lockspace to start running (via sysfs) in dlm_ls_start(). */
631
632	error = do_uevent(ls, 1);
633	if (error)
634		goto out_recoverd;
635
636	wait_for_completion(&ls->ls_members_done);
637	error = ls->ls_members_result;
638	if (error)
639		goto out_members;
640
641	dlm_create_debug_file(ls);
642
643	log_rinfo(ls, "join complete");
644	*lockspace = ls;
645	return 0;
646
647 out_members:
648	do_uevent(ls, 0);
649	dlm_clear_members(ls);
650	kfree(ls->ls_node_array);
651 out_recoverd:
652	dlm_recoverd_stop(ls);
653 out_callback:
654	dlm_callback_stop(ls);
655 out_delist:
656	spin_lock(&lslist_lock);
657	list_del(&ls->ls_list);
658	spin_unlock(&lslist_lock);
659	idr_destroy(&ls->ls_recover_idr);
660	kfree(ls->ls_recover_buf);
661 out_lkbidr:
662	idr_destroy(&ls->ls_lkbidr);
663	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
664		if (ls->ls_remove_names[i])
665			kfree(ls->ls_remove_names[i]);
666	}
667 out_rsbtbl:
668	vfree(ls->ls_rsbtbl);
669 out_lsfree:
670	if (do_unreg)
671		kobject_put(&ls->ls_kobj);
672	else
673		kfree(ls);
674 out:
675	module_put(THIS_MODULE);
676	return error;
677}
678
679int dlm_new_lockspace(const char *name, const char *cluster,
680		      uint32_t flags, int lvblen,
681		      const struct dlm_lockspace_ops *ops, void *ops_arg,
682		      int *ops_result, dlm_lockspace_t **lockspace)
683{
684	int error = 0;
685
686	mutex_lock(&ls_lock);
687	if (!ls_count)
688		error = threads_start();
689	if (error)
690		goto out;
691
692	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
693			      ops_result, lockspace);
694	if (!error)
695		ls_count++;
696	if (error > 0)
697		error = 0;
698	if (!ls_count)
699		threads_stop();
700 out:
701	mutex_unlock(&ls_lock);
702	return error;
703}
704
705static int lkb_idr_is_local(int id, void *p, void *data)
706{
707	struct dlm_lkb *lkb = p;
708
709	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
710}
711
712static int lkb_idr_is_any(int id, void *p, void *data)
713{
714	return 1;
715}
716
717static int lkb_idr_free(int id, void *p, void *data)
718{
719	struct dlm_lkb *lkb = p;
720
721	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
722		dlm_free_lvb(lkb->lkb_lvbptr);
723
724	dlm_free_lkb(lkb);
725	return 0;
726}
727
728/* NOTE: We check the lkbidr here rather than the resource table.
729   This is because there may be LKBs queued as ASTs that have been unlinked
730   from their RSBs and are pending deletion once the AST has been delivered */
731
732static int lockspace_busy(struct dlm_ls *ls, int force)
733{
734	int rv;
735
736	spin_lock(&ls->ls_lkbidr_spin);
737	if (force == 0) {
738		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
739	} else if (force == 1) {
740		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
741	} else {
742		rv = 0;
743	}
744	spin_unlock(&ls->ls_lkbidr_spin);
745	return rv;
746}
747
748static int release_lockspace(struct dlm_ls *ls, int force)
749{
750	struct dlm_rsb *rsb;
751	struct rb_node *n;
752	int i, busy, rv;
753
754	busy = lockspace_busy(ls, force);
755
756	spin_lock(&lslist_lock);
757	if (ls->ls_create_count == 1) {
758		if (busy) {
759			rv = -EBUSY;
760		} else {
761			/* remove_lockspace takes ls off lslist */
762			ls->ls_create_count = 0;
763			rv = 0;
764		}
765	} else if (ls->ls_create_count > 1) {
766		rv = --ls->ls_create_count;
767	} else {
768		rv = -EINVAL;
769	}
770	spin_unlock(&lslist_lock);
771
772	if (rv) {
773		log_debug(ls, "release_lockspace no remove %d", rv);
774		return rv;
775	}
776
777	dlm_device_deregister(ls);
778
779	if (force < 3 && dlm_user_daemon_available())
780		do_uevent(ls, 0);
781
782	dlm_recoverd_stop(ls);
783
784	dlm_callback_stop(ls);
785
786	remove_lockspace(ls);
787
788	dlm_delete_debug_file(ls);
789
790	kfree(ls->ls_recover_buf);
791
792	/*
793	 * Free all lkb's in idr
794	 */
795
796	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
797	idr_destroy(&ls->ls_lkbidr);
798
799	/*
800	 * Free all rsb's on rsbtbl[] lists
801	 */
802
803	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
804		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
805			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
806			rb_erase(n, &ls->ls_rsbtbl[i].keep);
807			dlm_free_rsb(rsb);
808		}
809
810		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
811			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
812			rb_erase(n, &ls->ls_rsbtbl[i].toss);
813			dlm_free_rsb(rsb);
814		}
815	}
816
817	vfree(ls->ls_rsbtbl);
818
819	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
820		kfree(ls->ls_remove_names[i]);
821
822	while (!list_empty(&ls->ls_new_rsb)) {
823		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
824				       res_hashchain);
825		list_del(&rsb->res_hashchain);
826		dlm_free_rsb(rsb);
827	}
828
829	/*
830	 * Free structures on any other lists
831	 */
832
833	dlm_purge_requestqueue(ls);
834	kfree(ls->ls_recover_args);
835	dlm_clear_members(ls);
836	dlm_clear_members_gone(ls);
837	kfree(ls->ls_node_array);
838	log_rinfo(ls, "release_lockspace final free");
839	kobject_put(&ls->ls_kobj);
840	/* The ls structure will be freed when the kobject is done with it */
841
842	module_put(THIS_MODULE);
843	return 0;
844}
845
846/*
847 * Called when a system has released all its locks and is not going to use the
848 * lockspace any longer.  We free everything we're managing for this lockspace.
849 * Remaining nodes will go through the recovery process as if we'd died.  The
850 * lockspace must continue to function as usual, participating in recoveries,
851 * until this returns.
852 *
853 * Force has 4 possible values:
854 * 0 - don't destroy lockspace if it has any LKBs
855 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
856 * 2 - destroy lockspace regardless of LKBs
857 * 3 - destroy lockspace as part of a forced shutdown
858 */
859
860int dlm_release_lockspace(void *lockspace, int force)
861{
862	struct dlm_ls *ls;
863	int error;
864
865	ls = dlm_find_lockspace_local(lockspace);
866	if (!ls)
867		return -EINVAL;
868	dlm_put_lockspace(ls);
869
870	mutex_lock(&ls_lock);
871	error = release_lockspace(ls, force);
872	if (!error)
873		ls_count--;
874	if (!ls_count)
875		threads_stop();
876	mutex_unlock(&ls_lock);
877
878	return error;
879}
880
881void dlm_stop_lockspaces(void)
882{
883	struct dlm_ls *ls;
884	int count;
885
886 restart:
887	count = 0;
888	spin_lock(&lslist_lock);
889	list_for_each_entry(ls, &lslist, ls_list) {
890		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
891			count++;
892			continue;
893		}
894		spin_unlock(&lslist_lock);
895		log_error(ls, "no userland control daemon, stopping lockspace");
896		dlm_ls_stop(ls);
897		goto restart;
898	}
899	spin_unlock(&lslist_lock);
900
901	if (count)
902		log_print("dlm user daemon left %d lockspaces", count);
903}
904