v5.14.15 (fs/dlm/lockspace.c)
  1// SPDX-License-Identifier: GPL-2.0-only
  2/******************************************************************************
  3*******************************************************************************
  4**
  5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
  6**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
  7**
  8**
  9*******************************************************************************
 10******************************************************************************/
 11
 12#include <linux/module.h>
 13
 14#include "dlm_internal.h"
 15#include "lockspace.h"
 16#include "member.h"
 17#include "recoverd.h"
 18#include "dir.h"
 19#include "midcomms.h"
 20#include "lowcomms.h"
 21#include "config.h"
 22#include "memory.h"
 23#include "lock.h"
 24#include "recover.h"
 25#include "requestqueue.h"
 26#include "user.h"
 27#include "ast.h"
 28
 29static int			ls_count;
 30static struct mutex		ls_lock;
 31static struct list_head		lslist;
 32static spinlock_t		lslist_lock;
 33static struct task_struct *	scand_task;
 34
 35
 36static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
 37{
 38	ssize_t ret = len;
 39	int n;
 40	int rc = kstrtoint(buf, 0, &n);
 41
 42	if (rc)
 43		return rc;
 44	ls = dlm_find_lockspace_local(ls->ls_local_handle);
 45	if (!ls)
 46		return -EINVAL;
 47
 48	switch (n) {
 49	case 0:
 50		dlm_ls_stop(ls);
 51		break;
 52	case 1:
 53		dlm_ls_start(ls);
 54		break;
 55	default:
 56		ret = -EINVAL;
 57	}
 58	dlm_put_lockspace(ls);
 59	return ret;
 60}
 61
 62static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
 63{
 64	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
 65
 66	if (rc)
 67		return rc;
 68	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
 69	wake_up(&ls->ls_uevent_wait);
 70	return len;
 71}
 72
 73static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
 74{
 75	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
 76}
 77
 78static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
 79{
 80	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
 81
 82	if (rc)
 83		return rc;
 84	return len;
 85}
 86
 87static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
 88{
 89	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
 90}
 91
 92static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
 93{
 94	int val;
 95	int rc = kstrtoint(buf, 0, &val);
 96
 97	if (rc)
 98		return rc;
 99	if (val == 1)
100		set_bit(LSFL_NODIR, &ls->ls_flags);
101	return len;
102}
103
104static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
105{
106	uint32_t status = dlm_recover_status(ls);
107	return snprintf(buf, PAGE_SIZE, "%x\n", status);
108}
109
110static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
111{
112	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
113}
114
115struct dlm_attr {
116	struct attribute attr;
117	ssize_t (*show)(struct dlm_ls *, char *);
118	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
119};
120
121static struct dlm_attr dlm_attr_control = {
122	.attr  = {.name = "control", .mode = S_IWUSR},
123	.store = dlm_control_store
124};
125
126static struct dlm_attr dlm_attr_event = {
127	.attr  = {.name = "event_done", .mode = S_IWUSR},
128	.store = dlm_event_store
129};
130
131static struct dlm_attr dlm_attr_id = {
132	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
133	.show  = dlm_id_show,
134	.store = dlm_id_store
135};
136
137static struct dlm_attr dlm_attr_nodir = {
138	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
139	.show  = dlm_nodir_show,
140	.store = dlm_nodir_store
141};
142
143static struct dlm_attr dlm_attr_recover_status = {
144	.attr  = {.name = "recover_status", .mode = S_IRUGO},
145	.show  = dlm_recover_status_show
146};
147
148static struct dlm_attr dlm_attr_recover_nodeid = {
149	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
150	.show  = dlm_recover_nodeid_show
151};
152
153static struct attribute *dlm_attrs[] = {
154	&dlm_attr_control.attr,
155	&dlm_attr_event.attr,
156	&dlm_attr_id.attr,
157	&dlm_attr_nodir.attr,
158	&dlm_attr_recover_status.attr,
159	&dlm_attr_recover_nodeid.attr,
160	NULL,
161};
162ATTRIBUTE_GROUPS(dlm);
163
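/*
 * Usage sketch (illustration only, not part of the original file): the
 * attributes above surface under /sys/kernel/dlm/<lockspace>/ because
 * the "dlm" kset is created under kernel_kobj in dlm_lockspace_init()
 * below.  A control daemon could stop or start a lockspace by writing
 * "0" or "1" to its control file; this userspace helper is hypothetical.
 */
#if 0	/* userspace illustration */
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

static int dlm_sysfs_write(const char *ls_name, const char *attr,
			   const char *val)
{
	char path[256];
	int fd, rv;

	snprintf(path, sizeof(path), "/sys/kernel/dlm/%s/%s", ls_name, attr);
	fd = open(path, O_WRONLY);
	if (fd < 0)
		return -1;
	/* "control" attribute: 0 -> dlm_ls_stop(), 1 -> dlm_ls_start() */
	rv = write(fd, val, strlen(val)) < 0 ? -1 : 0;
	close(fd);
	return rv;
}
#endif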
164static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
165			     char *buf)
166{
167	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
168	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
169	return a->show ? a->show(ls, buf) : 0;
170}
171
172static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
173			      const char *buf, size_t len)
174{
175	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
176	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
177	return a->store ? a->store(ls, buf, len) : len;
178}
179
180static void lockspace_kobj_release(struct kobject *k)
181{
182	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
183	kfree(ls);
184}
185
186static const struct sysfs_ops dlm_attr_ops = {
187	.show  = dlm_attr_show,
188	.store = dlm_attr_store,
189};
190
191static struct kobj_type dlm_ktype = {
192	.default_groups = dlm_groups,
193	.sysfs_ops     = &dlm_attr_ops,
194	.release       = lockspace_kobj_release,
195};
196
197static struct kset *dlm_kset;
198
199static int do_uevent(struct dlm_ls *ls, int in)
200{
201	if (in)
202		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
203	else
204		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
205
206	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
207
208	/* dlm_controld will see the uevent, do the necessary group management
209	   and then write to sysfs to wake us */
210
211	wait_event(ls->ls_uevent_wait,
212		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
213
214	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
215
216	return ls->ls_uevent_result;
217}
218
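/*
 * Handshake sketch (illustration only): do_uevent() above blocks until
 * dlm_event_store() sets LSFL_UEVENT_WAIT and wakes ls_uevent_wait,
 * which happens when the daemon writes the group join/leave result to
 * the event_done attribute.  A hypothetical userspace side, reusing
 * dlm_sysfs_write() from the earlier sketch:
 */
#if 0	/* userspace illustration */
static int complete_group_event(const char *ls_name, int result)
{
	char buf[16];

	/* the value written here becomes ls->ls_uevent_result */
	snprintf(buf, sizeof(buf), "%d", result);
	return dlm_sysfs_write(ls_name, "event_done", buf);
}
#endif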
219static int dlm_uevent(struct kset *kset, struct kobject *kobj,
220		      struct kobj_uevent_env *env)
221{
222	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
223
224	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
225	return 0;
226}
227
228static const struct kset_uevent_ops dlm_uevent_ops = {
229	.uevent = dlm_uevent,
230};
231
232int __init dlm_lockspace_init(void)
233{
234	ls_count = 0;
235	mutex_init(&ls_lock);
236	INIT_LIST_HEAD(&lslist);
237	spin_lock_init(&lslist_lock);
238
239	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
240	if (!dlm_kset) {
241		printk(KERN_WARNING "%s: can not create kset\n", __func__);
242		return -ENOMEM;
243	}
244	return 0;
245}
246
247void dlm_lockspace_exit(void)
248{
249	kset_unregister(dlm_kset);
250}
251
252static struct dlm_ls *find_ls_to_scan(void)
253{
254	struct dlm_ls *ls;
255
256	spin_lock(&lslist_lock);
257	list_for_each_entry(ls, &lslist, ls_list) {
258		if (time_after_eq(jiffies, ls->ls_scan_time +
259					    dlm_config.ci_scan_secs * HZ)) {
260			spin_unlock(&lslist_lock);
261			return ls;
262		}
263	}
264	spin_unlock(&lslist_lock);
265	return NULL;
266}
267
268static int dlm_scand(void *data)
269{
270	struct dlm_ls *ls;
271
272	while (!kthread_should_stop()) {
273		ls = find_ls_to_scan();
274		if (ls) {
275			if (dlm_lock_recovery_try(ls)) {
276				ls->ls_scan_time = jiffies;
277				dlm_scan_rsbs(ls);
278				dlm_scan_timeout(ls);
279				dlm_scan_waiters(ls);
280				dlm_unlock_recovery(ls);
281			} else {
282				ls->ls_scan_time += HZ;
283			}
284			continue;
285		}
286		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
287	}
288	return 0;
289}
290
291static int dlm_scand_start(void)
292{
293	struct task_struct *p;
294	int error = 0;
295
296	p = kthread_run(dlm_scand, NULL, "dlm_scand");
297	if (IS_ERR(p))
298		error = PTR_ERR(p);
299	else
300		scand_task = p;
301	return error;
302}
303
304static void dlm_scand_stop(void)
305{
306	kthread_stop(scand_task);
307}
308
309struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
310{
311	struct dlm_ls *ls;
312
313	spin_lock(&lslist_lock);
314
315	list_for_each_entry(ls, &lslist, ls_list) {
316		if (ls->ls_global_id == id) {
317			ls->ls_count++;
318			goto out;
319		}
320	}
321	ls = NULL;
322 out:
323	spin_unlock(&lslist_lock);
324	return ls;
325}
326
327struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
328{
329	struct dlm_ls *ls;
330
331	spin_lock(&lslist_lock);
332	list_for_each_entry(ls, &lslist, ls_list) {
333		if (ls->ls_local_handle == lockspace) {
334			ls->ls_count++;
335			goto out;
336		}
337	}
338	ls = NULL;
339 out:
340	spin_unlock(&lslist_lock);
341	return ls;
342}
343
344struct dlm_ls *dlm_find_lockspace_device(int minor)
345{
346	struct dlm_ls *ls;
347
348	spin_lock(&lslist_lock);
349	list_for_each_entry(ls, &lslist, ls_list) {
350		if (ls->ls_device.minor == minor) {
351			ls->ls_count++;
352			goto out;
353		}
354	}
355	ls = NULL;
356 out:
357	spin_unlock(&lslist_lock);
358	return ls;
359}
360
361void dlm_put_lockspace(struct dlm_ls *ls)
362{
363	spin_lock(&lslist_lock);
364	ls->ls_count--;
365	spin_unlock(&lslist_lock);
366}
367
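/*
 * Reference-counting sketch (illustration only): each successful
 * dlm_find_lockspace_*() lookup increments ls_count and must be paired
 * with dlm_put_lockspace(), since remove_lockspace() below busy-waits
 * until ls_count drops to zero.  A hypothetical caller:
 */
#if 0	/* illustration only */
static int example_with_lockspace(uint32_t global_id)
{
	struct dlm_ls *ls = dlm_find_lockspace_global(global_id);

	if (!ls)
		return -EINVAL;
	/* ... use ls while the reference is held ... */
	dlm_put_lockspace(ls);
	return 0;
}
#endif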
368static void remove_lockspace(struct dlm_ls *ls)
369{
370	for (;;) {
371		spin_lock(&lslist_lock);
372		if (ls->ls_count == 0) {
373			WARN_ON(ls->ls_create_count != 0);
374			list_del(&ls->ls_list);
375			spin_unlock(&lslist_lock);
376			return;
377		}
378		spin_unlock(&lslist_lock);
379		ssleep(1);
380	}
381}
382
383static int threads_start(void)
384{
385	int error;
386
387	error = dlm_scand_start();
388	if (error) {
389		log_print("cannot start dlm_scand thread %d", error);
390		goto fail;
391	}
392
393	/* Thread for sending/receiving messages for all lockspaces */
394	error = dlm_midcomms_start();
395	if (error) {
396		log_print("cannot start dlm lowcomms %d", error);
397		goto scand_fail;
398	}
399
400	return 0;
401
402 scand_fail:
403	dlm_scand_stop();
404 fail:
405	return error;
406}
407
408static int new_lockspace(const char *name, const char *cluster,
409			 uint32_t flags, int lvblen,
410			 const struct dlm_lockspace_ops *ops, void *ops_arg,
411			 int *ops_result, dlm_lockspace_t **lockspace)
412{
413	struct dlm_ls *ls;
414	int i, size, error;
415	int do_unreg = 0;
416	int namelen = strlen(name);
417
418	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
419		return -EINVAL;
420
421	if (!lvblen || (lvblen % 8))
422		return -EINVAL;
423
424	if (!try_module_get(THIS_MODULE))
425		return -EINVAL;
426
427	if (!dlm_user_daemon_available()) {
428		log_print("dlm user daemon not available");
429		error = -EUNATCH;
430		goto out;
431	}
432
433	if (ops && ops_result) {
434		if (!dlm_config.ci_recover_callbacks)
435			*ops_result = -EOPNOTSUPP;
436		else
437			*ops_result = 0;
438	}
439
440	if (!cluster)
441		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
442			  dlm_config.ci_cluster_name);
443
444	if (dlm_config.ci_recover_callbacks && cluster &&
445	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
446		log_print("dlm cluster name '%s' does not match "
447			  "the application cluster name '%s'",
448			  dlm_config.ci_cluster_name, cluster);
449		error = -EBADR;
450		goto out;
451	}
452
453	error = 0;
454
455	spin_lock(&lslist_lock);
456	list_for_each_entry(ls, &lslist, ls_list) {
457		WARN_ON(ls->ls_create_count <= 0);
458		if (ls->ls_namelen != namelen)
459			continue;
460		if (memcmp(ls->ls_name, name, namelen))
461			continue;
462		if (flags & DLM_LSFL_NEWEXCL) {
463			error = -EEXIST;
464			break;
465		}
466		ls->ls_create_count++;
467		*lockspace = ls;
468		error = 1;
469		break;
470	}
471	spin_unlock(&lslist_lock);
472
473	if (error)
474		goto out;
475
476	error = -ENOMEM;
477
478	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
479	if (!ls)
480		goto out;
481	memcpy(ls->ls_name, name, namelen);
482	ls->ls_namelen = namelen;
483	ls->ls_lvblen = lvblen;
484	ls->ls_count = 0;
485	ls->ls_flags = 0;
486	ls->ls_scan_time = jiffies;
487
488	if (ops && dlm_config.ci_recover_callbacks) {
489		ls->ls_ops = ops;
490		ls->ls_ops_arg = ops_arg;
491	}
492
493	if (flags & DLM_LSFL_TIMEWARN)
494		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
495
496	/* ls_exflags are forced to match among nodes, and we don't
497	   need to require all nodes to have some flags set */
498	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
499				    DLM_LSFL_NEWEXCL));
500
501	size = dlm_config.ci_rsbtbl_size;
502	ls->ls_rsbtbl_size = size;
503
504	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
505	if (!ls->ls_rsbtbl)
506		goto out_lsfree;
507	for (i = 0; i < size; i++) {
508		ls->ls_rsbtbl[i].keep.rb_node = NULL;
509		ls->ls_rsbtbl[i].toss.rb_node = NULL;
510		spin_lock_init(&ls->ls_rsbtbl[i].lock);
511	}
512
513	spin_lock_init(&ls->ls_remove_spin);
514
515	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
516		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
517						 GFP_KERNEL);
518		if (!ls->ls_remove_names[i])
519			goto out_rsbtbl;
520	}
521
522	idr_init(&ls->ls_lkbidr);
523	spin_lock_init(&ls->ls_lkbidr_spin);
524
525	INIT_LIST_HEAD(&ls->ls_waiters);
526	mutex_init(&ls->ls_waiters_mutex);
527	INIT_LIST_HEAD(&ls->ls_orphans);
528	mutex_init(&ls->ls_orphans_mutex);
529	INIT_LIST_HEAD(&ls->ls_timeout);
530	mutex_init(&ls->ls_timeout_mutex);
531
532	INIT_LIST_HEAD(&ls->ls_new_rsb);
533	spin_lock_init(&ls->ls_new_rsb_spin);
534
535	INIT_LIST_HEAD(&ls->ls_nodes);
536	INIT_LIST_HEAD(&ls->ls_nodes_gone);
537	ls->ls_num_nodes = 0;
538	ls->ls_low_nodeid = 0;
539	ls->ls_total_weight = 0;
540	ls->ls_node_array = NULL;
541
542	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
543	ls->ls_stub_rsb.res_ls = ls;
544
545	ls->ls_debug_rsb_dentry = NULL;
546	ls->ls_debug_waiters_dentry = NULL;
547
548	init_waitqueue_head(&ls->ls_uevent_wait);
549	ls->ls_uevent_result = 0;
550	init_completion(&ls->ls_members_done);
551	ls->ls_members_result = -1;
552
553	mutex_init(&ls->ls_cb_mutex);
554	INIT_LIST_HEAD(&ls->ls_cb_delay);
555
556	ls->ls_recoverd_task = NULL;
557	mutex_init(&ls->ls_recoverd_active);
558	spin_lock_init(&ls->ls_recover_lock);
559	spin_lock_init(&ls->ls_rcom_spin);
560	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
561	ls->ls_recover_status = 0;
562	ls->ls_recover_seq = 0;
563	ls->ls_recover_args = NULL;
564	init_rwsem(&ls->ls_in_recovery);
565	init_rwsem(&ls->ls_recv_active);
566	INIT_LIST_HEAD(&ls->ls_requestqueue);
567	mutex_init(&ls->ls_requestqueue_mutex);
568	mutex_init(&ls->ls_clear_proc_locks);
569
570	/* For backwards compatibility with 3.1 we need to use the maximum
571	 * possible dlm message size to be sure the message will fit and
572	 * to avoid out-of-bounds issues.  However, a 3.2 sending side
573	 * might send less.
574	 */
575	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
576	if (!ls->ls_recover_buf)
577		goto out_lkbidr;
578
579	ls->ls_slot = 0;
580	ls->ls_num_slots = 0;
581	ls->ls_slots_size = 0;
582	ls->ls_slots = NULL;
583
584	INIT_LIST_HEAD(&ls->ls_recover_list);
585	spin_lock_init(&ls->ls_recover_list_lock);
586	idr_init(&ls->ls_recover_idr);
587	spin_lock_init(&ls->ls_recover_idr_lock);
588	ls->ls_recover_list_count = 0;
589	ls->ls_local_handle = ls;
590	init_waitqueue_head(&ls->ls_wait_general);
591	INIT_LIST_HEAD(&ls->ls_root_list);
592	init_rwsem(&ls->ls_root_sem);
593
594	spin_lock(&lslist_lock);
595	ls->ls_create_count = 1;
596	list_add(&ls->ls_list, &lslist);
597	spin_unlock(&lslist_lock);
598
599	if (flags & DLM_LSFL_FS) {
600		error = dlm_callback_start(ls);
601		if (error) {
602			log_error(ls, "can't start dlm_callback %d", error);
603			goto out_delist;
604		}
605	}
606
607	init_waitqueue_head(&ls->ls_recover_lock_wait);
608
609	/*
610	 * Once started, dlm_recoverd first looks for ls in lslist, then
611	 * initializes ls_in_recovery as locked in "down" mode.  We need
612	 * to wait for the wakeup from dlm_recoverd because in_recovery
613	 * has to start out in down mode.
614	 */
615
616	error = dlm_recoverd_start(ls);
617	if (error) {
618		log_error(ls, "can't start dlm_recoverd %d", error);
619		goto out_callback;
620	}
621
622	wait_event(ls->ls_recover_lock_wait,
623		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
624
625	/* let kobject handle freeing of ls if there's an error */
626	do_unreg = 1;
627
628	ls->ls_kobj.kset = dlm_kset;
629	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
630				     "%s", ls->ls_name);
631	if (error)
632		goto out_recoverd;
633	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
634
635	/* This uevent triggers dlm_controld in userspace to add us to the
636	   group of nodes that are members of this lockspace (managed by the
637	   cluster infrastructure.)  Once it's done that, it tells us who the
638	   current lockspace members are (via configfs) and then tells the
639	   lockspace to start running (via sysfs) in dlm_ls_start(). */
640
641	error = do_uevent(ls, 1);
642	if (error)
643		goto out_recoverd;
644
645	wait_for_completion(&ls->ls_members_done);
646	error = ls->ls_members_result;
647	if (error)
648		goto out_members;
649
650	dlm_create_debug_file(ls);
651
652	log_rinfo(ls, "join complete");
653	*lockspace = ls;
654	return 0;
655
656 out_members:
657	do_uevent(ls, 0);
658	dlm_clear_members(ls);
659	kfree(ls->ls_node_array);
660 out_recoverd:
661	dlm_recoverd_stop(ls);
662 out_callback:
663	dlm_callback_stop(ls);
664 out_delist:
665	spin_lock(&lslist_lock);
666	list_del(&ls->ls_list);
667	spin_unlock(&lslist_lock);
668	idr_destroy(&ls->ls_recover_idr);
669	kfree(ls->ls_recover_buf);
670 out_lkbidr:
671	idr_destroy(&ls->ls_lkbidr);
672 out_rsbtbl:
673	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
674		kfree(ls->ls_remove_names[i]);
675	vfree(ls->ls_rsbtbl);
676 out_lsfree:
677	if (do_unreg)
678		kobject_put(&ls->ls_kobj);
679	else
680		kfree(ls);
681 out:
682	module_put(THIS_MODULE);
683	return error;
684}
685
686int dlm_new_lockspace(const char *name, const char *cluster,
687		      uint32_t flags, int lvblen,
688		      const struct dlm_lockspace_ops *ops, void *ops_arg,
689		      int *ops_result, dlm_lockspace_t **lockspace)
690{
691	int error = 0;
692
693	mutex_lock(&ls_lock);
694	if (!ls_count)
695		error = threads_start();
696	if (error)
697		goto out;
698
699	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
700			      ops_result, lockspace);
701	if (!error)
702		ls_count++;
703	if (error > 0)
704		error = 0;
705	if (!ls_count) {
706		dlm_scand_stop();
707		dlm_midcomms_shutdown();
708		dlm_lowcomms_stop();
709	}
710 out:
711	mutex_unlock(&ls_lock);
712	return error;
713}
714
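/*
 * Caller sketch (illustration only): a kernel user such as a cluster
 * filesystem joins a lockspace roughly as below; the names here are
 * hypothetical.  lvblen must be a non-zero multiple of 8,
 * DLM_LSFL_NEWEXCL turns an existing lockspace into an -EEXIST error,
 * and *ops_result is only filled in when recovery ops are supplied
 * (see new_lockspace() above).
 */
#if 0	/* illustration only */
static dlm_lockspace_t *example_join_lockspace(void)
{
	dlm_lockspace_t *ls = NULL;
	int error;

	error = dlm_new_lockspace("example", NULL, DLM_LSFL_FS, 64,
				  NULL, NULL, NULL, &ls);
	return error ? NULL : ls;
}
#endif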
715static int lkb_idr_is_local(int id, void *p, void *data)
716{
717	struct dlm_lkb *lkb = p;
718
719	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
720}
721
722static int lkb_idr_is_any(int id, void *p, void *data)
723{
724	return 1;
725}
726
727static int lkb_idr_free(int id, void *p, void *data)
728{
729	struct dlm_lkb *lkb = p;
730
731	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
732		dlm_free_lvb(lkb->lkb_lvbptr);
733
734	dlm_free_lkb(lkb);
735	return 0;
736}
737
738/* NOTE: We check the lkbidr here rather than the resource table.
739   This is because there may be LKBs queued as ASTs that have been unlinked
740   from their RSBs and are pending deletion once the AST has been delivered */
741
742static int lockspace_busy(struct dlm_ls *ls, int force)
743{
744	int rv;
745
746	spin_lock(&ls->ls_lkbidr_spin);
747	if (force == 0) {
748		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
749	} else if (force == 1) {
750		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
751	} else {
752		rv = 0;
753	}
754	spin_unlock(&ls->ls_lkbidr_spin);
755	return rv;
756}
757
758static int release_lockspace(struct dlm_ls *ls, int force)
759{
760	struct dlm_rsb *rsb;
761	struct rb_node *n;
762	int i, busy, rv;
763
764	busy = lockspace_busy(ls, force);
765
766	spin_lock(&lslist_lock);
767	if (ls->ls_create_count == 1) {
768		if (busy) {
769			rv = -EBUSY;
770		} else {
771			/* remove_lockspace takes ls off lslist */
772			ls->ls_create_count = 0;
773			rv = 0;
774		}
775	} else if (ls->ls_create_count > 1) {
776		rv = --ls->ls_create_count;
777	} else {
778		rv = -EINVAL;
779	}
780	spin_unlock(&lslist_lock);
781
782	if (rv) {
783		log_debug(ls, "release_lockspace no remove %d", rv);
784		return rv;
785	}
786
787	dlm_device_deregister(ls);
788
789	if (force < 3 && dlm_user_daemon_available())
790		do_uevent(ls, 0);
791
792	dlm_recoverd_stop(ls);
793
794	if (ls_count == 1) {
795		dlm_scand_stop();
796		dlm_midcomms_shutdown();
797	}
798
799	dlm_callback_stop(ls);
800
801	remove_lockspace(ls);
802
803	dlm_delete_debug_file(ls);
804
805	idr_destroy(&ls->ls_recover_idr);
806	kfree(ls->ls_recover_buf);
807
808	/*
809	 * Free all lkb's in idr
810	 */
811
812	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
813	idr_destroy(&ls->ls_lkbidr);
814
815	/*
816	 * Free all rsb's on rsbtbl[] lists
817	 */
818
819	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
820		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
821			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
822			rb_erase(n, &ls->ls_rsbtbl[i].keep);
823			dlm_free_rsb(rsb);
824		}
825
826		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
827			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
828			rb_erase(n, &ls->ls_rsbtbl[i].toss);
829			dlm_free_rsb(rsb);
830		}
831	}
832
833	vfree(ls->ls_rsbtbl);
834
835	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
836		kfree(ls->ls_remove_names[i]);
837
838	while (!list_empty(&ls->ls_new_rsb)) {
839		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
840				       res_hashchain);
841		list_del(&rsb->res_hashchain);
842		dlm_free_rsb(rsb);
843	}
844
845	/*
846	 * Free structures on any other lists
847	 */
848
849	dlm_purge_requestqueue(ls);
850	kfree(ls->ls_recover_args);
851	dlm_clear_members(ls);
852	dlm_clear_members_gone(ls);
853	kfree(ls->ls_node_array);
854	log_rinfo(ls, "release_lockspace final free");
855	kobject_put(&ls->ls_kobj);
856	/* The ls structure will be freed when the kobject is done with */
857
858	module_put(THIS_MODULE);
859	return 0;
860}
861
862/*
863 * Called when a system has released all its locks and is not going to use the
864 * lockspace any longer.  We free everything we're managing for this lockspace.
865 * Remaining nodes will go through the recovery process as if we'd died.  The
866 * lockspace must continue to function as usual, participating in recoveries,
867 * until this returns.
868 *
869 * Force has 4 possible values:
870 * 0 - don't destroy lockspace if it has any LKBs
871 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
872 * 2 - destroy lockspace regardless of LKBs
873 * 3 - destroy lockspace as part of a forced shutdown
874 */
875
876int dlm_release_lockspace(void *lockspace, int force)
877{
878	struct dlm_ls *ls;
879	int error;
880
881	ls = dlm_find_lockspace_local(lockspace);
882	if (!ls)
883		return -EINVAL;
884	dlm_put_lockspace(ls);
885
886	mutex_lock(&ls_lock);
887	error = release_lockspace(ls, force);
888	if (!error)
889		ls_count--;
890	if (!ls_count)
891		dlm_lowcomms_stop();
892	mutex_unlock(&ls_lock);
893
894	return error;
895}
896
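/*
 * Teardown sketch (illustration only): the counterpart to the join
 * sketch above.  With force 0 the call fails with -EBUSY while any
 * LKBs remain; force 2 destroys the lockspace regardless, and force 3
 * is the forced-shutdown path (see the comment above
 * dlm_release_lockspace()).
 */
#if 0	/* illustration only */
static void example_leave_lockspace(dlm_lockspace_t *ls)
{
	int error = dlm_release_lockspace(ls, 0);

	if (error == -EBUSY)
		pr_warn("example: lockspace still has locks, not released\n");
}
#endif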
897void dlm_stop_lockspaces(void)
898{
899	struct dlm_ls *ls;
900	int count;
901
902 restart:
903	count = 0;
904	spin_lock(&lslist_lock);
905	list_for_each_entry(ls, &lslist, ls_list) {
906		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
907			count++;
908			continue;
909		}
910		spin_unlock(&lslist_lock);
911		log_error(ls, "no userland control daemon, stopping lockspace");
912		dlm_ls_stop(ls);
913		goto restart;
914	}
915	spin_unlock(&lslist_lock);
916
917	if (count)
918		log_print("dlm user daemon left %d lockspaces", count);
919}
920
v5.4 (fs/dlm/lockspace.c)
  1// SPDX-License-Identifier: GPL-2.0-only
  2/******************************************************************************
  3*******************************************************************************
  4**
  5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
  6**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
  7**
  8**
  9*******************************************************************************
 10******************************************************************************/
 11
 12#include <linux/module.h>
 13
 14#include "dlm_internal.h"
 15#include "lockspace.h"
 16#include "member.h"
 17#include "recoverd.h"
 18#include "dir.h"
 19#include "lowcomms.h"
 20#include "config.h"
 21#include "memory.h"
 22#include "lock.h"
 23#include "recover.h"
 24#include "requestqueue.h"
 25#include "user.h"
 26#include "ast.h"
 27
 28static int			ls_count;
 29static struct mutex		ls_lock;
 30static struct list_head		lslist;
 31static spinlock_t		lslist_lock;
 32static struct task_struct *	scand_task;
 33
 34
 35static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
 36{
 37	ssize_t ret = len;
 38	int n;
 39	int rc = kstrtoint(buf, 0, &n);
 40
 41	if (rc)
 42		return rc;
 43	ls = dlm_find_lockspace_local(ls->ls_local_handle);
 44	if (!ls)
 45		return -EINVAL;
 46
 47	switch (n) {
 48	case 0:
 49		dlm_ls_stop(ls);
 50		break;
 51	case 1:
 52		dlm_ls_start(ls);
 53		break;
 54	default:
 55		ret = -EINVAL;
 56	}
 57	dlm_put_lockspace(ls);
 58	return ret;
 59}
 60
 61static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
 62{
 63	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
 64
 65	if (rc)
 66		return rc;
 67	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
 68	wake_up(&ls->ls_uevent_wait);
 69	return len;
 70}
 71
 72static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
 73{
 74	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
 75}
 76
 77static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
 78{
 79	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
 80
 81	if (rc)
 82		return rc;
 83	return len;
 84}
 85
 86static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
 87{
 88	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
 89}
 90
 91static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
 92{
 93	int val;
 94	int rc = kstrtoint(buf, 0, &val);
 95
 96	if (rc)
 97		return rc;
 98	if (val == 1)
 99		set_bit(LSFL_NODIR, &ls->ls_flags);
100	return len;
101}
102
103static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
104{
105	uint32_t status = dlm_recover_status(ls);
106	return snprintf(buf, PAGE_SIZE, "%x\n", status);
107}
108
109static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
110{
111	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
112}
113
114struct dlm_attr {
115	struct attribute attr;
116	ssize_t (*show)(struct dlm_ls *, char *);
117	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
118};
119
120static struct dlm_attr dlm_attr_control = {
121	.attr  = {.name = "control", .mode = S_IWUSR},
122	.store = dlm_control_store
123};
124
125static struct dlm_attr dlm_attr_event = {
126	.attr  = {.name = "event_done", .mode = S_IWUSR},
127	.store = dlm_event_store
128};
129
130static struct dlm_attr dlm_attr_id = {
131	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
132	.show  = dlm_id_show,
133	.store = dlm_id_store
134};
135
136static struct dlm_attr dlm_attr_nodir = {
137	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
138	.show  = dlm_nodir_show,
139	.store = dlm_nodir_store
140};
141
142static struct dlm_attr dlm_attr_recover_status = {
143	.attr  = {.name = "recover_status", .mode = S_IRUGO},
144	.show  = dlm_recover_status_show
145};
146
147static struct dlm_attr dlm_attr_recover_nodeid = {
148	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
149	.show  = dlm_recover_nodeid_show
150};
151
152static struct attribute *dlm_attrs[] = {
153	&dlm_attr_control.attr,
154	&dlm_attr_event.attr,
155	&dlm_attr_id.attr,
156	&dlm_attr_nodir.attr,
157	&dlm_attr_recover_status.attr,
158	&dlm_attr_recover_nodeid.attr,
159	NULL,
160};
161ATTRIBUTE_GROUPS(dlm);
162
163static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
164			     char *buf)
165{
166	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
167	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
168	return a->show ? a->show(ls, buf) : 0;
169}
170
171static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
172			      const char *buf, size_t len)
173{
174	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
175	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
176	return a->store ? a->store(ls, buf, len) : len;
177}
178
179static void lockspace_kobj_release(struct kobject *k)
180{
181	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
182	kfree(ls);
183}
184
185static const struct sysfs_ops dlm_attr_ops = {
186	.show  = dlm_attr_show,
187	.store = dlm_attr_store,
188};
189
190static struct kobj_type dlm_ktype = {
191	.default_groups = dlm_groups,
192	.sysfs_ops     = &dlm_attr_ops,
193	.release       = lockspace_kobj_release,
194};
195
196static struct kset *dlm_kset;
197
198static int do_uevent(struct dlm_ls *ls, int in)
199{
200	int error;
201
202	if (in)
203		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
204	else
205		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
206
207	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
208
209	/* dlm_controld will see the uevent, do the necessary group management
210	   and then write to sysfs to wake us */
211
212	error = wait_event_interruptible(ls->ls_uevent_wait,
213			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
214
215	log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
216
217	if (error)
218		goto out;
219
220	error = ls->ls_uevent_result;
221 out:
222	if (error)
223		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
224			  error, ls->ls_uevent_result);
225	return error;
226}
227
228static int dlm_uevent(struct kset *kset, struct kobject *kobj,
229		      struct kobj_uevent_env *env)
230{
231	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
232
233	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
234	return 0;
235}
236
237static const struct kset_uevent_ops dlm_uevent_ops = {
238	.uevent = dlm_uevent,
239};
240
241int __init dlm_lockspace_init(void)
242{
243	ls_count = 0;
244	mutex_init(&ls_lock);
245	INIT_LIST_HEAD(&lslist);
246	spin_lock_init(&lslist_lock);
247
248	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
249	if (!dlm_kset) {
250		printk(KERN_WARNING "%s: can not create kset\n", __func__);
251		return -ENOMEM;
252	}
253	return 0;
254}
255
256void dlm_lockspace_exit(void)
257{
258	kset_unregister(dlm_kset);
259}
260
261static struct dlm_ls *find_ls_to_scan(void)
262{
263	struct dlm_ls *ls;
264
265	spin_lock(&lslist_lock);
266	list_for_each_entry(ls, &lslist, ls_list) {
267		if (time_after_eq(jiffies, ls->ls_scan_time +
268					    dlm_config.ci_scan_secs * HZ)) {
269			spin_unlock(&lslist_lock);
270			return ls;
271		}
272	}
273	spin_unlock(&lslist_lock);
274	return NULL;
275}
276
277static int dlm_scand(void *data)
278{
279	struct dlm_ls *ls;
280
281	while (!kthread_should_stop()) {
282		ls = find_ls_to_scan();
283		if (ls) {
284			if (dlm_lock_recovery_try(ls)) {
285				ls->ls_scan_time = jiffies;
286				dlm_scan_rsbs(ls);
287				dlm_scan_timeout(ls);
288				dlm_scan_waiters(ls);
289				dlm_unlock_recovery(ls);
290			} else {
291				ls->ls_scan_time += HZ;
292			}
293			continue;
294		}
295		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
296	}
297	return 0;
298}
299
300static int dlm_scand_start(void)
301{
302	struct task_struct *p;
303	int error = 0;
304
305	p = kthread_run(dlm_scand, NULL, "dlm_scand");
306	if (IS_ERR(p))
307		error = PTR_ERR(p);
308	else
309		scand_task = p;
310	return error;
311}
312
313static void dlm_scand_stop(void)
314{
315	kthread_stop(scand_task);
316}
317
318struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
319{
320	struct dlm_ls *ls;
321
322	spin_lock(&lslist_lock);
323
324	list_for_each_entry(ls, &lslist, ls_list) {
325		if (ls->ls_global_id == id) {
326			ls->ls_count++;
327			goto out;
328		}
329	}
330	ls = NULL;
331 out:
332	spin_unlock(&lslist_lock);
333	return ls;
334}
335
336struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
337{
338	struct dlm_ls *ls;
339
340	spin_lock(&lslist_lock);
341	list_for_each_entry(ls, &lslist, ls_list) {
342		if (ls->ls_local_handle == lockspace) {
343			ls->ls_count++;
344			goto out;
345		}
346	}
347	ls = NULL;
348 out:
349	spin_unlock(&lslist_lock);
350	return ls;
351}
352
353struct dlm_ls *dlm_find_lockspace_device(int minor)
354{
355	struct dlm_ls *ls;
356
357	spin_lock(&lslist_lock);
358	list_for_each_entry(ls, &lslist, ls_list) {
359		if (ls->ls_device.minor == minor) {
360			ls->ls_count++;
361			goto out;
362		}
363	}
364	ls = NULL;
365 out:
366	spin_unlock(&lslist_lock);
367	return ls;
368}
369
370void dlm_put_lockspace(struct dlm_ls *ls)
371{
372	spin_lock(&lslist_lock);
373	ls->ls_count--;
374	spin_unlock(&lslist_lock);
375}
376
377static void remove_lockspace(struct dlm_ls *ls)
378{
379	for (;;) {
380		spin_lock(&lslist_lock);
381		if (ls->ls_count == 0) {
382			WARN_ON(ls->ls_create_count != 0);
383			list_del(&ls->ls_list);
384			spin_unlock(&lslist_lock);
385			return;
386		}
387		spin_unlock(&lslist_lock);
388		ssleep(1);
389	}
390}
391
392static int threads_start(void)
393{
394	int error;
395
396	error = dlm_scand_start();
397	if (error) {
398		log_print("cannot start dlm_scand thread %d", error);
399		goto fail;
400	}
401
402	/* Thread for sending/receiving messages for all lockspaces */
403	error = dlm_lowcomms_start();
404	if (error) {
405		log_print("cannot start dlm lowcomms %d", error);
406		goto scand_fail;
407	}
408
409	return 0;
410
411 scand_fail:
412	dlm_scand_stop();
413 fail:
414	return error;
415}
416
417static void threads_stop(void)
418{
419	dlm_scand_stop();
420	dlm_lowcomms_stop();
421}
422
423static int new_lockspace(const char *name, const char *cluster,
424			 uint32_t flags, int lvblen,
425			 const struct dlm_lockspace_ops *ops, void *ops_arg,
426			 int *ops_result, dlm_lockspace_t **lockspace)
427{
428	struct dlm_ls *ls;
429	int i, size, error;
430	int do_unreg = 0;
431	int namelen = strlen(name);
432
433	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
434		return -EINVAL;
435
436	if (!lvblen || (lvblen % 8))
437		return -EINVAL;
438
439	if (!try_module_get(THIS_MODULE))
440		return -EINVAL;
441
442	if (!dlm_user_daemon_available()) {
443		log_print("dlm user daemon not available");
444		error = -EUNATCH;
445		goto out;
446	}
447
448	if (ops && ops_result) {
449		if (!dlm_config.ci_recover_callbacks)
450			*ops_result = -EOPNOTSUPP;
451		else
452			*ops_result = 0;
453	}
454
455	if (!cluster)
456		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
457			  dlm_config.ci_cluster_name);
458
459	if (dlm_config.ci_recover_callbacks && cluster &&
460	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
461		log_print("dlm cluster name '%s' does not match "
462			  "the application cluster name '%s'",
463			  dlm_config.ci_cluster_name, cluster);
464		error = -EBADR;
465		goto out;
466	}
467
468	error = 0;
469
470	spin_lock(&lslist_lock);
471	list_for_each_entry(ls, &lslist, ls_list) {
472		WARN_ON(ls->ls_create_count <= 0);
473		if (ls->ls_namelen != namelen)
474			continue;
475		if (memcmp(ls->ls_name, name, namelen))
476			continue;
477		if (flags & DLM_LSFL_NEWEXCL) {
478			error = -EEXIST;
479			break;
480		}
481		ls->ls_create_count++;
482		*lockspace = ls;
483		error = 1;
484		break;
485	}
486	spin_unlock(&lslist_lock);
487
488	if (error)
489		goto out;
490
491	error = -ENOMEM;
492
493	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
494	if (!ls)
495		goto out;
496	memcpy(ls->ls_name, name, namelen);
497	ls->ls_namelen = namelen;
498	ls->ls_lvblen = lvblen;
499	ls->ls_count = 0;
500	ls->ls_flags = 0;
501	ls->ls_scan_time = jiffies;
502
503	if (ops && dlm_config.ci_recover_callbacks) {
504		ls->ls_ops = ops;
505		ls->ls_ops_arg = ops_arg;
506	}
507
508	if (flags & DLM_LSFL_TIMEWARN)
509		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
510
511	/* ls_exflags are forced to match among nodes, and we don't
512	   need to require all nodes to have some flags set */
513	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
514				    DLM_LSFL_NEWEXCL));
515
516	size = dlm_config.ci_rsbtbl_size;
517	ls->ls_rsbtbl_size = size;
518
519	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
520	if (!ls->ls_rsbtbl)
521		goto out_lsfree;
522	for (i = 0; i < size; i++) {
523		ls->ls_rsbtbl[i].keep.rb_node = NULL;
524		ls->ls_rsbtbl[i].toss.rb_node = NULL;
525		spin_lock_init(&ls->ls_rsbtbl[i].lock);
526	}
527
528	spin_lock_init(&ls->ls_remove_spin);
529
530	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
531		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
532						 GFP_KERNEL);
533		if (!ls->ls_remove_names[i])
534			goto out_rsbtbl;
535	}
536
537	idr_init(&ls->ls_lkbidr);
538	spin_lock_init(&ls->ls_lkbidr_spin);
539
540	INIT_LIST_HEAD(&ls->ls_waiters);
541	mutex_init(&ls->ls_waiters_mutex);
542	INIT_LIST_HEAD(&ls->ls_orphans);
543	mutex_init(&ls->ls_orphans_mutex);
544	INIT_LIST_HEAD(&ls->ls_timeout);
545	mutex_init(&ls->ls_timeout_mutex);
546
547	INIT_LIST_HEAD(&ls->ls_new_rsb);
548	spin_lock_init(&ls->ls_new_rsb_spin);
549
550	INIT_LIST_HEAD(&ls->ls_nodes);
551	INIT_LIST_HEAD(&ls->ls_nodes_gone);
552	ls->ls_num_nodes = 0;
553	ls->ls_low_nodeid = 0;
554	ls->ls_total_weight = 0;
555	ls->ls_node_array = NULL;
556
557	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
558	ls->ls_stub_rsb.res_ls = ls;
559
560	ls->ls_debug_rsb_dentry = NULL;
561	ls->ls_debug_waiters_dentry = NULL;
562
563	init_waitqueue_head(&ls->ls_uevent_wait);
564	ls->ls_uevent_result = 0;
565	init_completion(&ls->ls_members_done);
566	ls->ls_members_result = -1;
567
568	mutex_init(&ls->ls_cb_mutex);
569	INIT_LIST_HEAD(&ls->ls_cb_delay);
570
571	ls->ls_recoverd_task = NULL;
572	mutex_init(&ls->ls_recoverd_active);
573	spin_lock_init(&ls->ls_recover_lock);
574	spin_lock_init(&ls->ls_rcom_spin);
575	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
576	ls->ls_recover_status = 0;
577	ls->ls_recover_seq = 0;
578	ls->ls_recover_args = NULL;
579	init_rwsem(&ls->ls_in_recovery);
580	init_rwsem(&ls->ls_recv_active);
581	INIT_LIST_HEAD(&ls->ls_requestqueue);
582	mutex_init(&ls->ls_requestqueue_mutex);
583	mutex_init(&ls->ls_clear_proc_locks);
584
585	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
586	if (!ls->ls_recover_buf)
587		goto out_lkbidr;
588
589	ls->ls_slot = 0;
590	ls->ls_num_slots = 0;
591	ls->ls_slots_size = 0;
592	ls->ls_slots = NULL;
593
594	INIT_LIST_HEAD(&ls->ls_recover_list);
595	spin_lock_init(&ls->ls_recover_list_lock);
596	idr_init(&ls->ls_recover_idr);
597	spin_lock_init(&ls->ls_recover_idr_lock);
598	ls->ls_recover_list_count = 0;
599	ls->ls_local_handle = ls;
600	init_waitqueue_head(&ls->ls_wait_general);
601	INIT_LIST_HEAD(&ls->ls_root_list);
602	init_rwsem(&ls->ls_root_sem);
603
604	spin_lock(&lslist_lock);
605	ls->ls_create_count = 1;
606	list_add(&ls->ls_list, &lslist);
607	spin_unlock(&lslist_lock);
608
609	if (flags & DLM_LSFL_FS) {
610		error = dlm_callback_start(ls);
611		if (error) {
612			log_error(ls, "can't start dlm_callback %d", error);
613			goto out_delist;
614		}
615	}
616
617	init_waitqueue_head(&ls->ls_recover_lock_wait);
618
619	/*
620	 * Once started, dlm_recoverd first looks for ls in lslist, then
621	 * initializes ls_in_recovery as locked in "down" mode.  We need
622	 * to wait for the wakeup from dlm_recoverd because in_recovery
623	 * has to start out in down mode.
624	 */
625
626	error = dlm_recoverd_start(ls);
627	if (error) {
628		log_error(ls, "can't start dlm_recoverd %d", error);
629		goto out_callback;
630	}
631
632	wait_event(ls->ls_recover_lock_wait,
633		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
634
635	ls->ls_kobj.kset = dlm_kset;
636	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
637				     "%s", ls->ls_name);
638	if (error)
639		goto out_recoverd;
640	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
641
642	/* let kobject handle freeing of ls if there's an error */
643	do_unreg = 1;
644
645	/* This uevent triggers dlm_controld in userspace to add us to the
646	   group of nodes that are members of this lockspace (managed by the
647	   cluster infrastructure.)  Once it's done that, it tells us who the
648	   current lockspace members are (via configfs) and then tells the
649	   lockspace to start running (via sysfs) in dlm_ls_start(). */
650
651	error = do_uevent(ls, 1);
652	if (error)
653		goto out_recoverd;
654
655	wait_for_completion(&ls->ls_members_done);
656	error = ls->ls_members_result;
657	if (error)
658		goto out_members;
659
660	dlm_create_debug_file(ls);
661
662	log_rinfo(ls, "join complete");
663	*lockspace = ls;
664	return 0;
665
666 out_members:
667	do_uevent(ls, 0);
668	dlm_clear_members(ls);
669	kfree(ls->ls_node_array);
670 out_recoverd:
671	dlm_recoverd_stop(ls);
672 out_callback:
673	dlm_callback_stop(ls);
674 out_delist:
675	spin_lock(&lslist_lock);
676	list_del(&ls->ls_list);
677	spin_unlock(&lslist_lock);
678	idr_destroy(&ls->ls_recover_idr);
679	kfree(ls->ls_recover_buf);
680 out_lkbidr:
681	idr_destroy(&ls->ls_lkbidr);
682 out_rsbtbl:
683	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
684		kfree(ls->ls_remove_names[i]);
685	vfree(ls->ls_rsbtbl);
686 out_lsfree:
687	if (do_unreg)
688		kobject_put(&ls->ls_kobj);
689	else
690		kfree(ls);
691 out:
692	module_put(THIS_MODULE);
693	return error;
694}
695
696int dlm_new_lockspace(const char *name, const char *cluster,
697		      uint32_t flags, int lvblen,
698		      const struct dlm_lockspace_ops *ops, void *ops_arg,
699		      int *ops_result, dlm_lockspace_t **lockspace)
700{
701	int error = 0;
702
703	mutex_lock(&ls_lock);
704	if (!ls_count)
705		error = threads_start();
706	if (error)
707		goto out;
708
709	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
710			      ops_result, lockspace);
711	if (!error)
712		ls_count++;
713	if (error > 0)
714		error = 0;
715	if (!ls_count)
716		threads_stop();
717 out:
718	mutex_unlock(&ls_lock);
719	return error;
720}
721
722static int lkb_idr_is_local(int id, void *p, void *data)
723{
724	struct dlm_lkb *lkb = p;
725
726	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
727}
728
729static int lkb_idr_is_any(int id, void *p, void *data)
730{
731	return 1;
732}
733
734static int lkb_idr_free(int id, void *p, void *data)
735{
736	struct dlm_lkb *lkb = p;
737
738	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
739		dlm_free_lvb(lkb->lkb_lvbptr);
740
741	dlm_free_lkb(lkb);
742	return 0;
743}
744
745/* NOTE: We check the lkbidr here rather than the resource table.
746   This is because there may be LKBs queued as ASTs that have been unlinked
747   from their RSBs and are pending deletion once the AST has been delivered */
748
749static int lockspace_busy(struct dlm_ls *ls, int force)
750{
751	int rv;
752
753	spin_lock(&ls->ls_lkbidr_spin);
754	if (force == 0) {
755		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
756	} else if (force == 1) {
757		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
758	} else {
759		rv = 0;
760	}
761	spin_unlock(&ls->ls_lkbidr_spin);
762	return rv;
763}
764
765static int release_lockspace(struct dlm_ls *ls, int force)
766{
767	struct dlm_rsb *rsb;
768	struct rb_node *n;
769	int i, busy, rv;
770
771	busy = lockspace_busy(ls, force);
772
773	spin_lock(&lslist_lock);
774	if (ls->ls_create_count == 1) {
775		if (busy) {
776			rv = -EBUSY;
777		} else {
778			/* remove_lockspace takes ls off lslist */
779			ls->ls_create_count = 0;
780			rv = 0;
781		}
782	} else if (ls->ls_create_count > 1) {
783		rv = --ls->ls_create_count;
784	} else {
785		rv = -EINVAL;
786	}
787	spin_unlock(&lslist_lock);
788
789	if (rv) {
790		log_debug(ls, "release_lockspace no remove %d", rv);
791		return rv;
792	}
793
794	dlm_device_deregister(ls);
795
796	if (force < 3 && dlm_user_daemon_available())
797		do_uevent(ls, 0);
798
799	dlm_recoverd_stop(ls);
800
801	dlm_callback_stop(ls);
802
803	remove_lockspace(ls);
804
805	dlm_delete_debug_file(ls);
806
807	idr_destroy(&ls->ls_recover_idr);
808	kfree(ls->ls_recover_buf);
809
810	/*
811	 * Free all lkb's in idr
812	 */
813
814	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
815	idr_destroy(&ls->ls_lkbidr);
816
817	/*
818	 * Free all rsb's on rsbtbl[] lists
819	 */
820
821	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
822		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
823			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
824			rb_erase(n, &ls->ls_rsbtbl[i].keep);
825			dlm_free_rsb(rsb);
826		}
827
828		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
829			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
830			rb_erase(n, &ls->ls_rsbtbl[i].toss);
831			dlm_free_rsb(rsb);
832		}
833	}
834
835	vfree(ls->ls_rsbtbl);
836
837	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
838		kfree(ls->ls_remove_names[i]);
839
840	while (!list_empty(&ls->ls_new_rsb)) {
841		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
842				       res_hashchain);
843		list_del(&rsb->res_hashchain);
844		dlm_free_rsb(rsb);
845	}
846
847	/*
848	 * Free structures on any other lists
849	 */
850
851	dlm_purge_requestqueue(ls);
852	kfree(ls->ls_recover_args);
853	dlm_clear_members(ls);
854	dlm_clear_members_gone(ls);
855	kfree(ls->ls_node_array);
856	log_rinfo(ls, "release_lockspace final free");
857	kobject_put(&ls->ls_kobj);
858	/* The ls structure will be freed when the kobject is done with */
859
860	module_put(THIS_MODULE);
861	return 0;
862}
863
864/*
865 * Called when a system has released all its locks and is not going to use the
866 * lockspace any longer.  We free everything we're managing for this lockspace.
867 * Remaining nodes will go through the recovery process as if we'd died.  The
868 * lockspace must continue to function as usual, participating in recoveries,
869 * until this returns.
870 *
871 * Force has 4 possible values:
872 * 0 - don't destroy lockspace if it has any LKBs
873 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
874 * 2 - destroy lockspace regardless of LKBs
875 * 3 - destroy lockspace as part of a forced shutdown
876 */
877
878int dlm_release_lockspace(void *lockspace, int force)
879{
880	struct dlm_ls *ls;
881	int error;
882
883	ls = dlm_find_lockspace_local(lockspace);
884	if (!ls)
885		return -EINVAL;
886	dlm_put_lockspace(ls);
887
888	mutex_lock(&ls_lock);
889	error = release_lockspace(ls, force);
890	if (!error)
891		ls_count--;
892	if (!ls_count)
893		threads_stop();
894	mutex_unlock(&ls_lock);
895
896	return error;
897}
898
899void dlm_stop_lockspaces(void)
900{
901	struct dlm_ls *ls;
902	int count;
903
904 restart:
905	count = 0;
906	spin_lock(&lslist_lock);
907	list_for_each_entry(ls, &lslist, ls_list) {
908		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
909			count++;
910			continue;
911		}
912		spin_unlock(&lslist_lock);
913		log_error(ls, "no userland control daemon, stopping lockspace");
914		dlm_ls_stop(ls);
915		goto restart;
916	}
917	spin_unlock(&lslist_lock);
918
919	if (count)
920		log_print("dlm user daemon left %d lockspaces", count);
921}
922