fs/dlm/lockspace.c (v4.6)
 
  1/******************************************************************************
  2*******************************************************************************
  3**
  4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
  5**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
  6**
  7**  This copyrighted material is made available to anyone wishing to use,
  8**  modify, copy, or redistribute it subject to the terms and conditions
  9**  of the GNU General Public License v.2.
 10**
 11*******************************************************************************
 12******************************************************************************/
 13
 14#include "dlm_internal.h"
 15#include "lockspace.h"
 16#include "member.h"
 17#include "recoverd.h"
 18#include "dir.h"
 19#include "lowcomms.h"
 20#include "config.h"
 21#include "memory.h"
 22#include "lock.h"
 23#include "recover.h"
 24#include "requestqueue.h"
 25#include "user.h"
 26#include "ast.h"
 27
 28static int			ls_count;
 29static struct mutex		ls_lock;
 30static struct list_head		lslist;
 31static spinlock_t		lslist_lock;
 32static struct task_struct *	scand_task;
 33
 34
 35static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
 36{
 37	ssize_t ret = len;
 38	int n;
 39	int rc = kstrtoint(buf, 0, &n);
 40
 41	if (rc)
 42		return rc;
 43	ls = dlm_find_lockspace_local(ls->ls_local_handle);
 44	if (!ls)
 45		return -EINVAL;
 46
 47	switch (n) {
 48	case 0:
 49		dlm_ls_stop(ls);
 50		break;
 51	case 1:
 52		dlm_ls_start(ls);
 53		break;
 54	default:
 55		ret = -EINVAL;
 56	}
 57	dlm_put_lockspace(ls);
 58	return ret;
 59}
 60
 61static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
 62{
 63	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
 64
 65	if (rc)
 66		return rc;
 67	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
 68	wake_up(&ls->ls_uevent_wait);
 69	return len;
 70}
 71
 72static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
 73{
 74	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
 75}
 76
 77static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
 78{
 79	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
 80
 81	if (rc)
 82		return rc;
 83	return len;
 84}
 85
 86static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
 87{
 88	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
 89}
 90
 91static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
 92{
 93	int val;
 94	int rc = kstrtoint(buf, 0, &val);
 95
 96	if (rc)
 97		return rc;
 98	if (val == 1)
 99		set_bit(LSFL_NODIR, &ls->ls_flags);
100	return len;
101}
102
103static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
104{
105	uint32_t status = dlm_recover_status(ls);
106	return snprintf(buf, PAGE_SIZE, "%x\n", status);
107}
108
109static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
110{
111	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
112}
113
114struct dlm_attr {
115	struct attribute attr;
116	ssize_t (*show)(struct dlm_ls *, char *);
117	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
118};
119
120static struct dlm_attr dlm_attr_control = {
121	.attr  = {.name = "control", .mode = S_IWUSR},
122	.store = dlm_control_store
123};
124
125static struct dlm_attr dlm_attr_event = {
126	.attr  = {.name = "event_done", .mode = S_IWUSR},
127	.store = dlm_event_store
128};
129
130static struct dlm_attr dlm_attr_id = {
131	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
132	.show  = dlm_id_show,
133	.store = dlm_id_store
134};
135
136static struct dlm_attr dlm_attr_nodir = {
137	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
138	.show  = dlm_nodir_show,
139	.store = dlm_nodir_store
140};
141
142static struct dlm_attr dlm_attr_recover_status = {
143	.attr  = {.name = "recover_status", .mode = S_IRUGO},
144	.show  = dlm_recover_status_show
145};
146
147static struct dlm_attr dlm_attr_recover_nodeid = {
148	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
149	.show  = dlm_recover_nodeid_show
150};
151
152static struct attribute *dlm_attrs[] = {
153	&dlm_attr_control.attr,
154	&dlm_attr_event.attr,
155	&dlm_attr_id.attr,
156	&dlm_attr_nodir.attr,
157	&dlm_attr_recover_status.attr,
158	&dlm_attr_recover_nodeid.attr,
159	NULL,
160};
161
162static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
163			     char *buf)
164{
165	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
166	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
167	return a->show ? a->show(ls, buf) : 0;
168}
169
170static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
171			      const char *buf, size_t len)
172{
173	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
174	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
175	return a->store ? a->store(ls, buf, len) : len;
176}
177
178static void lockspace_kobj_release(struct kobject *k)
179{
180	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
181	kfree(ls);
182}
183
184static const struct sysfs_ops dlm_attr_ops = {
185	.show  = dlm_attr_show,
186	.store = dlm_attr_store,
187};
188
189static struct kobj_type dlm_ktype = {
190	.default_attrs = dlm_attrs,
191	.sysfs_ops     = &dlm_attr_ops,
192	.release       = lockspace_kobj_release,
193};
194
195static struct kset *dlm_kset;
196
197static int do_uevent(struct dlm_ls *ls, int in)
198{
199	int error;
200
201	if (in)
202		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
203	else
204		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
205
206	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
207
208	/* dlm_controld will see the uevent, do the necessary group management
209	   and then write to sysfs to wake us */
210
211	error = wait_event_interruptible(ls->ls_uevent_wait,
212			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
213
214	log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
215
216	if (error)
217		goto out;
218
219	error = ls->ls_uevent_result;
220 out:
221	if (error)
222		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
223			  error, ls->ls_uevent_result);
224	return error;
225}
226
227static int dlm_uevent(struct kset *kset, struct kobject *kobj,
228		      struct kobj_uevent_env *env)
229{
230	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
231
232	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
233	return 0;
234}
235
236static struct kset_uevent_ops dlm_uevent_ops = {
237	.uevent = dlm_uevent,
238};
239
240int __init dlm_lockspace_init(void)
241{
242	ls_count = 0;
243	mutex_init(&ls_lock);
244	INIT_LIST_HEAD(&lslist);
245	spin_lock_init(&lslist_lock);
246
247	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
248	if (!dlm_kset) {
249		printk(KERN_WARNING "%s: cannot create kset\n", __func__);
250		return -ENOMEM;
251	}
252	return 0;
253}
254
255void dlm_lockspace_exit(void)
256{
257	kset_unregister(dlm_kset);
258}
259
260static struct dlm_ls *find_ls_to_scan(void)
261{
262	struct dlm_ls *ls;
263
264	spin_lock(&lslist_lock);
265	list_for_each_entry(ls, &lslist, ls_list) {
266		if (time_after_eq(jiffies, ls->ls_scan_time +
267					    dlm_config.ci_scan_secs * HZ)) {
268			spin_unlock(&lslist_lock);
269			return ls;
270		}
271	}
272	spin_unlock(&lslist_lock);
273	return NULL;
274}
275
276static int dlm_scand(void *data)
277{
278	struct dlm_ls *ls;
279
280	while (!kthread_should_stop()) {
281		ls = find_ls_to_scan();
282		if (ls) {
283			if (dlm_lock_recovery_try(ls)) {
284				ls->ls_scan_time = jiffies;
285				dlm_scan_rsbs(ls);
286				dlm_scan_timeout(ls);
287				dlm_scan_waiters(ls);
288				dlm_unlock_recovery(ls);
289			} else {
290				ls->ls_scan_time += HZ;
291			}
292			continue;
293		}
294		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
295	}
296	return 0;
297}
298
299static int dlm_scand_start(void)
300{
301	struct task_struct *p;
302	int error = 0;
303
304	p = kthread_run(dlm_scand, NULL, "dlm_scand");
305	if (IS_ERR(p))
306		error = PTR_ERR(p);
307	else
308		scand_task = p;
309	return error;
310}
311
312static void dlm_scand_stop(void)
313{
314	kthread_stop(scand_task);
315}
316
317struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
318{
319	struct dlm_ls *ls;
320
321	spin_lock(&lslist_lock);
322
323	list_for_each_entry(ls, &lslist, ls_list) {
324		if (ls->ls_global_id == id) {
325			ls->ls_count++;
326			goto out;
327		}
328	}
329	ls = NULL;
330 out:
331	spin_unlock(&lslist_lock);
332	return ls;
333}
334
335struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
336{
337	struct dlm_ls *ls;
338
339	spin_lock(&lslist_lock);
340	list_for_each_entry(ls, &lslist, ls_list) {
341		if (ls->ls_local_handle == lockspace) {
342			ls->ls_count++;
343			goto out;
344		}
345	}
346	ls = NULL;
347 out:
348	spin_unlock(&lslist_lock);
349	return ls;
350}
351
352struct dlm_ls *dlm_find_lockspace_device(int minor)
353{
354	struct dlm_ls *ls;
355
356	spin_lock(&lslist_lock);
357	list_for_each_entry(ls, &lslist, ls_list) {
358		if (ls->ls_device.minor == minor) {
359			ls->ls_count++;
360			goto out;
361		}
362	}
363	ls = NULL;
364 out:
365	spin_unlock(&lslist_lock);
366	return ls;
367}
368
369void dlm_put_lockspace(struct dlm_ls *ls)
370{
371	spin_lock(&lslist_lock);
372	ls->ls_count--;
373	spin_unlock(&lslist_lock);
374}
375
376static void remove_lockspace(struct dlm_ls *ls)
377{
378	for (;;) {
379		spin_lock(&lslist_lock);
380		if (ls->ls_count == 0) {
381			WARN_ON(ls->ls_create_count != 0);
382			list_del(&ls->ls_list);
383			spin_unlock(&lslist_lock);
384			return;
385		}
386		spin_unlock(&lslist_lock);
387		ssleep(1);
388	}
389}
390
391static int threads_start(void)
392{
393	int error;
394
395	error = dlm_scand_start();
396	if (error) {
397		log_print("cannot start dlm_scand thread %d", error);
398		goto fail;
399	}
400
401	/* Thread for sending/receiving messages for all lockspaces */
402	error = dlm_lowcomms_start();
403	if (error) {
404		log_print("cannot start dlm lowcomms %d", error);
405		goto scand_fail;
406	}
407
408	return 0;
409
410 scand_fail:
411	dlm_scand_stop();
412 fail:
413	return error;
414}
415
416static void threads_stop(void)
417{
418	dlm_scand_stop();
419	dlm_lowcomms_stop();
420}
421
422static int new_lockspace(const char *name, const char *cluster,
423			 uint32_t flags, int lvblen,
424			 const struct dlm_lockspace_ops *ops, void *ops_arg,
425			 int *ops_result, dlm_lockspace_t **lockspace)
426{
427	struct dlm_ls *ls;
428	int i, size, error;
429	int do_unreg = 0;
430	int namelen = strlen(name);
431
432	if (namelen > DLM_LOCKSPACE_LEN)
433		return -EINVAL;
434
435	if (!lvblen || (lvblen % 8))
436		return -EINVAL;
437
438	if (!try_module_get(THIS_MODULE))
439		return -EINVAL;
440
441	if (!dlm_user_daemon_available()) {
442		log_print("dlm user daemon not available");
443		error = -EUNATCH;
444		goto out;
445	}
446
447	if (ops && ops_result) {
448		if (!dlm_config.ci_recover_callbacks)
449			*ops_result = -EOPNOTSUPP;
450		else
451			*ops_result = 0;
452	}
453
454	if (dlm_config.ci_recover_callbacks && cluster &&
455	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
456		log_print("dlm cluster name %s mismatch %s",
457			  dlm_config.ci_cluster_name, cluster);
458		error = -EBADR;
459		goto out;
460	}
461
462	error = 0;
463
464	spin_lock(&lslist_lock);
465	list_for_each_entry(ls, &lslist, ls_list) {
466		WARN_ON(ls->ls_create_count <= 0);
467		if (ls->ls_namelen != namelen)
468			continue;
469		if (memcmp(ls->ls_name, name, namelen))
470			continue;
471		if (flags & DLM_LSFL_NEWEXCL) {
472			error = -EEXIST;
473			break;
474		}
475		ls->ls_create_count++;
476		*lockspace = ls;
477		error = 1;
478		break;
479	}
480	spin_unlock(&lslist_lock);
481
482	if (error)
483		goto out;
484
485	error = -ENOMEM;
486
487	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
488	if (!ls)
489		goto out;
490	memcpy(ls->ls_name, name, namelen);
491	ls->ls_namelen = namelen;
492	ls->ls_lvblen = lvblen;
493	ls->ls_count = 0;
494	ls->ls_flags = 0;
495	ls->ls_scan_time = jiffies;
496
497	if (ops && dlm_config.ci_recover_callbacks) {
498		ls->ls_ops = ops;
499		ls->ls_ops_arg = ops_arg;
500	}
501
502	if (flags & DLM_LSFL_TIMEWARN)
503		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
504
505	/* ls_exflags are forced to match among nodes, and we don't
506	   need to require all nodes to have some flags set */
507	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
508				    DLM_LSFL_NEWEXCL));
509
510	size = dlm_config.ci_rsbtbl_size;
511	ls->ls_rsbtbl_size = size;
512
513	ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
514	if (!ls->ls_rsbtbl)
515		goto out_lsfree;
516	for (i = 0; i < size; i++) {
517		ls->ls_rsbtbl[i].keep.rb_node = NULL;
518		ls->ls_rsbtbl[i].toss.rb_node = NULL;
519		spin_lock_init(&ls->ls_rsbtbl[i].lock);
520	}
521
522	spin_lock_init(&ls->ls_remove_spin);
523
524	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
525		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
526						 GFP_KERNEL);
527		if (!ls->ls_remove_names[i])
528			goto out_rsbtbl;
529	}
530
531	idr_init(&ls->ls_lkbidr);
532	spin_lock_init(&ls->ls_lkbidr_spin);
533
534	INIT_LIST_HEAD(&ls->ls_waiters);
535	mutex_init(&ls->ls_waiters_mutex);
536	INIT_LIST_HEAD(&ls->ls_orphans);
537	mutex_init(&ls->ls_orphans_mutex);
538	INIT_LIST_HEAD(&ls->ls_timeout);
539	mutex_init(&ls->ls_timeout_mutex);
540
541	INIT_LIST_HEAD(&ls->ls_new_rsb);
542	spin_lock_init(&ls->ls_new_rsb_spin);
543
544	INIT_LIST_HEAD(&ls->ls_nodes);
545	INIT_LIST_HEAD(&ls->ls_nodes_gone);
546	ls->ls_num_nodes = 0;
547	ls->ls_low_nodeid = 0;
548	ls->ls_total_weight = 0;
549	ls->ls_node_array = NULL;
550
551	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
552	ls->ls_stub_rsb.res_ls = ls;
553
554	ls->ls_debug_rsb_dentry = NULL;
555	ls->ls_debug_waiters_dentry = NULL;
556
557	init_waitqueue_head(&ls->ls_uevent_wait);
558	ls->ls_uevent_result = 0;
559	init_completion(&ls->ls_members_done);
560	ls->ls_members_result = -1;
561
562	mutex_init(&ls->ls_cb_mutex);
563	INIT_LIST_HEAD(&ls->ls_cb_delay);
564
565	ls->ls_recoverd_task = NULL;
566	mutex_init(&ls->ls_recoverd_active);
567	spin_lock_init(&ls->ls_recover_lock);
568	spin_lock_init(&ls->ls_rcom_spin);
569	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
570	ls->ls_recover_status = 0;
571	ls->ls_recover_seq = 0;
572	ls->ls_recover_args = NULL;
573	init_rwsem(&ls->ls_in_recovery);
574	init_rwsem(&ls->ls_recv_active);
575	INIT_LIST_HEAD(&ls->ls_requestqueue);
576	mutex_init(&ls->ls_requestqueue_mutex);
577	mutex_init(&ls->ls_clear_proc_locks);
578
579	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
580	if (!ls->ls_recover_buf)
581		goto out_lkbidr;
582
583	ls->ls_slot = 0;
584	ls->ls_num_slots = 0;
585	ls->ls_slots_size = 0;
586	ls->ls_slots = NULL;
587
588	INIT_LIST_HEAD(&ls->ls_recover_list);
589	spin_lock_init(&ls->ls_recover_list_lock);
590	idr_init(&ls->ls_recover_idr);
591	spin_lock_init(&ls->ls_recover_idr_lock);
592	ls->ls_recover_list_count = 0;
593	ls->ls_local_handle = ls;
594	init_waitqueue_head(&ls->ls_wait_general);
595	INIT_LIST_HEAD(&ls->ls_root_list);
596	init_rwsem(&ls->ls_root_sem);
597
598	spin_lock(&lslist_lock);
599	ls->ls_create_count = 1;
600	list_add(&ls->ls_list, &lslist);
601	spin_unlock(&lslist_lock);
602
603	if (flags & DLM_LSFL_FS) {
604		error = dlm_callback_start(ls);
605		if (error) {
606			log_error(ls, "can't start dlm_callback %d", error);
607			goto out_delist;
608		}
609	}
610
611	init_waitqueue_head(&ls->ls_recover_lock_wait);
612
613	/*
614	 * Once started, dlm_recoverd first looks for ls in lslist, then
615	 * initializes ls_in_recovery as locked in "down" mode.  We need
616	 * to wait for the wakeup from dlm_recoverd because in_recovery
617	 * has to start out in down mode.
618	 */
619
620	error = dlm_recoverd_start(ls);
621	if (error) {
622		log_error(ls, "can't start dlm_recoverd %d", error);
623		goto out_callback;
624	}
625
626	wait_event(ls->ls_recover_lock_wait,
627		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
628
629	ls->ls_kobj.kset = dlm_kset;
630	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
631				     "%s", ls->ls_name);
632	if (error)
633		goto out_recoverd;
634	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
635
636	/* let kobject handle freeing of ls if there's an error */
637	do_unreg = 1;
638
639	/* This uevent triggers dlm_controld in userspace to add us to the
640	   group of nodes that are members of this lockspace (managed by the
641	   cluster infrastructure.)  Once it's done that, it tells us who the
642	   current lockspace members are (via configfs) and then tells the
643	   lockspace to start running (via sysfs) in dlm_ls_start(). */
644
645	error = do_uevent(ls, 1);
646	if (error)
647		goto out_recoverd;
648
649	wait_for_completion(&ls->ls_members_done);
650	error = ls->ls_members_result;
651	if (error)
652		goto out_members;
653
654	dlm_create_debug_file(ls);
655
656	log_rinfo(ls, "join complete");
657	*lockspace = ls;
658	return 0;
659
660 out_members:
661	do_uevent(ls, 0);
662	dlm_clear_members(ls);
663	kfree(ls->ls_node_array);
664 out_recoverd:
665	dlm_recoverd_stop(ls);
666 out_callback:
667	dlm_callback_stop(ls);
668 out_delist:
669	spin_lock(&lslist_lock);
670	list_del(&ls->ls_list);
671	spin_unlock(&lslist_lock);
672	idr_destroy(&ls->ls_recover_idr);
673	kfree(ls->ls_recover_buf);
674 out_lkbidr:
675	idr_destroy(&ls->ls_lkbidr);
676	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
677		if (ls->ls_remove_names[i])
678			kfree(ls->ls_remove_names[i]);
679	}
680 out_rsbtbl:
681	vfree(ls->ls_rsbtbl);
682 out_lsfree:
683	if (do_unreg)
684		kobject_put(&ls->ls_kobj);
685	else
686		kfree(ls);
687 out:
688	module_put(THIS_MODULE);
689	return error;
690}
691
692int dlm_new_lockspace(const char *name, const char *cluster,
693		      uint32_t flags, int lvblen,
694		      const struct dlm_lockspace_ops *ops, void *ops_arg,
695		      int *ops_result, dlm_lockspace_t **lockspace)
696{
697	int error = 0;
698
699	mutex_lock(&ls_lock);
700	if (!ls_count)
701		error = threads_start();
702	if (error)
703		goto out;
704
705	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
706			      ops_result, lockspace);
707	if (!error)
708		ls_count++;
709	if (error > 0)
710		error = 0;
711	if (!ls_count)
712		threads_stop();
713 out:
714	mutex_unlock(&ls_lock);
715	return error;
716}
717
718static int lkb_idr_is_local(int id, void *p, void *data)
719{
720	struct dlm_lkb *lkb = p;
721
722	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
723}
724
725static int lkb_idr_is_any(int id, void *p, void *data)
726{
727	return 1;
728}
729
730static int lkb_idr_free(int id, void *p, void *data)
731{
732	struct dlm_lkb *lkb = p;
733
734	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
735		dlm_free_lvb(lkb->lkb_lvbptr);
736
737	dlm_free_lkb(lkb);
738	return 0;
739}
740
741/* NOTE: We check the lkbidr here rather than the resource table.
742   This is because there may be LKBs queued as ASTs that have been unlinked
743   from their RSBs and are pending deletion once the AST has been delivered */
744
745static int lockspace_busy(struct dlm_ls *ls, int force)
746{
747	int rv;
748
749	spin_lock(&ls->ls_lkbidr_spin);
750	if (force == 0) {
751		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
752	} else if (force == 1) {
753		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
754	} else {
755		rv = 0;
756	}
757	spin_unlock(&ls->ls_lkbidr_spin);
758	return rv;
759}
760
761static int release_lockspace(struct dlm_ls *ls, int force)
762{
763	struct dlm_rsb *rsb;
764	struct rb_node *n;
765	int i, busy, rv;
766
767	busy = lockspace_busy(ls, force);
768
769	spin_lock(&lslist_lock);
770	if (ls->ls_create_count == 1) {
771		if (busy) {
772			rv = -EBUSY;
773		} else {
774			/* remove_lockspace takes ls off lslist */
775			ls->ls_create_count = 0;
776			rv = 0;
777		}
778	} else if (ls->ls_create_count > 1) {
779		rv = --ls->ls_create_count;
780	} else {
781		rv = -EINVAL;
782	}
783	spin_unlock(&lslist_lock);
784
785	if (rv) {
786		log_debug(ls, "release_lockspace no remove %d", rv);
787		return rv;
788	}
789
790	dlm_device_deregister(ls);
791
792	if (force < 3 && dlm_user_daemon_available())
793		do_uevent(ls, 0);
794
795	dlm_recoverd_stop(ls);
796
797	dlm_callback_stop(ls);
798
799	remove_lockspace(ls);
800
801	dlm_delete_debug_file(ls);
802
803	kfree(ls->ls_recover_buf);
804
805	/*
806	 * Free all lkb's in idr
807	 */
808
809	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
810	idr_destroy(&ls->ls_lkbidr);
811
812	/*
813	 * Free all rsb's on rsbtbl[] lists
814	 */
815
816	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
817		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
818			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
819			rb_erase(n, &ls->ls_rsbtbl[i].keep);
820			dlm_free_rsb(rsb);
821		}
822
823		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
824			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
825			rb_erase(n, &ls->ls_rsbtbl[i].toss);
826			dlm_free_rsb(rsb);
827		}
828	}
829
830	vfree(ls->ls_rsbtbl);
831
832	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
833		kfree(ls->ls_remove_names[i]);
834
835	while (!list_empty(&ls->ls_new_rsb)) {
836		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
837				       res_hashchain);
838		list_del(&rsb->res_hashchain);
839		dlm_free_rsb(rsb);
840	}
841
842	/*
843	 * Free structures on any other lists
844	 */
845
846	dlm_purge_requestqueue(ls);
847	kfree(ls->ls_recover_args);
848	dlm_clear_members(ls);
849	dlm_clear_members_gone(ls);
850	kfree(ls->ls_node_array);
851	log_rinfo(ls, "release_lockspace final free");
852	kobject_put(&ls->ls_kobj);
853	/* The ls structure will be freed when the kobject is done with it */
854
855	module_put(THIS_MODULE);
856	return 0;
857}
858
859/*
860 * Called when a system has released all its locks and is not going to use the
861 * lockspace any longer.  We free everything we're managing for this lockspace.
862 * Remaining nodes will go through the recovery process as if we'd died.  The
863 * lockspace must continue to function as usual, participating in recoveries,
864 * until this returns.
865 *
866 * Force has 4 possible values:
867 * 0 - don't destroy lockspace if it has any LKBs
868 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
869 * 2 - destroy lockspace regardless of LKBs
870 * 3 - destroy lockspace as part of a forced shutdown
871 */
872
873int dlm_release_lockspace(void *lockspace, int force)
874{
875	struct dlm_ls *ls;
876	int error;
877
878	ls = dlm_find_lockspace_local(lockspace);
879	if (!ls)
880		return -EINVAL;
881	dlm_put_lockspace(ls);
882
883	mutex_lock(&ls_lock);
884	error = release_lockspace(ls, force);
885	if (!error)
886		ls_count--;
887	if (!ls_count)
888		threads_stop();
889	mutex_unlock(&ls_lock);
890
891	return error;
892}
893
894void dlm_stop_lockspaces(void)
895{
896	struct dlm_ls *ls;
897	int count;
898
899 restart:
900	count = 0;
901	spin_lock(&lslist_lock);
902	list_for_each_entry(ls, &lslist, ls_list) {
903		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
904			count++;
905			continue;
906		}
907		spin_unlock(&lslist_lock);
908		log_error(ls, "no userland control daemon, stopping lockspace");
909		dlm_ls_stop(ls);
910		goto restart;
911	}
912	spin_unlock(&lslist_lock);
913
914	if (count)
915		log_print("dlm user daemon left %d lockspaces", count);
916}
917
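
A minimal usage sketch (not part of lockspace.c above): how a kernel module
might join and leave a lockspace through the dlm_new_lockspace() and
dlm_release_lockspace() entry points shown in the v4.6 listing. The lockspace
name "example", the lvblen of 32, and the force value of 2 are illustrative
assumptions; dlm_controld must be running, otherwise new_lockspace() fails
with -EUNATCH as seen above.

#include <linux/module.h>
#include <linux/dlm.h>

static dlm_lockspace_t *example_ls;

static int __init example_init(void)
{
	/* lvblen must be a non-zero multiple of 8 (checked in
	 * new_lockspace() above); passing no ops means recovery
	 * callbacks are not requested */
	return dlm_new_lockspace("example", NULL, DLM_LSFL_NEWEXCL, 32,
				 NULL, NULL, NULL, &example_ls);
}

static void __exit example_exit(void)
{
	/* force = 2: destroy the lockspace even if LKBs remain */
	dlm_release_lockspace(example_ls, 2);
}

module_init(example_init);
module_exit(example_exit);
MODULE_LICENSE("GPL");
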
fs/dlm/lockspace.c (v6.13.7)
  1// SPDX-License-Identifier: GPL-2.0-only
  2/******************************************************************************
  3*******************************************************************************
  4**
  5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
  6**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
  7**
  8**
  9*******************************************************************************
 10******************************************************************************/
 11
 12#include <linux/module.h>
 13
 14#include "dlm_internal.h"
 15#include "lockspace.h"
 16#include "member.h"
 17#include "recoverd.h"
 18#include "dir.h"
 19#include "midcomms.h"
 20#include "config.h"
 21#include "memory.h"
 22#include "lock.h"
 23#include "recover.h"
 24#include "requestqueue.h"
 25#include "user.h"
 26#include "ast.h"
 27
 28static int			ls_count;
 29static struct mutex		ls_lock;
 30static struct list_head		lslist;
 31static spinlock_t		lslist_lock;
 32
 33static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
 34{
 35	ssize_t ret = len;
 36	int n;
 37	int rc = kstrtoint(buf, 0, &n);
 38
 39	if (rc)
 40		return rc;
 41	ls = dlm_find_lockspace_local(ls);
 42	if (!ls)
 43		return -EINVAL;
 44
 45	switch (n) {
 46	case 0:
 47		dlm_ls_stop(ls);
 48		break;
 49	case 1:
 50		dlm_ls_start(ls);
 51		break;
 52	default:
 53		ret = -EINVAL;
 54	}
 55	dlm_put_lockspace(ls);
 56	return ret;
 57}
 58
 59static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
 60{
 61	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
 62
 63	if (rc)
 64		return rc;
 65	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
 66	wake_up(&ls->ls_uevent_wait);
 67	return len;
 68}
 69
 70static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
 71{
 72	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
 73}
 74
 75static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
 76{
 77	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
 78
 79	if (rc)
 80		return rc;
 81	return len;
 82}
 83
 84static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
 85{
 86	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
 87}
 88
 89static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
 90{
 91	int val;
 92	int rc = kstrtoint(buf, 0, &val);
 93
 94	if (rc)
 95		return rc;
 96	if (val == 1)
 97		set_bit(LSFL_NODIR, &ls->ls_flags);
 98	return len;
 99}
100
101static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
102{
103	uint32_t status = dlm_recover_status(ls);
104	return snprintf(buf, PAGE_SIZE, "%x\n", status);
105}
106
107static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
108{
109	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
110}
111
112struct dlm_attr {
113	struct attribute attr;
114	ssize_t (*show)(struct dlm_ls *, char *);
115	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
116};
117
118static struct dlm_attr dlm_attr_control = {
119	.attr  = {.name = "control", .mode = S_IWUSR},
120	.store = dlm_control_store
121};
122
123static struct dlm_attr dlm_attr_event = {
124	.attr  = {.name = "event_done", .mode = S_IWUSR},
125	.store = dlm_event_store
126};
127
128static struct dlm_attr dlm_attr_id = {
129	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
130	.show  = dlm_id_show,
131	.store = dlm_id_store
132};
133
134static struct dlm_attr dlm_attr_nodir = {
135	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
136	.show  = dlm_nodir_show,
137	.store = dlm_nodir_store
138};
139
140static struct dlm_attr dlm_attr_recover_status = {
141	.attr  = {.name = "recover_status", .mode = S_IRUGO},
142	.show  = dlm_recover_status_show
143};
144
145static struct dlm_attr dlm_attr_recover_nodeid = {
146	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
147	.show  = dlm_recover_nodeid_show
148};
149
150static struct attribute *dlm_attrs[] = {
151	&dlm_attr_control.attr,
152	&dlm_attr_event.attr,
153	&dlm_attr_id.attr,
154	&dlm_attr_nodir.attr,
155	&dlm_attr_recover_status.attr,
156	&dlm_attr_recover_nodeid.attr,
157	NULL,
158};
159ATTRIBUTE_GROUPS(dlm);
160
161static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
162			     char *buf)
163{
164	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
165	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
166	return a->show ? a->show(ls, buf) : 0;
167}
168
169static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
170			      const char *buf, size_t len)
171{
172	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
173	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
174	return a->store ? a->store(ls, buf, len) : len;
175}
176
177static const struct sysfs_ops dlm_attr_ops = {
178	.show  = dlm_attr_show,
179	.store = dlm_attr_store,
180};
181
182static struct kobj_type dlm_ktype = {
183	.default_groups = dlm_groups,
184	.sysfs_ops     = &dlm_attr_ops,
185};
186
187static struct kset *dlm_kset;
188
189static int do_uevent(struct dlm_ls *ls, int in)
190{
191	if (in)
192		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
193	else
194		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
195
196	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
197
198	/* dlm_controld will see the uevent, do the necessary group management
199	   and then write to sysfs to wake us */
200
201	wait_event(ls->ls_uevent_wait,
202		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
203
204	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
205
206	return ls->ls_uevent_result;
207}
208
209static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
210{
211	const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
212
213	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
214	return 0;
215}
216
217static const struct kset_uevent_ops dlm_uevent_ops = {
218	.uevent = dlm_uevent,
219};
220
221int __init dlm_lockspace_init(void)
222{
223	ls_count = 0;
224	mutex_init(&ls_lock);
225	INIT_LIST_HEAD(&lslist);
226	spin_lock_init(&lslist_lock);
227
228	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
229	if (!dlm_kset) {
230		printk(KERN_WARNING "%s: cannot create kset\n", __func__);
231		return -ENOMEM;
232	}
233	return 0;
234}
235
236void dlm_lockspace_exit(void)
237{
238	kset_unregister(dlm_kset);
239}
240
241struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
242{
243	struct dlm_ls *ls;
244
245	spin_lock_bh(&lslist_lock);
246
247	list_for_each_entry(ls, &lslist, ls_list) {
248		if (ls->ls_global_id == id) {
249			atomic_inc(&ls->ls_count);
250			goto out;
251		}
252	}
253	ls = NULL;
254 out:
255	spin_unlock_bh(&lslist_lock);
256	return ls;
257}
258
259struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
260{
261	struct dlm_ls *ls = lockspace;
262
263	atomic_inc(&ls->ls_count);
264	return ls;
265}
266
267struct dlm_ls *dlm_find_lockspace_device(int minor)
268{
269	struct dlm_ls *ls;
270
271	spin_lock_bh(&lslist_lock);
272	list_for_each_entry(ls, &lslist, ls_list) {
273		if (ls->ls_device.minor == minor) {
274			atomic_inc(&ls->ls_count);
275			goto out;
276		}
277	}
278	ls = NULL;
279 out:
280	spin_unlock_bh(&lslist_lock);
281	return ls;
282}
283
284void dlm_put_lockspace(struct dlm_ls *ls)
285{
286	if (atomic_dec_and_test(&ls->ls_count))
287		wake_up(&ls->ls_count_wait);
288}
289
290static void remove_lockspace(struct dlm_ls *ls)
291{
292retry:
293	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
294
295	spin_lock_bh(&lslist_lock);
296	if (atomic_read(&ls->ls_count) != 0) {
297		spin_unlock_bh(&lslist_lock);
298		goto retry;
299	}
300
301	WARN_ON(ls->ls_create_count != 0);
302	list_del(&ls->ls_list);
303	spin_unlock_bh(&lslist_lock);
304}
305
306static int threads_start(void)
307{
308	int error;
309
310	/* Thread for sending/receiving messages for all lockspaces */
311	error = dlm_midcomms_start();
312	if (error)
313		log_print("cannot start dlm midcomms %d", error);
314
315	return error;
316}
317
318static int lkb_idr_free(struct dlm_lkb *lkb)
319{
320	if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
321		dlm_free_lvb(lkb->lkb_lvbptr);
322
323	dlm_free_lkb(lkb);
324	return 0;
325}
326
327static void rhash_free_rsb(void *ptr, void *arg)
328{
329	struct dlm_rsb *rsb = ptr;
330
331	dlm_free_rsb(rsb);
332}
333
334static void free_lockspace(struct work_struct *work)
335{
336	struct dlm_ls *ls  = container_of(work, struct dlm_ls, ls_free_work);
337	struct dlm_lkb *lkb;
338	unsigned long id;
339
340	/*
341	 * Free all lkb's in xa
342	 */
343	xa_for_each(&ls->ls_lkbxa, id, lkb) {
344		lkb_idr_free(lkb);
345	}
346	xa_destroy(&ls->ls_lkbxa);
347
348	/*
349	 * Free all rsb's on rsbtbl
350	 */
351	rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);
352
353	kfree(ls);
354}
355
356static int new_lockspace(const char *name, const char *cluster,
357			 uint32_t flags, int lvblen,
358			 const struct dlm_lockspace_ops *ops, void *ops_arg,
359			 int *ops_result, dlm_lockspace_t **lockspace)
360{
361	struct dlm_ls *ls;
362	int namelen = strlen(name);
363	int error;
364
365	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
366		return -EINVAL;
367
368	if (lvblen % 8)
369		return -EINVAL;
370
371	if (!try_module_get(THIS_MODULE))
372		return -EINVAL;
373
374	if (!dlm_user_daemon_available()) {
375		log_print("dlm user daemon not available");
376		error = -EUNATCH;
377		goto out;
378	}
379
380	if (ops && ops_result) {
381		if (!dlm_config.ci_recover_callbacks)
382			*ops_result = -EOPNOTSUPP;
383		else
384			*ops_result = 0;
385	}
386
387	if (!cluster)
388		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
389			  dlm_config.ci_cluster_name);
390
391	if (dlm_config.ci_recover_callbacks && cluster &&
392	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
393		log_print("dlm cluster name '%s' does not match "
394			  "the application cluster name '%s'",
395			  dlm_config.ci_cluster_name, cluster);
396		error = -EBADR;
397		goto out;
398	}
399
400	error = 0;
401
402	spin_lock_bh(&lslist_lock);
403	list_for_each_entry(ls, &lslist, ls_list) {
404		WARN_ON(ls->ls_create_count <= 0);
405		if (ls->ls_namelen != namelen)
406			continue;
407		if (memcmp(ls->ls_name, name, namelen))
408			continue;
409		if (flags & DLM_LSFL_NEWEXCL) {
410			error = -EEXIST;
411			break;
412		}
413		ls->ls_create_count++;
414		*lockspace = ls;
415		error = 1;
416		break;
417	}
418	spin_unlock_bh(&lslist_lock);
419
420	if (error)
421		goto out;
422
423	error = -ENOMEM;
424
425	ls = kzalloc(sizeof(*ls), GFP_NOFS);
426	if (!ls)
427		goto out;
428	memcpy(ls->ls_name, name, namelen);
429	ls->ls_namelen = namelen;
430	ls->ls_lvblen = lvblen;
431	atomic_set(&ls->ls_count, 0);
432	init_waitqueue_head(&ls->ls_count_wait);
433	ls->ls_flags = 0;
434
435	if (ops && dlm_config.ci_recover_callbacks) {
436		ls->ls_ops = ops;
437		ls->ls_ops_arg = ops_arg;
438	}
439
440	if (flags & DLM_LSFL_SOFTIRQ)
441		set_bit(LSFL_SOFTIRQ, &ls->ls_flags);
442
443	/* ls_exflags are forced to match among nodes, and we don't
444	 * need to require all nodes to have some flags set
445	 */
446	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL |
447				    DLM_LSFL_SOFTIRQ));
448
449	INIT_LIST_HEAD(&ls->ls_slow_inactive);
450	INIT_LIST_HEAD(&ls->ls_slow_active);
451	rwlock_init(&ls->ls_rsbtbl_lock);
452
453	error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
454	if (error)
455		goto out_lsfree;
456
457	xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
458	rwlock_init(&ls->ls_lkbxa_lock);
459
460	INIT_LIST_HEAD(&ls->ls_waiters);
461	spin_lock_init(&ls->ls_waiters_lock);
462	INIT_LIST_HEAD(&ls->ls_orphans);
463	spin_lock_init(&ls->ls_orphans_lock);
464
465	INIT_LIST_HEAD(&ls->ls_nodes);
466	INIT_LIST_HEAD(&ls->ls_nodes_gone);
467	ls->ls_num_nodes = 0;
468	ls->ls_low_nodeid = 0;
469	ls->ls_total_weight = 0;
470	ls->ls_node_array = NULL;
471
472	memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
473	ls->ls_local_rsb.res_ls = ls;
474
475	ls->ls_debug_rsb_dentry = NULL;
476	ls->ls_debug_waiters_dentry = NULL;
477
478	init_waitqueue_head(&ls->ls_uevent_wait);
479	ls->ls_uevent_result = 0;
480	init_completion(&ls->ls_recovery_done);
481	ls->ls_recovery_result = -1;
482
483	spin_lock_init(&ls->ls_cb_lock);
484	INIT_LIST_HEAD(&ls->ls_cb_delay);
485
486	INIT_WORK(&ls->ls_free_work, free_lockspace);
487
488	ls->ls_recoverd_task = NULL;
489	mutex_init(&ls->ls_recoverd_active);
490	spin_lock_init(&ls->ls_recover_lock);
491	spin_lock_init(&ls->ls_rcom_spin);
492	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
493	ls->ls_recover_status = 0;
494	ls->ls_recover_seq = get_random_u64();
495	ls->ls_recover_args = NULL;
496	init_rwsem(&ls->ls_in_recovery);
497	rwlock_init(&ls->ls_recv_active);
498	INIT_LIST_HEAD(&ls->ls_requestqueue);
499	rwlock_init(&ls->ls_requestqueue_lock);
500	spin_lock_init(&ls->ls_clear_proc_locks);
501
502	/* For backwards compatibility with 3.1 we need to use the maximum
503	 * possible dlm message size to be sure the message will fit and
504	 * to avoid out of bounds issues. However the sending side on 3.2
505	 * might send less.
506	 */
507	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
508	if (!ls->ls_recover_buf) {
509		error = -ENOMEM;
510		goto out_lkbxa;
511	}
512
513	ls->ls_slot = 0;
514	ls->ls_num_slots = 0;
515	ls->ls_slots_size = 0;
516	ls->ls_slots = NULL;
517
518	INIT_LIST_HEAD(&ls->ls_recover_list);
519	spin_lock_init(&ls->ls_recover_list_lock);
520	xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
521	spin_lock_init(&ls->ls_recover_xa_lock);
522	ls->ls_recover_list_count = 0;
523	init_waitqueue_head(&ls->ls_wait_general);
524	INIT_LIST_HEAD(&ls->ls_masters_list);
525	rwlock_init(&ls->ls_masters_lock);
526	INIT_LIST_HEAD(&ls->ls_dir_dump_list);
527	rwlock_init(&ls->ls_dir_dump_lock);
528
529	INIT_LIST_HEAD(&ls->ls_scan_list);
530	spin_lock_init(&ls->ls_scan_lock);
531	timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE);
532
533	spin_lock_bh(&lslist_lock);
534	ls->ls_create_count = 1;
535	list_add(&ls->ls_list, &lslist);
536	spin_unlock_bh(&lslist_lock);
537
538	if (flags & DLM_LSFL_FS)
539		set_bit(LSFL_FS, &ls->ls_flags);
540
541	error = dlm_callback_start(ls);
542	if (error) {
543		log_error(ls, "can't start dlm_callback %d", error);
544		goto out_delist;
545	}
546
547	init_waitqueue_head(&ls->ls_recover_lock_wait);
548
549	/*
550	 * Once started, dlm_recoverd first looks for ls in lslist, then
551	 * initializes ls_in_recovery as locked in "down" mode.  We need
552	 * to wait for the wakeup from dlm_recoverd because in_recovery
553	 * has to start out in down mode.
554	 */
555
556	error = dlm_recoverd_start(ls);
557	if (error) {
558		log_error(ls, "can't start dlm_recoverd %d", error);
559		goto out_callback;
560	}
561
562	wait_event(ls->ls_recover_lock_wait,
563		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
564
565	ls->ls_kobj.kset = dlm_kset;
566	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
567				     "%s", ls->ls_name);
568	if (error)
569		goto out_recoverd;
570	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
571
572	/* This uevent triggers dlm_controld in userspace to add us to the
573	   group of nodes that are members of this lockspace (managed by the
574	   cluster infrastructure.)  Once it's done that, it tells us who the
575	   current lockspace members are (via configfs) and then tells the
576	   lockspace to start running (via sysfs) in dlm_ls_start(). */
577
578	error = do_uevent(ls, 1);
579	if (error)
580		goto out_recoverd;
581
582	/* wait until recovery is successful or failed */
583	wait_for_completion(&ls->ls_recovery_done);
584	error = ls->ls_recovery_result;
585	if (error)
586		goto out_members;
587
588	dlm_create_debug_file(ls);
589
590	log_rinfo(ls, "join complete");
591	*lockspace = ls;
592	return 0;
593
594 out_members:
595	do_uevent(ls, 0);
596	dlm_clear_members(ls);
597	kfree(ls->ls_node_array);
598 out_recoverd:
599	dlm_recoverd_stop(ls);
600 out_callback:
601	dlm_callback_stop(ls);
602 out_delist:
603	spin_lock_bh(&lslist_lock);
604	list_del(&ls->ls_list);
605	spin_unlock_bh(&lslist_lock);
606	xa_destroy(&ls->ls_recover_xa);
607	kfree(ls->ls_recover_buf);
608 out_lkbxa:
609	xa_destroy(&ls->ls_lkbxa);
610	rhashtable_destroy(&ls->ls_rsbtbl);
611 out_lsfree:
612	kobject_put(&ls->ls_kobj);
613	kfree(ls);
614 out:
615	module_put(THIS_MODULE);
616	return error;
617}
618
619static int __dlm_new_lockspace(const char *name, const char *cluster,
620			       uint32_t flags, int lvblen,
621			       const struct dlm_lockspace_ops *ops,
622			       void *ops_arg, int *ops_result,
623			       dlm_lockspace_t **lockspace)
624{
625	int error = 0;
626
627	mutex_lock(&ls_lock);
628	if (!ls_count)
629		error = threads_start();
630	if (error)
631		goto out;
632
633	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
634			      ops_result, lockspace);
635	if (!error)
636		ls_count++;
637	if (error > 0)
638		error = 0;
639	if (!ls_count) {
640		dlm_midcomms_shutdown();
641		dlm_midcomms_stop();
642	}
643 out:
644	mutex_unlock(&ls_lock);
645	return error;
646}
647
648int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
649		      int lvblen, const struct dlm_lockspace_ops *ops,
650		      void *ops_arg, int *ops_result,
651		      dlm_lockspace_t **lockspace)
652{
653	return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
654				   ops, ops_arg, ops_result, lockspace);
655}
656
657int dlm_new_user_lockspace(const char *name, const char *cluster,
658			   uint32_t flags, int lvblen,
659			   const struct dlm_lockspace_ops *ops,
660			   void *ops_arg, int *ops_result,
661			   dlm_lockspace_t **lockspace)
662{
663	if (flags & DLM_LSFL_SOFTIRQ)
664		return -EINVAL;
665
666	return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
667				   ops_arg, ops_result, lockspace);
668}
669
670/* NOTE: We check the lkbxa here rather than the resource table.
671   This is because there may be LKBs queued as ASTs that have been unlinked
672   from their RSBs and are pending deletion once the AST has been delivered */
673
674static int lockspace_busy(struct dlm_ls *ls, int force)
675{
676	struct dlm_lkb *lkb;
677	unsigned long id;
678	int rv = 0;
679
680	read_lock_bh(&ls->ls_lkbxa_lock);
681	if (force == 0) {
682		xa_for_each(&ls->ls_lkbxa, id, lkb) {
683			rv = 1;
684			break;
685		}
686	} else if (force == 1) {
687		xa_for_each(&ls->ls_lkbxa, id, lkb) {
688			if (lkb->lkb_nodeid == 0 &&
689			    lkb->lkb_grmode != DLM_LOCK_IV) {
690				rv = 1;
691				break;
692			}
693		}
694	} else {
695		rv = 0;
696	}
697	read_unlock_bh(&ls->ls_lkbxa_lock);
698	return rv;
699}
700
701static int release_lockspace(struct dlm_ls *ls, int force)
702{
703	int busy, rv;
704
705	busy = lockspace_busy(ls, force);
706
707	spin_lock_bh(&lslist_lock);
708	if (ls->ls_create_count == 1) {
709		if (busy) {
710			rv = -EBUSY;
711		} else {
712			/* remove_lockspace takes ls off lslist */
713			ls->ls_create_count = 0;
714			rv = 0;
715		}
716	} else if (ls->ls_create_count > 1) {
717		rv = --ls->ls_create_count;
718	} else {
719		rv = -EINVAL;
720	}
721	spin_unlock_bh(&lslist_lock);
722
723	if (rv) {
724		log_debug(ls, "release_lockspace no remove %d", rv);
725		return rv;
726	}
727
728	if (ls_count == 1)
729		dlm_midcomms_version_wait();
730
731	dlm_device_deregister(ls);
732
733	if (force < 3 && dlm_user_daemon_available())
734		do_uevent(ls, 0);
735
736	dlm_recoverd_stop(ls);
737
738	/* clear the LSFL_RUNNING flag to speed up
739	 * timer_shutdown_sync(); we don't care anymore
740	 */
741	clear_bit(LSFL_RUNNING, &ls->ls_flags);
742	timer_shutdown_sync(&ls->ls_scan_timer);
743
744	if (ls_count == 1) {
745		dlm_clear_members(ls);
746		dlm_midcomms_shutdown();
747	}
748
749	dlm_callback_stop(ls);
750
751	remove_lockspace(ls);
752
753	dlm_delete_debug_file(ls);
754
755	kobject_put(&ls->ls_kobj);
756
757	xa_destroy(&ls->ls_recover_xa);
758	kfree(ls->ls_recover_buf);
759
760	/*
761	 * Free structures on any other lists
762	 */
763
764	dlm_purge_requestqueue(ls);
765	kfree(ls->ls_recover_args);
766	dlm_clear_members(ls);
767	dlm_clear_members_gone(ls);
768	kfree(ls->ls_node_array);
769
770	log_rinfo(ls, "%s final free", __func__);
771
772	/* delayed free of data structures see free_lockspace() */
773	queue_work(dlm_wq, &ls->ls_free_work);
774	module_put(THIS_MODULE);
775	return 0;
776}
777
778/*
779 * Called when a system has released all its locks and is not going to use the
780 * lockspace any longer.  We free everything we're managing for this lockspace.
781 * Remaining nodes will go through the recovery process as if we'd died.  The
782 * lockspace must continue to function as usual, participating in recoveries,
783 * until this returns.
784 *
785 * Force has 4 possible values:
786 * 0 - don't destroy lockspace if it has any LKBs
787 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
788 * 2 - destroy lockspace regardless of LKBs
789 * 3 - destroy lockspace as part of a forced shutdown
790 */
791
792int dlm_release_lockspace(void *lockspace, int force)
793{
794	struct dlm_ls *ls;
795	int error;
796
797	ls = dlm_find_lockspace_local(lockspace);
798	if (!ls)
799		return -EINVAL;
800	dlm_put_lockspace(ls);
801
802	mutex_lock(&ls_lock);
803	error = release_lockspace(ls, force);
804	if (!error)
805		ls_count--;
806	if (!ls_count)
807		dlm_midcomms_stop();
808	mutex_unlock(&ls_lock);
809
810	return error;
811}
812
813void dlm_stop_lockspaces(void)
814{
815	struct dlm_ls *ls;
816	int count;
817
818 restart:
819	count = 0;
820	spin_lock_bh(&lslist_lock);
821	list_for_each_entry(ls, &lslist, ls_list) {
822		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
823			count++;
824			continue;
825		}
826		spin_unlock_bh(&lslist_lock);
827		log_error(ls, "no userland control daemon, stopping lockspace");
828		dlm_ls_stop(ls);
829		goto restart;
830	}
831	spin_unlock_bh(&lslist_lock);
832
833	if (count)
834		log_print("dlm user daemon left %d lockspaces", count);
835}
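
Userspace-side sketch (hypothetical; not dlm_controld itself): the join
handshake that do_uevent() blocks on in both versions above. The kset is
created with kset_create_and_add("dlm", ..., kernel_kobj), so each lockspace
appears under /sys/kernel/dlm/<name>/ with the attributes defined in
dlm_attrs. A daemon reacting to the KOBJ_ONLINE uevent would configure the
member nodes via configfs, publish a global id, write "event_done" to wake
the kernel waiter, and write "control" to trigger dlm_ls_start(). The
attribute names and paths follow from the listings; the lockspace name and
values here are illustrative.

#include <stdio.h>

static int dlm_sysfs_write(const char *ls, const char *attr, const char *val)
{
	char path[256];
	FILE *f;

	snprintf(path, sizeof(path), "/sys/kernel/dlm/%s/%s", ls, attr);
	f = fopen(path, "w");
	if (!f)
		return -1;
	fputs(val, f);	/* parsed by kstrtoint()/kstrtouint() in the stores */
	return fclose(f);
}

int main(void)
{
	/* after handling the KOBJ_ONLINE uevent for lockspace "example"
	 * and configuring the member nodes via configfs: */
	dlm_sysfs_write("example", "id", "1");         /* dlm_id_store() */
	dlm_sysfs_write("example", "event_done", "0"); /* wakes do_uevent() */
	dlm_sysfs_write("example", "control", "1");    /* dlm_ls_start() */
	return 0;
}
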