fs/dlm/lockspace.c (Linux v4.10.11)
 
  1/******************************************************************************
  2*******************************************************************************
  3**
  4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
  5**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
  6**
  7**  This copyrighted material is made available to anyone wishing to use,
  8**  modify, copy, or redistribute it subject to the terms and conditions
  9**  of the GNU General Public License v.2.
 10**
 11*******************************************************************************
 12******************************************************************************/
 13
 14#include <linux/module.h>
 15
 16#include "dlm_internal.h"
 17#include "lockspace.h"
 18#include "member.h"
 19#include "recoverd.h"
 20#include "dir.h"
 21#include "lowcomms.h"
 22#include "config.h"
 23#include "memory.h"
 24#include "lock.h"
 25#include "recover.h"
 26#include "requestqueue.h"
 27#include "user.h"
 28#include "ast.h"
 29
 30static int			ls_count;
 31static struct mutex		ls_lock;
 32static struct list_head		lslist;
 33static spinlock_t		lslist_lock;
 34static struct task_struct *	scand_task;
 35
 36
 37static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
 38{
 39	ssize_t ret = len;
 40	int n;
 41	int rc = kstrtoint(buf, 0, &n);
 42
 43	if (rc)
 44		return rc;
 45	ls = dlm_find_lockspace_local(ls->ls_local_handle);
 46	if (!ls)
 47		return -EINVAL;
 48
 49	switch (n) {
 50	case 0:
 51		dlm_ls_stop(ls);
 52		break;
 53	case 1:
 54		dlm_ls_start(ls);
 55		break;
 56	default:
 57		ret = -EINVAL;
 58	}
 59	dlm_put_lockspace(ls);
 60	return ret;
 61}
 62
 63static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
 64{
 65	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
 66
 67	if (rc)
 68		return rc;
 69	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
 70	wake_up(&ls->ls_uevent_wait);
 71	return len;
 72}
 73
 74static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
 75{
 76	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
 77}
 78
 79static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
 80{
 81	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
 82
 83	if (rc)
 84		return rc;
 85	return len;
 86}
 87
 88static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
 89{
 90	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
 91}
 92
 93static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
 94{
 95	int val;
 96	int rc = kstrtoint(buf, 0, &val);
 97
 98	if (rc)
 99		return rc;
100	if (val == 1)
101		set_bit(LSFL_NODIR, &ls->ls_flags);
102	return len;
103}
104
105static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
106{
107	uint32_t status = dlm_recover_status(ls);
108	return snprintf(buf, PAGE_SIZE, "%x\n", status);
109}
110
111static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
112{
113	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
114}
115
116struct dlm_attr {
117	struct attribute attr;
118	ssize_t (*show)(struct dlm_ls *, char *);
119	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
120};
121
122static struct dlm_attr dlm_attr_control = {
123	.attr  = {.name = "control", .mode = S_IWUSR},
124	.store = dlm_control_store
125};
126
127static struct dlm_attr dlm_attr_event = {
128	.attr  = {.name = "event_done", .mode = S_IWUSR},
129	.store = dlm_event_store
130};
131
132static struct dlm_attr dlm_attr_id = {
133	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
134	.show  = dlm_id_show,
135	.store = dlm_id_store
136};
137
138static struct dlm_attr dlm_attr_nodir = {
139	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
140	.show  = dlm_nodir_show,
141	.store = dlm_nodir_store
142};
143
144static struct dlm_attr dlm_attr_recover_status = {
145	.attr  = {.name = "recover_status", .mode = S_IRUGO},
146	.show  = dlm_recover_status_show
147};
148
149static struct dlm_attr dlm_attr_recover_nodeid = {
150	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
151	.show  = dlm_recover_nodeid_show
152};
153
154static struct attribute *dlm_attrs[] = {
155	&dlm_attr_control.attr,
156	&dlm_attr_event.attr,
157	&dlm_attr_id.attr,
158	&dlm_attr_nodir.attr,
159	&dlm_attr_recover_status.attr,
160	&dlm_attr_recover_nodeid.attr,
161	NULL,
162};
163
164static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
165			     char *buf)
166{
167	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
168	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
169	return a->show ? a->show(ls, buf) : 0;
170}
171
172static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
173			      const char *buf, size_t len)
174{
175	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
176	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
177	return a->store ? a->store(ls, buf, len) : len;
178}
179
180static void lockspace_kobj_release(struct kobject *k)
181{
182	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
183	kfree(ls);
184}
185
186static const struct sysfs_ops dlm_attr_ops = {
187	.show  = dlm_attr_show,
188	.store = dlm_attr_store,
189};
190
191static struct kobj_type dlm_ktype = {
192	.default_attrs = dlm_attrs,
193	.sysfs_ops     = &dlm_attr_ops,
194	.release       = lockspace_kobj_release,
195};
196
197static struct kset *dlm_kset;
198
199static int do_uevent(struct dlm_ls *ls, int in)
200{
201	int error;
202
203	if (in)
204		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
205	else
206		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
207
208	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
209
210	/* dlm_controld will see the uevent, do the necessary group management
211	   and then write to sysfs to wake us */
212
213	error = wait_event_interruptible(ls->ls_uevent_wait,
214			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
215
216	log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
217
218	if (error)
219		goto out;
220
221	error = ls->ls_uevent_result;
222 out:
223	if (error)
224		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
225			  error, ls->ls_uevent_result);
226	return error;
227}
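/* Illustrative note (editorial, not part of the kernel source): the other
 * half of this handshake lives in userspace. dlm_controld reacts to the
 * ONLINE/OFFLINE uevent, performs the group join/leave, and reports the
 * outcome by writing a result code to the "event_done" sysfs file, which
 * dlm_event_store() above turns into LSFL_UEVENT_WAIT plus a wakeup of
 * ls_uevent_wait. Roughly:
 *
 *	echo 0 > /sys/kernel/dlm/<lockspace>/event_done
 */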
228
229static int dlm_uevent(struct kset *kset, struct kobject *kobj,
230		      struct kobj_uevent_env *env)
231{
232	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
233
234	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
235	return 0;
236}
237
238static struct kset_uevent_ops dlm_uevent_ops = {
239	.uevent = dlm_uevent,
240};
241
242int __init dlm_lockspace_init(void)
243{
244	ls_count = 0;
245	mutex_init(&ls_lock);
246	INIT_LIST_HEAD(&lslist);
247	spin_lock_init(&lslist_lock);
248
249	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
250	if (!dlm_kset) {
251		printk(KERN_WARNING "%s: can not create kset\n", __func__);
252		return -ENOMEM;
253	}
254	return 0;
255}
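/* Descriptive note (editorial): since dlm_kset is parented to kernel_kobj,
 * each lockspace kobject registered in new_lockspace() appears as
 * /sys/kernel/dlm/<name>/ containing the attribute files declared above
 * (control, event_done, id, nodir, recover_status, recover_nodeid). */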
256
257void dlm_lockspace_exit(void)
258{
259	kset_unregister(dlm_kset);
260}
261
262static struct dlm_ls *find_ls_to_scan(void)
263{
264	struct dlm_ls *ls;
265
266	spin_lock(&lslist_lock);
267	list_for_each_entry(ls, &lslist, ls_list) {
268		if (time_after_eq(jiffies, ls->ls_scan_time +
269					    dlm_config.ci_scan_secs * HZ)) {
270			spin_unlock(&lslist_lock);
271			return ls;
272		}
273	}
274	spin_unlock(&lslist_lock);
275	return NULL;
276}
277
278static int dlm_scand(void *data)
279{
280	struct dlm_ls *ls;
281
282	while (!kthread_should_stop()) {
283		ls = find_ls_to_scan();
284		if (ls) {
285			if (dlm_lock_recovery_try(ls)) {
286				ls->ls_scan_time = jiffies;
287				dlm_scan_rsbs(ls);
288				dlm_scan_timeout(ls);
289				dlm_scan_waiters(ls);
290				dlm_unlock_recovery(ls);
291			} else {
292				ls->ls_scan_time += HZ;
293			}
294			continue;
295		}
296		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
297	}
298	return 0;
299}
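/* Descriptive note (editorial): dlm_scand sleeps ci_scan_secs between
 * passes and rescans any lockspace whose last scan is at least that old;
 * when the recovery lock cannot be taken, ls_scan_time is pushed ahead by
 * one second (HZ) and the lockspace is retried on a later pass. */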
300
301static int dlm_scand_start(void)
302{
303	struct task_struct *p;
304	int error = 0;
305
306	p = kthread_run(dlm_scand, NULL, "dlm_scand");
307	if (IS_ERR(p))
308		error = PTR_ERR(p);
309	else
310		scand_task = p;
311	return error;
312}
313
314static void dlm_scand_stop(void)
315{
316	kthread_stop(scand_task);
317}
318
319struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
320{
321	struct dlm_ls *ls;
322
323	spin_lock(&lslist_lock);
324
325	list_for_each_entry(ls, &lslist, ls_list) {
326		if (ls->ls_global_id == id) {
327			ls->ls_count++;
328			goto out;
329		}
330	}
331	ls = NULL;
332 out:
333	spin_unlock(&lslist_lock);
334	return ls;
335}
336
337struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
338{
339	struct dlm_ls *ls;
340
341	spin_lock(&lslist_lock);
342	list_for_each_entry(ls, &lslist, ls_list) {
343		if (ls->ls_local_handle == lockspace) {
344			ls->ls_count++;
345			goto out;
346		}
347	}
348	ls = NULL;
349 out:
350	spin_unlock(&lslist_lock);
351	return ls;
352}
353
354struct dlm_ls *dlm_find_lockspace_device(int minor)
355{
356	struct dlm_ls *ls;
357
358	spin_lock(&lslist_lock);
359	list_for_each_entry(ls, &lslist, ls_list) {
360		if (ls->ls_device.minor == minor) {
361			ls->ls_count++;
362			goto out;
363		}
364	}
365	ls = NULL;
366 out:
367	spin_unlock(&lslist_lock);
368	return ls;
369}
370
371void dlm_put_lockspace(struct dlm_ls *ls)
372{
373	spin_lock(&lslist_lock);
374	ls->ls_count--;
375	spin_unlock(&lslist_lock);
376}
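/* Illustrative sketch (editorial, not from this file): the three lookup
 * helpers above return with ls_count elevated, so every successful find
 * must be paired with dlm_put_lockspace(), e.g.:
 *
 *	struct dlm_ls *ls = dlm_find_lockspace_global(id);
 *	if (!ls)
 *		return -EINVAL;
 *	... operate on ls ...
 *	dlm_put_lockspace(ls);
 */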
377
378static void remove_lockspace(struct dlm_ls *ls)
379{
380	for (;;) {
381		spin_lock(&lslist_lock);
382		if (ls->ls_count == 0) {
383			WARN_ON(ls->ls_create_count != 0);
384			list_del(&ls->ls_list);
385			spin_unlock(&lslist_lock);
386			return;
387		}
388		spin_unlock(&lslist_lock);
389		ssleep(1);
390	}
391}
392
393static int threads_start(void)
394{
395	int error;
396
397	error = dlm_scand_start();
398	if (error) {
399		log_print("cannot start dlm_scand thread %d", error);
400		goto fail;
401	}
402
403	/* Thread for sending/receiving messages for all lockspaces */
404	error = dlm_lowcomms_start();
405	if (error) {
406		log_print("cannot start dlm lowcomms %d", error);
407		goto scand_fail;
408	}
409
410	return 0;
411
412 scand_fail:
413	dlm_scand_stop();
414 fail:
415	return error;
416}
417
418static void threads_stop(void)
419{
420	dlm_scand_stop();
421	dlm_lowcomms_stop();
422}
423
424static int new_lockspace(const char *name, const char *cluster,
425			 uint32_t flags, int lvblen,
426			 const struct dlm_lockspace_ops *ops, void *ops_arg,
427			 int *ops_result, dlm_lockspace_t **lockspace)
428{
429	struct dlm_ls *ls;
430	int i, size, error;
431	int do_unreg = 0;
432	int namelen = strlen(name);
433
434	if (namelen > DLM_LOCKSPACE_LEN)
435		return -EINVAL;
436
437	if (!lvblen || (lvblen % 8))
438		return -EINVAL;
439
440	if (!try_module_get(THIS_MODULE))
441		return -EINVAL;
442
443	if (!dlm_user_daemon_available()) {
444		log_print("dlm user daemon not available");
445		error = -EUNATCH;
446		goto out;
447	}
448
449	if (ops && ops_result) {
450		if (!dlm_config.ci_recover_callbacks)
451			*ops_result = -EOPNOTSUPP;
452		else
453			*ops_result = 0;
454	}
455
456	if (dlm_config.ci_recover_callbacks && cluster &&
457	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
458		log_print("dlm cluster name %s mismatch %s",
459			  dlm_config.ci_cluster_name, cluster);
460		error = -EBADR;
461		goto out;
462	}
463
464	error = 0;
465
466	spin_lock(&lslist_lock);
467	list_for_each_entry(ls, &lslist, ls_list) {
468		WARN_ON(ls->ls_create_count <= 0);
469		if (ls->ls_namelen != namelen)
470			continue;
471		if (memcmp(ls->ls_name, name, namelen))
472			continue;
473		if (flags & DLM_LSFL_NEWEXCL) {
474			error = -EEXIST;
475			break;
476		}
477		ls->ls_create_count++;
478		*lockspace = ls;
479		error = 1;
480		break;
481	}
482	spin_unlock(&lslist_lock);
483
484	if (error)
485		goto out;
486
487	error = -ENOMEM;
488
489	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
490	if (!ls)
491		goto out;
492	memcpy(ls->ls_name, name, namelen);
493	ls->ls_namelen = namelen;
494	ls->ls_lvblen = lvblen;
495	ls->ls_count = 0;
496	ls->ls_flags = 0;
497	ls->ls_scan_time = jiffies;
498
499	if (ops && dlm_config.ci_recover_callbacks) {
500		ls->ls_ops = ops;
501		ls->ls_ops_arg = ops_arg;
502	}
503
504	if (flags & DLM_LSFL_TIMEWARN)
505		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
506
507	/* ls_exflags are forced to match among nodes, and we don't
508	   need to require all nodes to have some flags set */
509	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
510				    DLM_LSFL_NEWEXCL));
511
512	size = dlm_config.ci_rsbtbl_size;
513	ls->ls_rsbtbl_size = size;
514
515	ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
516	if (!ls->ls_rsbtbl)
517		goto out_lsfree;
518	for (i = 0; i < size; i++) {
519		ls->ls_rsbtbl[i].keep.rb_node = NULL;
520		ls->ls_rsbtbl[i].toss.rb_node = NULL;
521		spin_lock_init(&ls->ls_rsbtbl[i].lock);
522	}
523
524	spin_lock_init(&ls->ls_remove_spin);
525
526	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
527		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
528						 GFP_KERNEL);
529		if (!ls->ls_remove_names[i])
530			goto out_rsbtbl;
531	}
532
533	idr_init(&ls->ls_lkbidr);
534	spin_lock_init(&ls->ls_lkbidr_spin);
535
536	INIT_LIST_HEAD(&ls->ls_waiters);
537	mutex_init(&ls->ls_waiters_mutex);
538	INIT_LIST_HEAD(&ls->ls_orphans);
539	mutex_init(&ls->ls_orphans_mutex);
540	INIT_LIST_HEAD(&ls->ls_timeout);
541	mutex_init(&ls->ls_timeout_mutex);
542
543	INIT_LIST_HEAD(&ls->ls_new_rsb);
544	spin_lock_init(&ls->ls_new_rsb_spin);
545
546	INIT_LIST_HEAD(&ls->ls_nodes);
547	INIT_LIST_HEAD(&ls->ls_nodes_gone);
548	ls->ls_num_nodes = 0;
549	ls->ls_low_nodeid = 0;
550	ls->ls_total_weight = 0;
551	ls->ls_node_array = NULL;
552
553	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
554	ls->ls_stub_rsb.res_ls = ls;
555
556	ls->ls_debug_rsb_dentry = NULL;
557	ls->ls_debug_waiters_dentry = NULL;
558
559	init_waitqueue_head(&ls->ls_uevent_wait);
560	ls->ls_uevent_result = 0;
561	init_completion(&ls->ls_members_done);
562	ls->ls_members_result = -1;
563
564	mutex_init(&ls->ls_cb_mutex);
565	INIT_LIST_HEAD(&ls->ls_cb_delay);
566
567	ls->ls_recoverd_task = NULL;
568	mutex_init(&ls->ls_recoverd_active);
569	spin_lock_init(&ls->ls_recover_lock);
570	spin_lock_init(&ls->ls_rcom_spin);
571	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
572	ls->ls_recover_status = 0;
573	ls->ls_recover_seq = 0;
574	ls->ls_recover_args = NULL;
575	init_rwsem(&ls->ls_in_recovery);
576	init_rwsem(&ls->ls_recv_active);
577	INIT_LIST_HEAD(&ls->ls_requestqueue);
578	mutex_init(&ls->ls_requestqueue_mutex);
579	mutex_init(&ls->ls_clear_proc_locks);
580
581	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
582	if (!ls->ls_recover_buf)
583		goto out_lkbidr;
584
585	ls->ls_slot = 0;
586	ls->ls_num_slots = 0;
587	ls->ls_slots_size = 0;
588	ls->ls_slots = NULL;
589
590	INIT_LIST_HEAD(&ls->ls_recover_list);
591	spin_lock_init(&ls->ls_recover_list_lock);
592	idr_init(&ls->ls_recover_idr);
593	spin_lock_init(&ls->ls_recover_idr_lock);
594	ls->ls_recover_list_count = 0;
595	ls->ls_local_handle = ls;
596	init_waitqueue_head(&ls->ls_wait_general);
597	INIT_LIST_HEAD(&ls->ls_root_list);
598	init_rwsem(&ls->ls_root_sem);
599
600	spin_lock(&lslist_lock);
601	ls->ls_create_count = 1;
602	list_add(&ls->ls_list, &lslist);
603	spin_unlock(&lslist_lock);
604
605	if (flags & DLM_LSFL_FS) {
606		error = dlm_callback_start(ls);
607		if (error) {
608			log_error(ls, "can't start dlm_callback %d", error);
609			goto out_delist;
610		}
611	}
612
613	init_waitqueue_head(&ls->ls_recover_lock_wait);
614
615	/*
616	 * Once started, dlm_recoverd first looks for ls in lslist, then
617	 * initializes ls_in_recovery as locked in "down" mode.  We need
618	 * to wait for the wakeup from dlm_recoverd because in_recovery
619	 * has to start out in down mode.
620	 */
621
622	error = dlm_recoverd_start(ls);
623	if (error) {
624		log_error(ls, "can't start dlm_recoverd %d", error);
625		goto out_callback;
626	}
627
628	wait_event(ls->ls_recover_lock_wait,
629		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
630
631	ls->ls_kobj.kset = dlm_kset;
632	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
633				     "%s", ls->ls_name);
634	if (error)
635		goto out_recoverd;
636	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
637
638	/* let kobject handle freeing of ls if there's an error */
639	do_unreg = 1;
640
641	/* This uevent triggers dlm_controld in userspace to add us to the
642	   group of nodes that are members of this lockspace (managed by the
643	   cluster infrastructure.)  Once it's done that, it tells us who the
644	   current lockspace members are (via configfs) and then tells the
645	   lockspace to start running (via sysfs) in dlm_ls_start(). */
646
647	error = do_uevent(ls, 1);
648	if (error)
649		goto out_recoverd;
650
651	wait_for_completion(&ls->ls_members_done);
652	error = ls->ls_members_result;
653	if (error)
654		goto out_members;
655
656	dlm_create_debug_file(ls);
657
658	log_rinfo(ls, "join complete");
659	*lockspace = ls;
660	return 0;
661
662 out_members:
663	do_uevent(ls, 0);
664	dlm_clear_members(ls);
665	kfree(ls->ls_node_array);
666 out_recoverd:
667	dlm_recoverd_stop(ls);
668 out_callback:
669	dlm_callback_stop(ls);
670 out_delist:
671	spin_lock(&lslist_lock);
672	list_del(&ls->ls_list);
673	spin_unlock(&lslist_lock);
674	idr_destroy(&ls->ls_recover_idr);
675	kfree(ls->ls_recover_buf);
676 out_lkbidr:
677	idr_destroy(&ls->ls_lkbidr);
678	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
679		if (ls->ls_remove_names[i])
680			kfree(ls->ls_remove_names[i]);
681	}
682 out_rsbtbl:
683	vfree(ls->ls_rsbtbl);
684 out_lsfree:
685	if (do_unreg)
686		kobject_put(&ls->ls_kobj);
687	else
688		kfree(ls);
689 out:
690	module_put(THIS_MODULE);
691	return error;
692}
693
694int dlm_new_lockspace(const char *name, const char *cluster,
695		      uint32_t flags, int lvblen,
696		      const struct dlm_lockspace_ops *ops, void *ops_arg,
697		      int *ops_result, dlm_lockspace_t **lockspace)
698{
699	int error = 0;
700
701	mutex_lock(&ls_lock);
702	if (!ls_count)
703		error = threads_start();
704	if (error)
705		goto out;
706
707	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
708			      ops_result, lockspace);
709	if (!error)
710		ls_count++;
711	if (error > 0)
712		error = 0;
713	if (!ls_count)
714		threads_stop();
715 out:
716	mutex_unlock(&ls_lock);
717	return error;
718}
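/* Illustrative sketch (editorial; the names, flags and lvblen here are
 * made up): an in-kernel user such as a cluster filesystem joins a
 * lockspace through the exported entry point above:
 *
 *	dlm_lockspace_t *lockspace;
 *	int ops_result, error;
 *
 *	error = dlm_new_lockspace("myfs", "mycluster", DLM_LSFL_FS, 32,
 *				  NULL, NULL, &ops_result, &lockspace);
 *	if (error)
 *		return error;
 *	... use the lockspace ...
 *	error = dlm_release_lockspace(lockspace, 0);
 */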
719
720static int lkb_idr_is_local(int id, void *p, void *data)
721{
722	struct dlm_lkb *lkb = p;
723
724	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
725}
726
727static int lkb_idr_is_any(int id, void *p, void *data)
728{
729	return 1;
730}
731
732static int lkb_idr_free(int id, void *p, void *data)
733{
734	struct dlm_lkb *lkb = p;
735
736	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
737		dlm_free_lvb(lkb->lkb_lvbptr);
738
739	dlm_free_lkb(lkb);
740	return 0;
741}
742
743/* NOTE: We check the lkbidr here rather than the resource table.
744   This is because there may be LKBs queued as ASTs that have been unlinked
745   from their RSBs and are pending deletion once the AST has been delivered */
746
747static int lockspace_busy(struct dlm_ls *ls, int force)
748{
749	int rv;
750
751	spin_lock(&ls->ls_lkbidr_spin);
752	if (force == 0) {
753		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
754	} else if (force == 1) {
755		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
756	} else {
757		rv = 0;
758	}
759	spin_unlock(&ls->ls_lkbidr_spin);
760	return rv;
761}
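/* Descriptive note (editorial): force == 0 counts any remaining lkb as
 * busy, force == 1 counts only locally mastered lkbs holding a real grant
 * (lkb_nodeid == 0, grmode != DLM_LOCK_IV), and force >= 2 never reports
 * busy, so release proceeds regardless of outstanding locks. */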
762
763static int release_lockspace(struct dlm_ls *ls, int force)
764{
765	struct dlm_rsb *rsb;
766	struct rb_node *n;
767	int i, busy, rv;
768
769	busy = lockspace_busy(ls, force);
770
771	spin_lock(&lslist_lock);
772	if (ls->ls_create_count == 1) {
773		if (busy) {
774			rv = -EBUSY;
775		} else {
776			/* remove_lockspace takes ls off lslist */
777			ls->ls_create_count = 0;
778			rv = 0;
779		}
780	} else if (ls->ls_create_count > 1) {
781		rv = --ls->ls_create_count;
782	} else {
783		rv = -EINVAL;
784	}
785	spin_unlock(&lslist_lock);
786
787	if (rv) {
788		log_debug(ls, "release_lockspace no remove %d", rv);
789		return rv;
790	}
791
792	dlm_device_deregister(ls);
793
794	if (force < 3 && dlm_user_daemon_available())
795		do_uevent(ls, 0);
796
797	dlm_recoverd_stop(ls);
798
799	dlm_callback_stop(ls);
800
801	remove_lockspace(ls);
802
803	dlm_delete_debug_file(ls);
804
805	kfree(ls->ls_recover_buf);
806
807	/*
808	 * Free all lkb's in idr
809	 */
810
811	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
812	idr_destroy(&ls->ls_lkbidr);
813
814	/*
815	 * Free all rsb's on rsbtbl[] lists
816	 */
817
818	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
819		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
820			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
821			rb_erase(n, &ls->ls_rsbtbl[i].keep);
822			dlm_free_rsb(rsb);
823		}
824
825		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
826			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
827			rb_erase(n, &ls->ls_rsbtbl[i].toss);
828			dlm_free_rsb(rsb);
829		}
830	}
831
832	vfree(ls->ls_rsbtbl);
833
834	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
835		kfree(ls->ls_remove_names[i]);
836
837	while (!list_empty(&ls->ls_new_rsb)) {
838		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
839				       res_hashchain);
840		list_del(&rsb->res_hashchain);
841		dlm_free_rsb(rsb);
842	}
843
844	/*
845	 * Free structures on any other lists
846	 */
847
848	dlm_purge_requestqueue(ls);
849	kfree(ls->ls_recover_args);
850	dlm_clear_members(ls);
851	dlm_clear_members_gone(ls);
852	kfree(ls->ls_node_array);
853	log_rinfo(ls, "release_lockspace final free");
854	kobject_put(&ls->ls_kobj);
855	/* The ls structure will be freed when the kobject is done with it */
856
857	module_put(THIS_MODULE);
858	return 0;
859}
860
861/*
862 * Called when a system has released all its locks and is not going to use the
863 * lockspace any longer.  We free everything we're managing for this lockspace.
864 * Remaining nodes will go through the recovery process as if we'd died.  The
865 * lockspace must continue to function as usual, participating in recoveries,
866 * until this returns.
867 *
868 * Force has 4 possible values:
869 * 0 - don't destroy lockspace if it has any LKBs
870 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
871 * 2 - destroy lockspace regardless of LKBs
872 * 3 - destroy lockspace as part of a forced shutdown
873 */
874
875int dlm_release_lockspace(void *lockspace, int force)
876{
877	struct dlm_ls *ls;
878	int error;
879
880	ls = dlm_find_lockspace_local(lockspace);
881	if (!ls)
882		return -EINVAL;
883	dlm_put_lockspace(ls);
884
885	mutex_lock(&ls_lock);
886	error = release_lockspace(ls, force);
887	if (!error)
888		ls_count--;
889	if (!ls_count)
890		threads_stop();
891	mutex_unlock(&ls_lock);
892
893	return error;
894}
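/* Illustrative sketch (editorial): a caller that prefers a graceful
 * release but must not block shutdown could retry with a higher force:
 *
 *	error = dlm_release_lockspace(lockspace, 0);
 *	if (error == -EBUSY)
 *		error = dlm_release_lockspace(lockspace, 2);
 */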
895
896void dlm_stop_lockspaces(void)
897{
898	struct dlm_ls *ls;
899	int count;
900
901 restart:
902	count = 0;
903	spin_lock(&lslist_lock);
904	list_for_each_entry(ls, &lslist, ls_list) {
905		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
906			count++;
907			continue;
908		}
909		spin_unlock(&lslist_lock);
910		log_error(ls, "no userland control daemon, stopping lockspace");
911		dlm_ls_stop(ls);
912		goto restart;
913	}
914	spin_unlock(&lslist_lock);
915
916	if (count)
917		log_print("dlm user daemon left %d lockspaces", count);
918}
919
fs/dlm/lockspace.c (Linux v6.2)
  1// SPDX-License-Identifier: GPL-2.0-only
  2/******************************************************************************
  3*******************************************************************************
  4**
  5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
  6**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
  7**
  8**
  9*******************************************************************************
 10******************************************************************************/
 11
 12#include <linux/module.h>
 13
 14#include "dlm_internal.h"
 15#include "lockspace.h"
 16#include "member.h"
 17#include "recoverd.h"
 18#include "dir.h"
 19#include "midcomms.h"
 20#include "config.h"
 21#include "memory.h"
 22#include "lock.h"
 23#include "recover.h"
 24#include "requestqueue.h"
 25#include "user.h"
 26#include "ast.h"
 27
 28static int			ls_count;
 29static struct mutex		ls_lock;
 30static struct list_head		lslist;
 31static spinlock_t		lslist_lock;
 32static struct task_struct *	scand_task;
 33
 34
 35static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
 36{
 37	ssize_t ret = len;
 38	int n;
 39	int rc = kstrtoint(buf, 0, &n);
 40
 41	if (rc)
 42		return rc;
 43	ls = dlm_find_lockspace_local(ls->ls_local_handle);
 44	if (!ls)
 45		return -EINVAL;
 46
 47	switch (n) {
 48	case 0:
 49		dlm_ls_stop(ls);
 50		break;
 51	case 1:
 52		dlm_ls_start(ls);
 53		break;
 54	default:
 55		ret = -EINVAL;
 56	}
 57	dlm_put_lockspace(ls);
 58	return ret;
 59}
 60
 61static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
 62{
 63	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
 64
 65	if (rc)
 66		return rc;
 67	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
 68	wake_up(&ls->ls_uevent_wait);
 69	return len;
 70}
 71
 72static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
 73{
 74	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
 75}
 76
 77static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
 78{
 79	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
 80
 81	if (rc)
 82		return rc;
 83	return len;
 84}
 85
 86static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
 87{
 88	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
 89}
 90
 91static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
 92{
 93	int val;
 94	int rc = kstrtoint(buf, 0, &val);
 95
 96	if (rc)
 97		return rc;
 98	if (val == 1)
 99		set_bit(LSFL_NODIR, &ls->ls_flags);
100	return len;
101}
102
103static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
104{
105	uint32_t status = dlm_recover_status(ls);
106	return snprintf(buf, PAGE_SIZE, "%x\n", status);
107}
108
109static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
110{
111	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
112}
113
114struct dlm_attr {
115	struct attribute attr;
116	ssize_t (*show)(struct dlm_ls *, char *);
117	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
118};
119
120static struct dlm_attr dlm_attr_control = {
121	.attr  = {.name = "control", .mode = S_IWUSR},
122	.store = dlm_control_store
123};
124
125static struct dlm_attr dlm_attr_event = {
126	.attr  = {.name = "event_done", .mode = S_IWUSR},
127	.store = dlm_event_store
128};
129
130static struct dlm_attr dlm_attr_id = {
131	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
132	.show  = dlm_id_show,
133	.store = dlm_id_store
134};
135
136static struct dlm_attr dlm_attr_nodir = {
137	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
138	.show  = dlm_nodir_show,
139	.store = dlm_nodir_store
140};
141
142static struct dlm_attr dlm_attr_recover_status = {
143	.attr  = {.name = "recover_status", .mode = S_IRUGO},
144	.show  = dlm_recover_status_show
145};
146
147static struct dlm_attr dlm_attr_recover_nodeid = {
148	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
149	.show  = dlm_recover_nodeid_show
150};
151
152static struct attribute *dlm_attrs[] = {
153	&dlm_attr_control.attr,
154	&dlm_attr_event.attr,
155	&dlm_attr_id.attr,
156	&dlm_attr_nodir.attr,
157	&dlm_attr_recover_status.attr,
158	&dlm_attr_recover_nodeid.attr,
159	NULL,
160};
161ATTRIBUTE_GROUPS(dlm);
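/* Descriptive note (editorial): ATTRIBUTE_GROUPS(dlm) generates a
 * dlm_group wrapping dlm_attrs[] plus the NULL-terminated dlm_groups[]
 * array that dlm_ktype.default_groups points at below. */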
162
163static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
164			     char *buf)
165{
166	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
167	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
168	return a->show ? a->show(ls, buf) : 0;
169}
170
171static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
172			      const char *buf, size_t len)
173{
174	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
175	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
176	return a->store ? a->store(ls, buf, len) : len;
177}
178
179static void lockspace_kobj_release(struct kobject *k)
180{
181	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
182	kfree(ls);
183}
184
185static const struct sysfs_ops dlm_attr_ops = {
186	.show  = dlm_attr_show,
187	.store = dlm_attr_store,
188};
189
190static struct kobj_type dlm_ktype = {
191	.default_groups = dlm_groups,
192	.sysfs_ops     = &dlm_attr_ops,
193	.release       = lockspace_kobj_release,
194};
195
196static struct kset *dlm_kset;
197
198static int do_uevent(struct dlm_ls *ls, int in)
199{
200	if (in)
201		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
202	else
203		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
204
205	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
206
207	/* dlm_controld will see the uevent, do the necessary group management
208	   and then write to sysfs to wake us */
209
210	wait_event(ls->ls_uevent_wait,
211		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
212
213	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
214
215	return ls->ls_uevent_result;
216}
217
218static int dlm_uevent(struct kobject *kobj, struct kobj_uevent_env *env)
219{
220	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
221
222	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
223	return 0;
224}
225
226static const struct kset_uevent_ops dlm_uevent_ops = {
227	.uevent = dlm_uevent,
228};
229
230int __init dlm_lockspace_init(void)
231{
232	ls_count = 0;
233	mutex_init(&ls_lock);
234	INIT_LIST_HEAD(&lslist);
235	spin_lock_init(&lslist_lock);
236
237	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
238	if (!dlm_kset) {
239		printk(KERN_WARNING "%s: can not create kset\n", __func__);
240		return -ENOMEM;
241	}
242	return 0;
243}
244
245void dlm_lockspace_exit(void)
246{
247	kset_unregister(dlm_kset);
248}
249
250static struct dlm_ls *find_ls_to_scan(void)
251{
252	struct dlm_ls *ls;
253
254	spin_lock(&lslist_lock);
255	list_for_each_entry(ls, &lslist, ls_list) {
256		if (time_after_eq(jiffies, ls->ls_scan_time +
257					    dlm_config.ci_scan_secs * HZ)) {
258			spin_unlock(&lslist_lock);
259			return ls;
260		}
261	}
262	spin_unlock(&lslist_lock);
263	return NULL;
264}
265
266static int dlm_scand(void *data)
267{
268	struct dlm_ls *ls;
269
270	while (!kthread_should_stop()) {
271		ls = find_ls_to_scan();
272		if (ls) {
273			if (dlm_lock_recovery_try(ls)) {
274				ls->ls_scan_time = jiffies;
275				dlm_scan_rsbs(ls);
276				dlm_scan_timeout(ls);
277				dlm_unlock_recovery(ls);
278			} else {
279				ls->ls_scan_time += HZ;
280			}
281			continue;
282		}
283		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
284	}
285	return 0;
286}
287
288static int dlm_scand_start(void)
289{
290	struct task_struct *p;
291	int error = 0;
292
293	p = kthread_run(dlm_scand, NULL, "dlm_scand");
294	if (IS_ERR(p))
295		error = PTR_ERR(p);
296	else
297		scand_task = p;
298	return error;
299}
300
301static void dlm_scand_stop(void)
302{
303	kthread_stop(scand_task);
304}
305
306struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
307{
308	struct dlm_ls *ls;
309
310	spin_lock(&lslist_lock);
311
312	list_for_each_entry(ls, &lslist, ls_list) {
313		if (ls->ls_global_id == id) {
314			atomic_inc(&ls->ls_count);
315			goto out;
316		}
317	}
318	ls = NULL;
319 out:
320	spin_unlock(&lslist_lock);
321	return ls;
322}
323
324struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
325{
326	struct dlm_ls *ls;
327
328	spin_lock(&lslist_lock);
329	list_for_each_entry(ls, &lslist, ls_list) {
330		if (ls->ls_local_handle == lockspace) {
331			atomic_inc(&ls->ls_count);
332			goto out;
333		}
334	}
335	ls = NULL;
336 out:
337	spin_unlock(&lslist_lock);
338	return ls;
339}
340
341struct dlm_ls *dlm_find_lockspace_device(int minor)
342{
343	struct dlm_ls *ls;
344
345	spin_lock(&lslist_lock);
346	list_for_each_entry(ls, &lslist, ls_list) {
347		if (ls->ls_device.minor == minor) {
348			atomic_inc(&ls->ls_count);
349			goto out;
350		}
351	}
352	ls = NULL;
353 out:
354	spin_unlock(&lslist_lock);
355	return ls;
356}
357
358void dlm_put_lockspace(struct dlm_ls *ls)
359{
360	if (atomic_dec_and_test(&ls->ls_count))
361		wake_up(&ls->ls_count_wait);
362}
363
364static void remove_lockspace(struct dlm_ls *ls)
365{
366retry:
367	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
368
369	spin_lock(&lslist_lock);
370	if (atomic_read(&ls->ls_count) != 0) {
371		spin_unlock(&lslist_lock);
372		goto retry;
373	}
374
375	WARN_ON(ls->ls_create_count != 0);
376	list_del(&ls->ls_list);
377	spin_unlock(&lslist_lock);
378}
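/* Descriptive note (editorial): dlm_put_lockspace() wakes ls_count_wait
 * once the refcount drops to zero, which lets the wait_event() above make
 * progress; the recheck of ls_count under lslist_lock closes the race with
 * a lookup that grabs a fresh reference between the wakeup and list_del(). */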
379
380static int threads_start(void)
381{
382	int error;
383
384	error = dlm_scand_start();
385	if (error) {
386		log_print("cannot start dlm_scand thread %d", error);
387		goto fail;
388	}
389
390	/* Thread for sending/receiving messages for all lockspaces */
391	error = dlm_midcomms_start();
392	if (error) {
393		log_print("cannot start dlm midcomms %d", error);
394		goto scand_fail;
395	}
396
397	return 0;
398
399 scand_fail:
400	dlm_scand_stop();
401 fail:
402	return error;
403}
404
405static int new_lockspace(const char *name, const char *cluster,
406			 uint32_t flags, int lvblen,
407			 const struct dlm_lockspace_ops *ops, void *ops_arg,
408			 int *ops_result, dlm_lockspace_t **lockspace)
409{
410	struct dlm_ls *ls;
411	int i, size, error;
412	int do_unreg = 0;
413	int namelen = strlen(name);
414
415	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
416		return -EINVAL;
417
418	if (lvblen % 8)
419		return -EINVAL;
420
421	if (!try_module_get(THIS_MODULE))
422		return -EINVAL;
423
424	if (!dlm_user_daemon_available()) {
425		log_print("dlm user daemon not available");
426		error = -EUNATCH;
427		goto out;
428	}
429
430	if (ops && ops_result) {
431		if (!dlm_config.ci_recover_callbacks)
432			*ops_result = -EOPNOTSUPP;
433		else
434			*ops_result = 0;
435	}
436
437	if (!cluster)
438		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
439			  dlm_config.ci_cluster_name);
440
441	if (dlm_config.ci_recover_callbacks && cluster &&
442	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
443		log_print("dlm cluster name '%s' does not match "
444			  "the application cluster name '%s'",
445			  dlm_config.ci_cluster_name, cluster);
446		error = -EBADR;
447		goto out;
448	}
449
450	error = 0;
451
452	spin_lock(&lslist_lock);
453	list_for_each_entry(ls, &lslist, ls_list) {
454		WARN_ON(ls->ls_create_count <= 0);
455		if (ls->ls_namelen != namelen)
456			continue;
457		if (memcmp(ls->ls_name, name, namelen))
458			continue;
459		if (flags & DLM_LSFL_NEWEXCL) {
460			error = -EEXIST;
461			break;
462		}
463		ls->ls_create_count++;
464		*lockspace = ls;
465		error = 1;
466		break;
467	}
468	spin_unlock(&lslist_lock);
469
470	if (error)
471		goto out;
472
473	error = -ENOMEM;
474
475	ls = kzalloc(sizeof(*ls), GFP_NOFS);
476	if (!ls)
477		goto out;
478	memcpy(ls->ls_name, name, namelen);
479	ls->ls_namelen = namelen;
480	ls->ls_lvblen = lvblen;
481	atomic_set(&ls->ls_count, 0);
482	init_waitqueue_head(&ls->ls_count_wait);
483	ls->ls_flags = 0;
484	ls->ls_scan_time = jiffies;
485
486	if (ops && dlm_config.ci_recover_callbacks) {
487		ls->ls_ops = ops;
488		ls->ls_ops_arg = ops_arg;
489	}
490
491#ifdef CONFIG_DLM_DEPRECATED_API
492	if (flags & DLM_LSFL_TIMEWARN) {
493		pr_warn_once("===============================================================\n"
494			     "WARNING: the dlm DLM_LSFL_TIMEWARN flag is being deprecated and\n"
495			     "         will be removed in v6.2!\n"
496			     "         Inclusive DLM_LSFL_TIMEWARN define in UAPI header!\n"
497			     "===============================================================\n");
498
499		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
500	}
501
502	/* ls_exflags are forced to match among nodes, and we don't
503	 * need to require all nodes to have some flags set
504	 */
505	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
506				    DLM_LSFL_NEWEXCL));
507#else
508	/* ls_exflags are forced to match among nodes, and we don't
509	 * need to require all nodes to have some flags set
510	 */
511	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
512#endif
513
514	size = READ_ONCE(dlm_config.ci_rsbtbl_size);
515	ls->ls_rsbtbl_size = size;
516
517	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
518	if (!ls->ls_rsbtbl)
519		goto out_lsfree;
520	for (i = 0; i < size; i++) {
521		ls->ls_rsbtbl[i].keep.rb_node = NULL;
522		ls->ls_rsbtbl[i].toss.rb_node = NULL;
523		spin_lock_init(&ls->ls_rsbtbl[i].lock);
524	}
525
526	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
527		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
528						 GFP_KERNEL);
529		if (!ls->ls_remove_names[i])
530			goto out_rsbtbl;
531	}
532
533	idr_init(&ls->ls_lkbidr);
534	spin_lock_init(&ls->ls_lkbidr_spin);
535
536	INIT_LIST_HEAD(&ls->ls_waiters);
537	mutex_init(&ls->ls_waiters_mutex);
538	INIT_LIST_HEAD(&ls->ls_orphans);
539	mutex_init(&ls->ls_orphans_mutex);
540#ifdef CONFIG_DLM_DEPRECATED_API
541	INIT_LIST_HEAD(&ls->ls_timeout);
542	mutex_init(&ls->ls_timeout_mutex);
543#endif
544
545	INIT_LIST_HEAD(&ls->ls_new_rsb);
546	spin_lock_init(&ls->ls_new_rsb_spin);
547
548	INIT_LIST_HEAD(&ls->ls_nodes);
549	INIT_LIST_HEAD(&ls->ls_nodes_gone);
550	ls->ls_num_nodes = 0;
551	ls->ls_low_nodeid = 0;
552	ls->ls_total_weight = 0;
553	ls->ls_node_array = NULL;
554
555	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
556	ls->ls_stub_rsb.res_ls = ls;
557
558	ls->ls_debug_rsb_dentry = NULL;
559	ls->ls_debug_waiters_dentry = NULL;
560
561	init_waitqueue_head(&ls->ls_uevent_wait);
562	ls->ls_uevent_result = 0;
563	init_completion(&ls->ls_recovery_done);
564	ls->ls_recovery_result = -1;
565
566	spin_lock_init(&ls->ls_cb_lock);
567	INIT_LIST_HEAD(&ls->ls_cb_delay);
568
569	ls->ls_recoverd_task = NULL;
570	mutex_init(&ls->ls_recoverd_active);
571	spin_lock_init(&ls->ls_recover_lock);
572	spin_lock_init(&ls->ls_rcom_spin);
573	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
574	ls->ls_recover_status = 0;
575	ls->ls_recover_seq = 0;
576	ls->ls_recover_args = NULL;
577	init_rwsem(&ls->ls_in_recovery);
578	init_rwsem(&ls->ls_recv_active);
579	INIT_LIST_HEAD(&ls->ls_requestqueue);
580	atomic_set(&ls->ls_requestqueue_cnt, 0);
581	init_waitqueue_head(&ls->ls_requestqueue_wait);
582	mutex_init(&ls->ls_requestqueue_mutex);
583	spin_lock_init(&ls->ls_clear_proc_locks);
584
585	/* Due backwards compatibility with 3.1 we need to use maximum
586	 * possible dlm message size to be sure the message will fit and
587	 * not having out of bounds issues. However on sending side 3.2
588	 * might send less.
589	 */
590	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
591	if (!ls->ls_recover_buf)
592		goto out_lkbidr;
593
594	ls->ls_slot = 0;
595	ls->ls_num_slots = 0;
596	ls->ls_slots_size = 0;
597	ls->ls_slots = NULL;
598
599	INIT_LIST_HEAD(&ls->ls_recover_list);
600	spin_lock_init(&ls->ls_recover_list_lock);
601	idr_init(&ls->ls_recover_idr);
602	spin_lock_init(&ls->ls_recover_idr_lock);
603	ls->ls_recover_list_count = 0;
604	ls->ls_local_handle = ls;
605	init_waitqueue_head(&ls->ls_wait_general);
606	INIT_LIST_HEAD(&ls->ls_root_list);
607	init_rwsem(&ls->ls_root_sem);
608
609	spin_lock(&lslist_lock);
610	ls->ls_create_count = 1;
611	list_add(&ls->ls_list, &lslist);
612	spin_unlock(&lslist_lock);
613
614	if (flags & DLM_LSFL_FS) {
615		error = dlm_callback_start(ls);
616		if (error) {
617			log_error(ls, "can't start dlm_callback %d", error);
618			goto out_delist;
619		}
620	}
621
622	init_waitqueue_head(&ls->ls_recover_lock_wait);
623
624	/*
625	 * Once started, dlm_recoverd first looks for ls in lslist, then
626	 * initializes ls_in_recovery as locked in "down" mode.  We need
627	 * to wait for the wakeup from dlm_recoverd because in_recovery
628	 * has to start out in down mode.
629	 */
630
631	error = dlm_recoverd_start(ls);
632	if (error) {
633		log_error(ls, "can't start dlm_recoverd %d", error);
634		goto out_callback;
635	}
636
637	wait_event(ls->ls_recover_lock_wait,
638		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
639
640	/* let kobject handle freeing of ls if there's an error */
641	do_unreg = 1;
642
643	ls->ls_kobj.kset = dlm_kset;
644	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
645				     "%s", ls->ls_name);
646	if (error)
647		goto out_recoverd;
648	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
649
650	/* This uevent triggers dlm_controld in userspace to add us to the
651	   group of nodes that are members of this lockspace (managed by the
652	   cluster infrastructure.)  Once it's done that, it tells us who the
653	   current lockspace members are (via configfs) and then tells the
654	   lockspace to start running (via sysfs) in dlm_ls_start(). */
655
656	error = do_uevent(ls, 1);
657	if (error)
658		goto out_recoverd;
659
660	/* wait until recovery is successful or failed */
661	wait_for_completion(&ls->ls_recovery_done);
662	error = ls->ls_recovery_result;
663	if (error)
664		goto out_members;
665
666	dlm_create_debug_file(ls);
667
668	log_rinfo(ls, "join complete");
669	*lockspace = ls;
670	return 0;
671
672 out_members:
673	do_uevent(ls, 0);
674	dlm_clear_members(ls);
675	kfree(ls->ls_node_array);
676 out_recoverd:
677	dlm_recoverd_stop(ls);
678 out_callback:
679	dlm_callback_stop(ls);
680 out_delist:
681	spin_lock(&lslist_lock);
682	list_del(&ls->ls_list);
683	spin_unlock(&lslist_lock);
684	idr_destroy(&ls->ls_recover_idr);
685	kfree(ls->ls_recover_buf);
686 out_lkbidr:
687	idr_destroy(&ls->ls_lkbidr);
688 out_rsbtbl:
689	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
690		kfree(ls->ls_remove_names[i]);
691	vfree(ls->ls_rsbtbl);
692 out_lsfree:
693	if (do_unreg)
694		kobject_put(&ls->ls_kobj);
695	else
696		kfree(ls);
697 out:
698	module_put(THIS_MODULE);
699	return error;
700}
701
702static int __dlm_new_lockspace(const char *name, const char *cluster,
703			       uint32_t flags, int lvblen,
704			       const struct dlm_lockspace_ops *ops,
705			       void *ops_arg, int *ops_result,
706			       dlm_lockspace_t **lockspace)
707{
708	int error = 0;
709
710	mutex_lock(&ls_lock);
711	if (!ls_count)
712		error = threads_start();
713	if (error)
714		goto out;
715
716	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
717			      ops_result, lockspace);
718	if (!error)
719		ls_count++;
720	if (error > 0)
721		error = 0;
722	if (!ls_count) {
723		dlm_scand_stop();
724		dlm_midcomms_shutdown();
725		dlm_midcomms_stop();
726	}
727 out:
728	mutex_unlock(&ls_lock);
729	return error;
730}
731
732int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
733		      int lvblen, const struct dlm_lockspace_ops *ops,
734		      void *ops_arg, int *ops_result,
735		      dlm_lockspace_t **lockspace)
736{
737	return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
738				   ops, ops_arg, ops_result, lockspace);
739}
740
741int dlm_new_user_lockspace(const char *name, const char *cluster,
742			   uint32_t flags, int lvblen,
743			   const struct dlm_lockspace_ops *ops,
744			   void *ops_arg, int *ops_result,
745			   dlm_lockspace_t **lockspace)
746{
747	return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
748				   ops_arg, ops_result, lockspace);
749}
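/* Illustrative sketch (editorial; names and sizes are made up): in-kernel
 * callers use dlm_new_lockspace(), which ORs in DLM_LSFL_FS so that
 * dlm_callback_start() runs for the lockspace, while
 * dlm_new_user_lockspace() leaves the flags untouched for lockspaces
 * created on behalf of the userspace device interface:
 *
 *	error = dlm_new_lockspace("myfs", "mycluster", DLM_LSFL_NEWEXCL,
 *				  32, NULL, NULL, &ops_result, &lockspace);
 */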
750
751static int lkb_idr_is_local(int id, void *p, void *data)
752{
753	struct dlm_lkb *lkb = p;
754
755	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
756}
757
758static int lkb_idr_is_any(int id, void *p, void *data)
759{
760	return 1;
761}
762
763static int lkb_idr_free(int id, void *p, void *data)
764{
765	struct dlm_lkb *lkb = p;
766
767	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
768		dlm_free_lvb(lkb->lkb_lvbptr);
769
770	dlm_free_lkb(lkb);
771	return 0;
772}
773
774/* NOTE: We check the lkbidr here rather than the resource table.
775   This is because there may be LKBs queued as ASTs that have been unlinked
776   from their RSBs and are pending deletion once the AST has been delivered */
777
778static int lockspace_busy(struct dlm_ls *ls, int force)
779{
780	int rv;
781
782	spin_lock(&ls->ls_lkbidr_spin);
783	if (force == 0) {
784		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
785	} else if (force == 1) {
786		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
787	} else {
788		rv = 0;
789	}
790	spin_unlock(&ls->ls_lkbidr_spin);
791	return rv;
792}
793
794static int release_lockspace(struct dlm_ls *ls, int force)
795{
796	struct dlm_rsb *rsb;
797	struct rb_node *n;
798	int i, busy, rv;
799
800	busy = lockspace_busy(ls, force);
801
802	spin_lock(&lslist_lock);
803	if (ls->ls_create_count == 1) {
804		if (busy) {
805			rv = -EBUSY;
806		} else {
807			/* remove_lockspace takes ls off lslist */
808			ls->ls_create_count = 0;
809			rv = 0;
810		}
811	} else if (ls->ls_create_count > 1) {
812		rv = --ls->ls_create_count;
813	} else {
814		rv = -EINVAL;
815	}
816	spin_unlock(&lslist_lock);
817
818	if (rv) {
819		log_debug(ls, "release_lockspace no remove %d", rv);
820		return rv;
821	}
822
823	dlm_device_deregister(ls);
824
825	if (force < 3 && dlm_user_daemon_available())
826		do_uevent(ls, 0);
827
828	dlm_recoverd_stop(ls);
829
830	if (ls_count == 1) {
831		dlm_scand_stop();
832		dlm_clear_members(ls);
833		dlm_midcomms_shutdown();
834	}
835
836	dlm_callback_stop(ls);
837
838	remove_lockspace(ls);
839
840	dlm_delete_debug_file(ls);
841
842	idr_destroy(&ls->ls_recover_idr);
843	kfree(ls->ls_recover_buf);
844
845	/*
846	 * Free all lkb's in idr
847	 */
848
849	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
850	idr_destroy(&ls->ls_lkbidr);
851
852	/*
853	 * Free all rsb's on rsbtbl[] lists
854	 */
855
856	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
857		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
858			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
859			rb_erase(n, &ls->ls_rsbtbl[i].keep);
860			dlm_free_rsb(rsb);
861		}
862
863		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
864			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
865			rb_erase(n, &ls->ls_rsbtbl[i].toss);
866			dlm_free_rsb(rsb);
867		}
868	}
869
870	vfree(ls->ls_rsbtbl);
871
872	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
873		kfree(ls->ls_remove_names[i]);
874
875	while (!list_empty(&ls->ls_new_rsb)) {
876		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
877				       res_hashchain);
878		list_del(&rsb->res_hashchain);
879		dlm_free_rsb(rsb);
880	}
881
882	/*
883	 * Free structures on any other lists
884	 */
885
886	dlm_purge_requestqueue(ls);
887	kfree(ls->ls_recover_args);
888	dlm_clear_members(ls);
889	dlm_clear_members_gone(ls);
890	kfree(ls->ls_node_array);
891	log_rinfo(ls, "release_lockspace final free");
892	kobject_put(&ls->ls_kobj);
893	/* The ls structure will be freed when the kobject is done with it */
894
895	module_put(THIS_MODULE);
896	return 0;
897}
898
899/*
900 * Called when a system has released all its locks and is not going to use the
901 * lockspace any longer.  We free everything we're managing for this lockspace.
902 * Remaining nodes will go through the recovery process as if we'd died.  The
903 * lockspace must continue to function as usual, participating in recoveries,
904 * until this returns.
905 *
906 * Force has 4 possible values:
907 * 0 - don't destroy lockspace if it has any LKBs
908 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
909 * 2 - destroy lockspace regardless of LKBs
910 * 3 - destroy lockspace as part of a forced shutdown
911 */
912
913int dlm_release_lockspace(void *lockspace, int force)
914{
915	struct dlm_ls *ls;
916	int error;
917
918	ls = dlm_find_lockspace_local(lockspace);
919	if (!ls)
920		return -EINVAL;
921	dlm_put_lockspace(ls);
922
923	mutex_lock(&ls_lock);
924	error = release_lockspace(ls, force);
925	if (!error)
926		ls_count--;
927	if (!ls_count)
928		dlm_midcomms_stop();
929	mutex_unlock(&ls_lock);
930
931	return error;
932}
933
934void dlm_stop_lockspaces(void)
935{
936	struct dlm_ls *ls;
937	int count;
938
939 restart:
940	count = 0;
941	spin_lock(&lslist_lock);
942	list_for_each_entry(ls, &lslist, ls_list) {
943		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
944			count++;
945			continue;
946		}
947		spin_unlock(&lslist_lock);
948		log_error(ls, "no userland control daemon, stopping lockspace");
949		dlm_ls_stop(ls);
950		goto restart;
951	}
952	spin_unlock(&lslist_lock);
953
954	if (count)
955		log_print("dlm user daemon left %d lockspaces", count);
956}
957
958void dlm_stop_lockspaces_check(void)
959{
960	struct dlm_ls *ls;
961
962	spin_lock(&lslist_lock);
963	list_for_each_entry(ls, &lslist, ls_list) {
964		if (WARN_ON(!rwsem_is_locked(&ls->ls_in_recovery) ||
965			    !dlm_locking_stopped(ls)))
966			break;
967	}
968	spin_unlock(&lslist_lock);
969}