Linux Audio

Check our new training course

Loading...
v4.6
  1/******************************************************************************
  2*******************************************************************************
  3**
  4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
  5**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
  6**
  7**  This copyrighted material is made available to anyone wishing to use,
  8**  modify, copy, or redistribute it subject to the terms and conditions
  9**  of the GNU General Public License v.2.
 10**
 11*******************************************************************************
 12******************************************************************************/
 13
 14#include "dlm_internal.h"
 15#include "lockspace.h"
 16#include "member.h"
 17#include "recoverd.h"
 18#include "dir.h"
 19#include "lowcomms.h"
 20#include "config.h"
 21#include "memory.h"
 22#include "lock.h"
 23#include "recover.h"
 24#include "requestqueue.h"
 25#include "user.h"
 26#include "ast.h"
 27
 28static int			ls_count;
 29static struct mutex		ls_lock;
 30static struct list_head		lslist;
 31static spinlock_t		lslist_lock;
 32static struct task_struct *	scand_task;
 33
 34
 35static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
 36{
 37	ssize_t ret = len;
 38	int n;
 39	int rc = kstrtoint(buf, 0, &n);
 40
 41	if (rc)
 42		return rc;
 43	ls = dlm_find_lockspace_local(ls->ls_local_handle);
 44	if (!ls)
 45		return -EINVAL;
 46
 47	switch (n) {
 48	case 0:
 49		dlm_ls_stop(ls);
 50		break;
 51	case 1:
 52		dlm_ls_start(ls);
 53		break;
 54	default:
 55		ret = -EINVAL;
 56	}
 57	dlm_put_lockspace(ls);
 58	return ret;
 59}
 60
 61static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
 62{
 63	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
 64
 65	if (rc)
 66		return rc;
 67	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
 68	wake_up(&ls->ls_uevent_wait);
 69	return len;
 70}
 71
/* sysfs "id" read handler: report the lockspace's global id. */
static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}
 76
 77static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
 78{
 79	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
 80
 81	if (rc)
 82		return rc;
 83	return len;
 84}
 85
/* sysfs "nodir" read handler: report whether the lockspace runs without
   a resource directory (see dlm_no_directory()). */
static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}
 90
 91static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
 92{
 93	int val;
 94	int rc = kstrtoint(buf, 0, &val);
 95
 96	if (rc)
 97		return rc;
 98	if (val == 1)
 99		set_bit(LSFL_NODIR, &ls->ls_flags);
100	return len;
101}
102
103static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
104{
105	uint32_t status = dlm_recover_status(ls);
106	return snprintf(buf, PAGE_SIZE, "%x\n", status);
107}
108
/* sysfs "recover_nodeid" read handler: report ls_recover_nodeid. */
static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}
113
/* A sysfs attribute on a lockspace kobject, with optional show/store
   handlers that receive the owning dlm_ls directly. */
struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

/* write-only: stop/start the lockspace (dlm_control_store) */
static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

/* write-only: result of a group join/leave, written by userspace */
static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

/* read/write: the lockspace global id */
static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

/* read/write: the "no directory" flag */
static struct dlm_attr dlm_attr_nodir = {
	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_nodir_show,
	.store = dlm_nodir_store
};

/* read-only: recovery status bits */
static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

/* read-only: ls_recover_nodeid */
static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

/* NULL-terminated table installed as the kobject's default attributes */
static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
161
162static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
163			     char *buf)
164{
165	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
166	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
167	return a->show ? a->show(ls, buf) : 0;
168}
169
170static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
171			      const char *buf, size_t len)
172{
173	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
174	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
175	return a->store ? a->store(ls, buf, len) : len;
176}
177
/* kobject release callback: final free of the dlm_ls once all kobject
   references are dropped (see kobject_put() in release paths). */
static void lockspace_kobj_release(struct kobject *k)
{
	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
	kfree(ls);
}
183
static const struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

/* kobject type for the per-lockspace sysfs directory */
static struct kobj_type dlm_ktype = {
	.default_attrs = dlm_attrs,
	.sysfs_ops     = &dlm_attr_ops,
	.release       = lockspace_kobj_release,
};

/* parent kset for all lockspace kobjects, created in dlm_lockspace_init() */
static struct kset *dlm_kset;
196
197static int do_uevent(struct dlm_ls *ls, int in)
198{
199	int error;
200
201	if (in)
202		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
203	else
204		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
205
206	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
207
208	/* dlm_controld will see the uevent, do the necessary group management
209	   and then write to sysfs to wake us */
210
211	error = wait_event_interruptible(ls->ls_uevent_wait,
212			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
213
214	log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
215
216	if (error)
217		goto out;
218
219	error = ls->ls_uevent_result;
220 out:
221	if (error)
222		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
223			  error, ls->ls_uevent_result);
224	return error;
225}
226
/* Add the lockspace name to every uevent emitted for a lockspace kobject,
   so the userspace listener can tell which lockspace it concerns. */
static int dlm_uevent(struct kset *kset, struct kobject *kobj,
		      struct kobj_uevent_env *env)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
	return 0;
}

static struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};
239
240int __init dlm_lockspace_init(void)
241{
242	ls_count = 0;
243	mutex_init(&ls_lock);
244	INIT_LIST_HEAD(&lslist);
245	spin_lock_init(&lslist_lock);
246
247	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
248	if (!dlm_kset) {
249		printk(KERN_WARNING "%s: can not create kset\n", __func__);
250		return -ENOMEM;
251	}
252	return 0;
253}
254
/* Module teardown counterpart of dlm_lockspace_init(). */
void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}
259
260static struct dlm_ls *find_ls_to_scan(void)
261{
262	struct dlm_ls *ls;
263
264	spin_lock(&lslist_lock);
265	list_for_each_entry(ls, &lslist, ls_list) {
266		if (time_after_eq(jiffies, ls->ls_scan_time +
267					    dlm_config.ci_scan_secs * HZ)) {
268			spin_unlock(&lslist_lock);
269			return ls;
270		}
271	}
272	spin_unlock(&lslist_lock);
273	return NULL;
274}
275
276static int dlm_scand(void *data)
277{
278	struct dlm_ls *ls;
279
280	while (!kthread_should_stop()) {
281		ls = find_ls_to_scan();
282		if (ls) {
283			if (dlm_lock_recovery_try(ls)) {
284				ls->ls_scan_time = jiffies;
285				dlm_scan_rsbs(ls);
286				dlm_scan_timeout(ls);
287				dlm_scan_waiters(ls);
288				dlm_unlock_recovery(ls);
289			} else {
290				ls->ls_scan_time += HZ;
291			}
292			continue;
293		}
294		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
295	}
296	return 0;
297}
298
299static int dlm_scand_start(void)
300{
301	struct task_struct *p;
302	int error = 0;
303
304	p = kthread_run(dlm_scand, NULL, "dlm_scand");
305	if (IS_ERR(p))
306		error = PTR_ERR(p);
307	else
308		scand_task = p;
309	return error;
310}
311
/* Stop the scanning kthread started by dlm_scand_start(). */
static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}
316
317struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
318{
319	struct dlm_ls *ls;
320
321	spin_lock(&lslist_lock);
322
323	list_for_each_entry(ls, &lslist, ls_list) {
324		if (ls->ls_global_id == id) {
325			ls->ls_count++;
326			goto out;
327		}
328	}
329	ls = NULL;
330 out:
331	spin_unlock(&lslist_lock);
332	return ls;
333}
334
335struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
336{
337	struct dlm_ls *ls;
338
339	spin_lock(&lslist_lock);
340	list_for_each_entry(ls, &lslist, ls_list) {
341		if (ls->ls_local_handle == lockspace) {
342			ls->ls_count++;
343			goto out;
344		}
345	}
346	ls = NULL;
347 out:
348	spin_unlock(&lslist_lock);
349	return ls;
350}
351
352struct dlm_ls *dlm_find_lockspace_device(int minor)
353{
354	struct dlm_ls *ls;
355
356	spin_lock(&lslist_lock);
357	list_for_each_entry(ls, &lslist, ls_list) {
358		if (ls->ls_device.minor == minor) {
359			ls->ls_count++;
360			goto out;
361		}
362	}
363	ls = NULL;
364 out:
365	spin_unlock(&lslist_lock);
366	return ls;
367}
368
/* Drop a reference taken by one of the dlm_find_lockspace_*() helpers. */
void dlm_put_lockspace(struct dlm_ls *ls)
{
	spin_lock(&lslist_lock);
	ls->ls_count--;
	spin_unlock(&lslist_lock);
}
375
/* Wait for all outstanding references (ls_count) to drain, then unlink
   the lockspace from lslist.  Polls once per second; release is a rare,
   slow path so the simple loop is acceptable. */
static void remove_lockspace(struct dlm_ls *ls)
{
	for (;;) {
		spin_lock(&lslist_lock);
		if (ls->ls_count == 0) {
			/* caller (release_lockspace) already zeroed the
			   create count before removing */
			WARN_ON(ls->ls_create_count != 0);
			list_del(&ls->ls_list);
			spin_unlock(&lslist_lock);
			return;
		}
		spin_unlock(&lslist_lock);
		ssleep(1);
	}
}
390
/* Start the threads shared by all lockspaces: the scanner and the
 * lowcomms messaging layer.  On lowcomms failure the scanner is stopped
 * again so no partial state is left behind. */
static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		return error;
	}

	/* Thread for sending/receiving messages for all lockspace's */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		dlm_scand_stop();
		return error;
	}

	return 0;
}
415
/* Stop the shared scanner and lowcomms threads (counterpart of
   threads_start()). */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
}
421
422static int new_lockspace(const char *name, const char *cluster,
423			 uint32_t flags, int lvblen,
424			 const struct dlm_lockspace_ops *ops, void *ops_arg,
425			 int *ops_result, dlm_lockspace_t **lockspace)
426{
427	struct dlm_ls *ls;
428	int i, size, error;
429	int do_unreg = 0;
430	int namelen = strlen(name);
431
432	if (namelen > DLM_LOCKSPACE_LEN)
433		return -EINVAL;
434
435	if (!lvblen || (lvblen % 8))
436		return -EINVAL;
437
438	if (!try_module_get(THIS_MODULE))
439		return -EINVAL;
440
441	if (!dlm_user_daemon_available()) {
442		log_print("dlm user daemon not available");
443		error = -EUNATCH;
444		goto out;
445	}
446
447	if (ops && ops_result) {
448	       	if (!dlm_config.ci_recover_callbacks)
449			*ops_result = -EOPNOTSUPP;
450		else
451			*ops_result = 0;
452	}
453
454	if (dlm_config.ci_recover_callbacks && cluster &&
455	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
456		log_print("dlm cluster name %s mismatch %s",
457			  dlm_config.ci_cluster_name, cluster);
458		error = -EBADR;
459		goto out;
460	}
461
462	error = 0;
463
464	spin_lock(&lslist_lock);
465	list_for_each_entry(ls, &lslist, ls_list) {
466		WARN_ON(ls->ls_create_count <= 0);
467		if (ls->ls_namelen != namelen)
468			continue;
469		if (memcmp(ls->ls_name, name, namelen))
470			continue;
471		if (flags & DLM_LSFL_NEWEXCL) {
472			error = -EEXIST;
473			break;
474		}
475		ls->ls_create_count++;
476		*lockspace = ls;
477		error = 1;
478		break;
479	}
480	spin_unlock(&lslist_lock);
481
482	if (error)
483		goto out;
484
485	error = -ENOMEM;
486
487	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
488	if (!ls)
489		goto out;
490	memcpy(ls->ls_name, name, namelen);
491	ls->ls_namelen = namelen;
492	ls->ls_lvblen = lvblen;
493	ls->ls_count = 0;
494	ls->ls_flags = 0;
495	ls->ls_scan_time = jiffies;
496
497	if (ops && dlm_config.ci_recover_callbacks) {
498		ls->ls_ops = ops;
499		ls->ls_ops_arg = ops_arg;
500	}
501
502	if (flags & DLM_LSFL_TIMEWARN)
503		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
504
505	/* ls_exflags are forced to match among nodes, and we don't
506	   need to require all nodes to have some flags set */
507	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
508				    DLM_LSFL_NEWEXCL));
509
510	size = dlm_config.ci_rsbtbl_size;
511	ls->ls_rsbtbl_size = size;
512
513	ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
514	if (!ls->ls_rsbtbl)
515		goto out_lsfree;
516	for (i = 0; i < size; i++) {
517		ls->ls_rsbtbl[i].keep.rb_node = NULL;
518		ls->ls_rsbtbl[i].toss.rb_node = NULL;
519		spin_lock_init(&ls->ls_rsbtbl[i].lock);
520	}
521
522	spin_lock_init(&ls->ls_remove_spin);
523
524	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
525		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
526						 GFP_KERNEL);
527		if (!ls->ls_remove_names[i])
528			goto out_rsbtbl;
529	}
530
531	idr_init(&ls->ls_lkbidr);
532	spin_lock_init(&ls->ls_lkbidr_spin);
533
 
 
 
 
 
 
 
 
 
 
 
534	INIT_LIST_HEAD(&ls->ls_waiters);
535	mutex_init(&ls->ls_waiters_mutex);
536	INIT_LIST_HEAD(&ls->ls_orphans);
537	mutex_init(&ls->ls_orphans_mutex);
538	INIT_LIST_HEAD(&ls->ls_timeout);
539	mutex_init(&ls->ls_timeout_mutex);
540
541	INIT_LIST_HEAD(&ls->ls_new_rsb);
542	spin_lock_init(&ls->ls_new_rsb_spin);
543
544	INIT_LIST_HEAD(&ls->ls_nodes);
545	INIT_LIST_HEAD(&ls->ls_nodes_gone);
546	ls->ls_num_nodes = 0;
547	ls->ls_low_nodeid = 0;
548	ls->ls_total_weight = 0;
549	ls->ls_node_array = NULL;
550
551	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
552	ls->ls_stub_rsb.res_ls = ls;
553
554	ls->ls_debug_rsb_dentry = NULL;
555	ls->ls_debug_waiters_dentry = NULL;
556
557	init_waitqueue_head(&ls->ls_uevent_wait);
558	ls->ls_uevent_result = 0;
559	init_completion(&ls->ls_members_done);
560	ls->ls_members_result = -1;
561
562	mutex_init(&ls->ls_cb_mutex);
563	INIT_LIST_HEAD(&ls->ls_cb_delay);
564
565	ls->ls_recoverd_task = NULL;
566	mutex_init(&ls->ls_recoverd_active);
567	spin_lock_init(&ls->ls_recover_lock);
568	spin_lock_init(&ls->ls_rcom_spin);
569	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
570	ls->ls_recover_status = 0;
571	ls->ls_recover_seq = 0;
572	ls->ls_recover_args = NULL;
573	init_rwsem(&ls->ls_in_recovery);
574	init_rwsem(&ls->ls_recv_active);
575	INIT_LIST_HEAD(&ls->ls_requestqueue);
576	mutex_init(&ls->ls_requestqueue_mutex);
577	mutex_init(&ls->ls_clear_proc_locks);
578
579	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
580	if (!ls->ls_recover_buf)
581		goto out_lkbidr;
582
583	ls->ls_slot = 0;
584	ls->ls_num_slots = 0;
585	ls->ls_slots_size = 0;
586	ls->ls_slots = NULL;
587
588	INIT_LIST_HEAD(&ls->ls_recover_list);
589	spin_lock_init(&ls->ls_recover_list_lock);
590	idr_init(&ls->ls_recover_idr);
591	spin_lock_init(&ls->ls_recover_idr_lock);
592	ls->ls_recover_list_count = 0;
593	ls->ls_local_handle = ls;
594	init_waitqueue_head(&ls->ls_wait_general);
595	INIT_LIST_HEAD(&ls->ls_root_list);
596	init_rwsem(&ls->ls_root_sem);
597
 
 
598	spin_lock(&lslist_lock);
599	ls->ls_create_count = 1;
600	list_add(&ls->ls_list, &lslist);
601	spin_unlock(&lslist_lock);
602
603	if (flags & DLM_LSFL_FS) {
604		error = dlm_callback_start(ls);
605		if (error) {
606			log_error(ls, "can't start dlm_callback %d", error);
607			goto out_delist;
608		}
609	}
610
611	init_waitqueue_head(&ls->ls_recover_lock_wait);
612
613	/*
614	 * Once started, dlm_recoverd first looks for ls in lslist, then
615	 * initializes ls_in_recovery as locked in "down" mode.  We need
616	 * to wait for the wakeup from dlm_recoverd because in_recovery
617	 * has to start out in down mode.
618	 */
619
620	error = dlm_recoverd_start(ls);
621	if (error) {
622		log_error(ls, "can't start dlm_recoverd %d", error);
623		goto out_callback;
624	}
625
626	wait_event(ls->ls_recover_lock_wait,
627		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
628
629	ls->ls_kobj.kset = dlm_kset;
630	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
631				     "%s", ls->ls_name);
632	if (error)
633		goto out_recoverd;
634	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
635
636	/* let kobject handle freeing of ls if there's an error */
637	do_unreg = 1;
638
639	/* This uevent triggers dlm_controld in userspace to add us to the
640	   group of nodes that are members of this lockspace (managed by the
641	   cluster infrastructure.)  Once it's done that, it tells us who the
642	   current lockspace members are (via configfs) and then tells the
643	   lockspace to start running (via sysfs) in dlm_ls_start(). */
644
645	error = do_uevent(ls, 1);
646	if (error)
647		goto out_recoverd;
648
649	wait_for_completion(&ls->ls_members_done);
650	error = ls->ls_members_result;
651	if (error)
652		goto out_members;
653
654	dlm_create_debug_file(ls);
655
656	log_rinfo(ls, "join complete");
657	*lockspace = ls;
658	return 0;
659
660 out_members:
661	do_uevent(ls, 0);
662	dlm_clear_members(ls);
663	kfree(ls->ls_node_array);
664 out_recoverd:
665	dlm_recoverd_stop(ls);
666 out_callback:
667	dlm_callback_stop(ls);
668 out_delist:
669	spin_lock(&lslist_lock);
670	list_del(&ls->ls_list);
671	spin_unlock(&lslist_lock);
672	idr_destroy(&ls->ls_recover_idr);
673	kfree(ls->ls_recover_buf);
674 out_lkbidr:
 
 
675	idr_destroy(&ls->ls_lkbidr);
676	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
677		if (ls->ls_remove_names[i])
678			kfree(ls->ls_remove_names[i]);
679	}
680 out_rsbtbl:
681	vfree(ls->ls_rsbtbl);
682 out_lsfree:
683	if (do_unreg)
684		kobject_put(&ls->ls_kobj);
685	else
686		kfree(ls);
687 out:
688	module_put(THIS_MODULE);
689	return error;
690}
691
/*
 * Public entry point for creating or joining a lockspace.  Serialized by
 * ls_lock; also manages the shared worker threads, which exist only while
 * at least one lockspace does.  Returns 0 on success with *lockspace set,
 * or a negative errno.
 */
int dlm_new_lockspace(const char *name, const char *cluster,
		      uint32_t flags, int lvblen,
		      const struct dlm_lockspace_ops *ops, void *ops_arg,
		      int *ops_result, dlm_lockspace_t **lockspace)
{
	int error = 0;

	mutex_lock(&ls_lock);
	/* first lockspace: bring up the shared scand/lowcomms threads */
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
			      ops_result, lockspace);
	if (!error)
		ls_count++;
	/* new_lockspace() returns 1 when an existing lockspace was
	   reused; map that to success for the caller */
	if (error > 0)
		error = 0;
	/* creation failed and no lockspaces exist: stop the threads we
	   just started */
	if (!ls_count)
		threads_stop();
 out:
	mutex_unlock(&ls_lock);
	return error;
}
717
718static int lkb_idr_is_local(int id, void *p, void *data)
719{
720	struct dlm_lkb *lkb = p;
721
722	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
 
 
723}
724
/* idr callback: matches every lkb; used to test whether any lkb exists. */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}
729
730static int lkb_idr_free(int id, void *p, void *data)
731{
732	struct dlm_lkb *lkb = p;
733
734	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
735		dlm_free_lvb(lkb->lkb_lvbptr);
736
737	dlm_free_lkb(lkb);
738	return 0;
739}
740
741/* NOTE: We check the lkbidr here rather than the resource table.
742   This is because there may be LKBs queued as ASTs that have been unlinked
743   from their RSBs and are pending deletion once the AST has been delivered */
744
745static int lockspace_busy(struct dlm_ls *ls, int force)
746{
747	int rv;
748
749	spin_lock(&ls->ls_lkbidr_spin);
750	if (force == 0) {
751		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
752	} else if (force == 1) {
753		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
754	} else {
755		rv = 0;
756	}
757	spin_unlock(&ls->ls_lkbidr_spin);
758	return rv;
759}
760
/*
 * Drop one create reference on the lockspace; when the last one goes and
 * the lockspace is not busy (per lockspace_busy()/force), tear the whole
 * lockspace down.  Returns 0 on full teardown, a positive remaining
 * create count when only the reference was dropped, or a negative errno.
 * The teardown order below matters: stop the daemons first, then wait
 * for references to drain, then free data structures.
 */
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_rsb *rsb;
	struct rb_node *n;
	int i, busy, rv;

	busy = lockspace_busy(ls, force);

	spin_lock(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	dlm_device_deregister(ls);

	/* force 3 is a forced shutdown: skip telling the daemon */
	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	dlm_callback_stop(ls);

	/* waits for ls_count to reach zero, then unlinks from lslist */
	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	kfree(ls->ls_recover_buf);

	/*
	 * Free all lkb's in idr
	 */

	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
	idr_destroy(&ls->ls_lkbidr);

	/*
	 * Free all rsb's on rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].keep);
			dlm_free_rsb(rsb);
		}

		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].toss);
			dlm_free_rsb(rsb);
		}
	}

	vfree(ls->ls_rsbtbl);

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);

	/* pre-allocated rsbs that were never used */
	while (!list_empty(&ls->ls_new_rsb)) {
		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
				       res_hashchain);
		list_del(&rsb->res_hashchain);
		dlm_free_rsb(rsb);
	}

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_rinfo(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with */

	module_put(THIS_MODULE);
	return 0;
}
858
859/*
860 * Called when a system has released all its locks and is not going to use the
861 * lockspace any longer.  We free everything we're managing for this lockspace.
862 * Remaining nodes will go through the recovery process as if we'd died.  The
863 * lockspace must continue to function as usual, participating in recoveries,
864 * until this returns.
865 *
866 * Force has 4 possible values:
867 * 0 - don't destroy locksapce if it has any LKBs
868 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
869 * 2 - destroy lockspace regardless of LKBs
870 * 3 - destroy lockspace as part of a forced shutdown
871 */
872
int dlm_release_lockspace(void *lockspace, int force)
{
	struct dlm_ls *ls;
	int error;

	/* validate the handle; drop the temporary find-reference right
	   away, release_lockspace() works from the create count */
	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);

	mutex_lock(&ls_lock);
	error = release_lockspace(ls, force);
	if (!error)
		ls_count--;
	/* last lockspace gone: stop the shared scand/lowcomms threads */
	if (!ls_count)
		threads_stop();
	mutex_unlock(&ls_lock);

	return error;
}
893
/*
 * Stop every running lockspace when no userland control daemon is
 * available to drive them (see the log messages below).
 */
void dlm_stop_lockspaces(void)
{
	struct dlm_ls *ls;
	int count;

 restart:
	count = 0;
	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
			count++;
			continue;
		}
		/* dlm_ls_stop() cannot run under the spinlock; drop it
		   and rescan the list from the top afterwards */
		spin_unlock(&lslist_lock);
		log_error(ls, "no userland control daemon, stopping lockspace");
		dlm_ls_stop(ls);
		goto restart;
	}
	spin_unlock(&lslist_lock);

	if (count)
		log_print("dlm user daemon left %d lockspaces", count);
}
917
v3.1
  1/******************************************************************************
  2*******************************************************************************
  3**
  4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
  5**  Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
  6**
  7**  This copyrighted material is made available to anyone wishing to use,
  8**  modify, copy, or redistribute it subject to the terms and conditions
  9**  of the GNU General Public License v.2.
 10**
 11*******************************************************************************
 12******************************************************************************/
 13
 14#include "dlm_internal.h"
 15#include "lockspace.h"
 16#include "member.h"
 17#include "recoverd.h"
 18#include "dir.h"
 19#include "lowcomms.h"
 20#include "config.h"
 21#include "memory.h"
 22#include "lock.h"
 23#include "recover.h"
 24#include "requestqueue.h"
 25#include "user.h"
 26#include "ast.h"
 27
 28static int			ls_count;
 29static struct mutex		ls_lock;
 30static struct list_head		lslist;
 31static spinlock_t		lslist_lock;
 32static struct task_struct *	scand_task;
 33
 34
 35static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
 36{
 37	ssize_t ret = len;
 38	int n = simple_strtol(buf, NULL, 0);
 
 39
 
 
 40	ls = dlm_find_lockspace_local(ls->ls_local_handle);
 41	if (!ls)
 42		return -EINVAL;
 43
 44	switch (n) {
 45	case 0:
 46		dlm_ls_stop(ls);
 47		break;
 48	case 1:
 49		dlm_ls_start(ls);
 50		break;
 51	default:
 52		ret = -EINVAL;
 53	}
 54	dlm_put_lockspace(ls);
 55	return ret;
 56}
 57
 58static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
 59{
 60	ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
 
 
 
 61	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
 62	wake_up(&ls->ls_uevent_wait);
 63	return len;
 64}
 65
 66static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
 67{
 68	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
 69}
 70
 71static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
 72{
 73	ls->ls_global_id = simple_strtoul(buf, NULL, 0);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 74	return len;
 75}
 76
 77static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
 78{
 79	uint32_t status = dlm_recover_status(ls);
 80	return snprintf(buf, PAGE_SIZE, "%x\n", status);
 81}
 82
 83static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
 84{
 85	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
 86}
 87
 88struct dlm_attr {
 89	struct attribute attr;
 90	ssize_t (*show)(struct dlm_ls *, char *);
 91	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
 92};
 93
 94static struct dlm_attr dlm_attr_control = {
 95	.attr  = {.name = "control", .mode = S_IWUSR},
 96	.store = dlm_control_store
 97};
 98
 99static struct dlm_attr dlm_attr_event = {
100	.attr  = {.name = "event_done", .mode = S_IWUSR},
101	.store = dlm_event_store
102};
103
104static struct dlm_attr dlm_attr_id = {
105	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
106	.show  = dlm_id_show,
107	.store = dlm_id_store
108};
109
 
 
 
 
 
 
110static struct dlm_attr dlm_attr_recover_status = {
111	.attr  = {.name = "recover_status", .mode = S_IRUGO},
112	.show  = dlm_recover_status_show
113};
114
115static struct dlm_attr dlm_attr_recover_nodeid = {
116	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
117	.show  = dlm_recover_nodeid_show
118};
119
120static struct attribute *dlm_attrs[] = {
121	&dlm_attr_control.attr,
122	&dlm_attr_event.attr,
123	&dlm_attr_id.attr,
 
124	&dlm_attr_recover_status.attr,
125	&dlm_attr_recover_nodeid.attr,
126	NULL,
127};
128
129static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
130			     char *buf)
131{
132	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
133	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
134	return a->show ? a->show(ls, buf) : 0;
135}
136
137static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
138			      const char *buf, size_t len)
139{
140	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
141	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
142	return a->store ? a->store(ls, buf, len) : len;
143}
144
145static void lockspace_kobj_release(struct kobject *k)
146{
147	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
148	kfree(ls);
149}
150
151static const struct sysfs_ops dlm_attr_ops = {
152	.show  = dlm_attr_show,
153	.store = dlm_attr_store,
154};
155
156static struct kobj_type dlm_ktype = {
157	.default_attrs = dlm_attrs,
158	.sysfs_ops     = &dlm_attr_ops,
159	.release       = lockspace_kobj_release,
160};
161
162static struct kset *dlm_kset;
163
/* Emit an online/offline uevent for the lockspace and wait for userspace
   (dlm_controld) to report the outcome via the "event_done" sysfs file.
   Returns 0 on success, the userspace result, or the interrupted-wait
   error. */
static int do_uevent(struct dlm_ls *ls, int in)
{
	int error;

	if (in)
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	else
		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

	log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	error = wait_event_interruptible(ls->ls_uevent_wait,
			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);

	if (error)
		goto out;

	error = ls->ls_uevent_result;
 out:
	if (error)
		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
			  error, ls->ls_uevent_result);
	return error;
}
193
194static int dlm_uevent(struct kset *kset, struct kobject *kobj,
195		      struct kobj_uevent_env *env)
196{
197	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
198
199	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
200	return 0;
201}
202
203static struct kset_uevent_ops dlm_uevent_ops = {
204	.uevent = dlm_uevent,
205};
206
207int __init dlm_lockspace_init(void)
208{
209	ls_count = 0;
210	mutex_init(&ls_lock);
211	INIT_LIST_HEAD(&lslist);
212	spin_lock_init(&lslist_lock);
213
214	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
215	if (!dlm_kset) {
216		printk(KERN_WARNING "%s: can not create kset\n", __func__);
217		return -ENOMEM;
218	}
219	return 0;
220}
221
222void dlm_lockspace_exit(void)
223{
224	kset_unregister(dlm_kset);
225}
226
227static struct dlm_ls *find_ls_to_scan(void)
228{
229	struct dlm_ls *ls;
230
231	spin_lock(&lslist_lock);
232	list_for_each_entry(ls, &lslist, ls_list) {
233		if (time_after_eq(jiffies, ls->ls_scan_time +
234					    dlm_config.ci_scan_secs * HZ)) {
235			spin_unlock(&lslist_lock);
236			return ls;
237		}
238	}
239	spin_unlock(&lslist_lock);
240	return NULL;
241}
242
243static int dlm_scand(void *data)
244{
245	struct dlm_ls *ls;
246
247	while (!kthread_should_stop()) {
248		ls = find_ls_to_scan();
249		if (ls) {
250			if (dlm_lock_recovery_try(ls)) {
251				ls->ls_scan_time = jiffies;
252				dlm_scan_rsbs(ls);
253				dlm_scan_timeout(ls);
254				dlm_scan_waiters(ls);
255				dlm_unlock_recovery(ls);
256			} else {
257				ls->ls_scan_time += HZ;
258			}
259			continue;
260		}
261		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
262	}
263	return 0;
264}
265
266static int dlm_scand_start(void)
267{
268	struct task_struct *p;
269	int error = 0;
270
271	p = kthread_run(dlm_scand, NULL, "dlm_scand");
272	if (IS_ERR(p))
273		error = PTR_ERR(p);
274	else
275		scand_task = p;
276	return error;
277}
278
279static void dlm_scand_stop(void)
280{
281	kthread_stop(scand_task);
282}
283
284struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
285{
286	struct dlm_ls *ls;
287
288	spin_lock(&lslist_lock);
289
290	list_for_each_entry(ls, &lslist, ls_list) {
291		if (ls->ls_global_id == id) {
292			ls->ls_count++;
293			goto out;
294		}
295	}
296	ls = NULL;
297 out:
298	spin_unlock(&lslist_lock);
299	return ls;
300}
301
302struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
303{
304	struct dlm_ls *ls;
305
306	spin_lock(&lslist_lock);
307	list_for_each_entry(ls, &lslist, ls_list) {
308		if (ls->ls_local_handle == lockspace) {
309			ls->ls_count++;
310			goto out;
311		}
312	}
313	ls = NULL;
314 out:
315	spin_unlock(&lslist_lock);
316	return ls;
317}
318
319struct dlm_ls *dlm_find_lockspace_device(int minor)
320{
321	struct dlm_ls *ls;
322
323	spin_lock(&lslist_lock);
324	list_for_each_entry(ls, &lslist, ls_list) {
325		if (ls->ls_device.minor == minor) {
326			ls->ls_count++;
327			goto out;
328		}
329	}
330	ls = NULL;
331 out:
332	spin_unlock(&lslist_lock);
333	return ls;
334}
335
/* Drop a reference taken by one of the dlm_find_lockspace_* lookups.
   remove_lockspace() waits for ls_count to reach zero before unlinking
   the lockspace. */
void dlm_put_lockspace(struct dlm_ls *ls)
{
	spin_lock(&lslist_lock);
	ls->ls_count--;
	spin_unlock(&lslist_lock);
}
342
343static void remove_lockspace(struct dlm_ls *ls)
344{
345	for (;;) {
346		spin_lock(&lslist_lock);
347		if (ls->ls_count == 0) {
348			WARN_ON(ls->ls_create_count != 0);
349			list_del(&ls->ls_list);
350			spin_unlock(&lslist_lock);
351			return;
352		}
353		spin_unlock(&lslist_lock);
354		ssleep(1);
355	}
356}
357
/*
 * Bring up the global dlm threads used by all lockspaces: the scanner
 * kthread and the lowcomms messaging layer.  On lowcomms failure the
 * already-started scanner is torn down again.  Returns 0 or -errno.
 */
static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		return error;
	}

	/* Thread for sending/receiving messages for all lockspace's */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		dlm_scand_stop();
		return error;
	}

	return 0;
}
382
/* Tear down the global threads started by threads_start(); called when
   the last lockspace is released. */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
}
388
/*
 * Create (or join) a lockspace.  Returns 0 with *lockspace set when a
 * new lockspace was created, 1 when an existing lockspace of the same
 * name was joined (create_count bumped), or -errno on failure.  The
 * caller (dlm_new_lockspace) folds the >0 case into 0.
 *
 * The error unwind labels below must mirror the setup order exactly;
 * once the sysfs kobject is registered (do_unreg), freeing of ls is
 * delegated to kobject_put() instead of kfree().
 */
static int new_lockspace(const char *name, int namelen, void **lockspace,
			 uint32_t flags, int lvblen)
{
	struct dlm_ls *ls;
	int i, size, error;
	int do_unreg = 0;	/* set once the kobject owns ls's lifetime */

	if (namelen > DLM_LOCKSPACE_LEN)
		return -EINVAL;

	/* lock value blocks must be a non-zero multiple of 8 bytes */
	if (!lvblen || (lvblen % 8))
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	/* joining is pointless without dlm_controld to drive membership */
	if (!dlm_user_daemon_available()) {
		module_put(THIS_MODULE);
		return -EUNATCH;
	}

	error = 0;

	/* join an existing lockspace of the same name, unless the caller
	   demanded exclusive creation with DLM_LSFL_NEWEXCL */
	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;	/* positive: joined, not created */
		break;
	}
	spin_unlock(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;

	/* ls_name is a trailing array, hence the + namelen */
	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	ls->ls_count = 0;
	ls->ls_flags = 0;
	ls->ls_scan_time = jiffies;

	if (flags & DLM_LSFL_TIMEWARN)
		set_bit(LSFL_TIMEWARN, &ls->ls_flags);

	/* ls_exflags are forced to match among nodes, and we don't
	   need to require all nodes to have some flags set */
	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
				    DLM_LSFL_NEWEXCL));

	/* resource hash table */
	size = dlm_config.ci_rsbtbl_size;
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
		spin_lock_init(&ls->ls_rsbtbl[i].lock);
	}

	/* lkb id allocator */
	idr_init(&ls->ls_lkbidr);
	spin_lock_init(&ls->ls_lkbidr_spin);

	/* resource directory hash table */
	size = dlm_config.ci_dirtbl_size;
	ls->ls_dirtbl_size = size;

	ls->ls_dirtbl = vmalloc(sizeof(struct dlm_dirtable) * size);
	if (!ls->ls_dirtbl)
		goto out_lkbfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
		spin_lock_init(&ls->ls_dirtbl[i].lock);
	}

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);
	INIT_LIST_HEAD(&ls->ls_orphans);
	mutex_init(&ls->ls_orphans_mutex);
	INIT_LIST_HEAD(&ls->ls_timeout);
	mutex_init(&ls->ls_timeout_mutex);

	INIT_LIST_HEAD(&ls->ls_new_rsb);
	spin_lock_init(&ls->ls_new_rsb_spin);

	/* membership state is filled in later by dlm_controld via configfs */
	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	/* uevent/membership handshake state, see do_uevent() below */
	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_members_done);
	ls->ls_members_result = -1;

	mutex_init(&ls->ls_cb_mutex);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	/* recovery state */
	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	init_rwsem(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	mutex_init(&ls->ls_requestqueue_mutex);
	mutex_init(&ls->ls_clear_proc_locks);

	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
	if (!ls->ls_recover_buf)
		goto out_dirfree;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;	/* opaque handle returned to callers */
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	/* hold the lockspace "in recovery" until the first dlm_ls_start() */
	down_write(&ls->ls_in_recovery);

	spin_lock(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	if (flags & DLM_LSFL_FS) {
		error = dlm_callback_start(ls);
		if (error) {
			log_error(ls, "can't start dlm_callback %d", error);
			goto out_delist;
		}
	}

	/* needs to find ls in lslist */
	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* let kobject handle freeing of ls if there's an error */
	do_unreg = 1;

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_recoverd;

	/* block until dlm_controld reports the initial member list */
	wait_for_completion(&ls->ls_members_done);
	error = ls->ls_members_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_debug(ls, "join complete");
	*lockspace = ls;
	return 0;

	/* unwind in reverse setup order */
 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	kfree(ls->ls_recover_buf);
 out_dirfree:
	vfree(ls->ls_dirtbl);
 out_lkbfree:
	idr_destroy(&ls->ls_lkbidr);
	vfree(ls->ls_rsbtbl);
 out_lsfree:
	if (do_unreg)
		kobject_put(&ls->ls_kobj);	/* release callback frees ls */
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}
616
617int dlm_new_lockspace(const char *name, int namelen, void **lockspace,
618		      uint32_t flags, int lvblen)
 
 
619{
620	int error = 0;
621
622	mutex_lock(&ls_lock);
623	if (!ls_count)
624		error = threads_start();
625	if (error)
626		goto out;
627
628	error = new_lockspace(name, namelen, lockspace, flags, lvblen);
 
629	if (!error)
630		ls_count++;
631	if (error > 0)
632		error = 0;
633	if (!ls_count)
634		threads_stop();
635 out:
636	mutex_unlock(&ls_lock);
637	return error;
638}
639
640static int lkb_idr_is_local(int id, void *p, void *data)
641{
642	struct dlm_lkb *lkb = p;
643
644	if (!lkb->lkb_nodeid)
645		return 1;
646	return 0;
647}
648
/* idr_for_each callback: matches every lkb, so a nonzero result from
   idr_for_each() means the idr is non-empty. */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}
653
654static int lkb_idr_free(int id, void *p, void *data)
655{
656	struct dlm_lkb *lkb = p;
657
658	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
659		dlm_free_lvb(lkb->lkb_lvbptr);
660
661	dlm_free_lkb(lkb);
662	return 0;
663}
664
665/* NOTE: We check the lkbidr here rather than the resource table.
666   This is because there may be LKBs queued as ASTs that have been unlinked
667   from their RSBs and are pending deletion once the AST has been delivered */
668
669static int lockspace_busy(struct dlm_ls *ls, int force)
670{
671	int rv;
672
673	spin_lock(&ls->ls_lkbidr_spin);
674	if (force == 0) {
675		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
676	} else if (force == 1) {
677		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
678	} else {
679		rv = 0;
680	}
681	spin_unlock(&ls->ls_lkbidr_spin);
682	return rv;
683}
684
/*
 * Tear down one lockspace.  Drops a create_count reference; the actual
 * destruction happens only when the count hits zero and no locks block
 * it (see lockspace_busy).  Returns 0 when the lockspace was freed, a
 * positive remaining create_count when other users still hold it, or
 * -EBUSY/-EINVAL.  The teardown ordering below (daemons stopped before
 * structures are freed) must not be rearranged.
 */
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_rsb *rsb;
	struct list_head *head;
	int i, busy, rv;

	busy = lockspace_busy(ls, force);

	/* decide under lslist_lock whether this is the final release */
	spin_lock(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	dlm_device_deregister(ls);

	/* tell dlm_controld we are leaving, unless this is a forced
	   shutdown (force == 3) or the daemon is already gone */
	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	dlm_callback_stop(ls);

	/* waits for ls_count references to drain, then unlinks ls */
	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	kfree(ls->ls_recover_buf);

	/*
	 * Free direntry structs.
	 */

	dlm_dir_clear(ls);
	vfree(ls->ls_dirtbl);

	/*
	 * Free all lkb's in idr
	 */

	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
	idr_remove_all(&ls->ls_lkbidr);
	idr_destroy(&ls->ls_lkbidr);

	/*
	 * Free all rsb's on rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		head = &ls->ls_rsbtbl[i].list;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);

			list_del(&rsb->res_hashchain);
			dlm_free_rsb(rsb);
		}

		head = &ls->ls_rsbtbl[i].toss;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);
			list_del(&rsb->res_hashchain);
			dlm_free_rsb(rsb);
		}
	}

	vfree(ls->ls_rsbtbl);

	/* preallocated rsbs that were never hashed in */
	while (!list_empty(&ls->ls_new_rsb)) {
		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
				       res_hashchain);
		list_del(&rsb->res_hashchain);
		dlm_free_rsb(rsb);
	}

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_free_entries(ls);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_debug(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with */

	module_put(THIS_MODULE);
	return 0;
}
793
794/*
795 * Called when a system has released all its locks and is not going to use the
796 * lockspace any longer.  We free everything we're managing for this lockspace.
797 * Remaining nodes will go through the recovery process as if we'd died.  The
798 * lockspace must continue to function as usual, participating in recoveries,
799 * until this returns.
800 *
801 * Force has 4 possible values:
802 * 0 - don't destroy locksapce if it has any LKBs
803 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
804 * 2 - destroy lockspace regardless of LKBs
805 * 3 - destroy lockspace as part of a forced shutdown
806 */
807
808int dlm_release_lockspace(void *lockspace, int force)
809{
810	struct dlm_ls *ls;
811	int error;
812
813	ls = dlm_find_lockspace_local(lockspace);
814	if (!ls)
815		return -EINVAL;
816	dlm_put_lockspace(ls);
817
818	mutex_lock(&ls_lock);
819	error = release_lockspace(ls, force);
820	if (!error)
821		ls_count--;
822	if (!ls_count)
823		threads_stop();
824	mutex_unlock(&ls_lock);
825
826	return error;
827}
828
/* Forcibly stop every running lockspace; called when no userland
   control daemon is available to manage membership. */
void dlm_stop_lockspaces(void)
{
	struct dlm_ls *ls;

 restart:
	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
			continue;
		/* dlm_ls_stop() can sleep, so drop the lock and restart
		   the walk from the head afterwards; the lockspace just
		   stopped no longer has LSFL_RUNNING set, so the loop
		   makes progress */
		spin_unlock(&lslist_lock);
		log_error(ls, "no userland control daemon, stopping lockspace");
		dlm_ls_stop(ls);
		goto restart;
	}
	spin_unlock(&lslist_lock);
}
845