Loading...
1// SPDX-License-Identifier: GPL-2.0-only
2/******************************************************************************
3*******************************************************************************
4**
5** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
7**
8**
9*******************************************************************************
10******************************************************************************/
11
12#include <linux/module.h>
13
14#include "dlm_internal.h"
15#include "lockspace.h"
16#include "member.h"
17#include "recoverd.h"
18#include "dir.h"
19#include "midcomms.h"
20#include "config.h"
21#include "memory.h"
22#include "lock.h"
23#include "recover.h"
24#include "requestqueue.h"
25#include "user.h"
26#include "ast.h"
27
28static int ls_count;
29static struct mutex ls_lock;
30static struct list_head lslist;
31static spinlock_t lslist_lock;
32
33static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
34{
35 ssize_t ret = len;
36 int n;
37 int rc = kstrtoint(buf, 0, &n);
38
39 if (rc)
40 return rc;
41 ls = dlm_find_lockspace_local(ls);
42 if (!ls)
43 return -EINVAL;
44
45 switch (n) {
46 case 0:
47 dlm_ls_stop(ls);
48 break;
49 case 1:
50 dlm_ls_start(ls);
51 break;
52 default:
53 ret = -EINVAL;
54 }
55 dlm_put_lockspace(ls);
56 return ret;
57}
58
59static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
60{
61 int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
62
63 if (rc)
64 return rc;
65 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
66 wake_up(&ls->ls_uevent_wait);
67 return len;
68}
69
70static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
71{
72 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
73}
74
75static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
76{
77 int rc = kstrtouint(buf, 0, &ls->ls_global_id);
78
79 if (rc)
80 return rc;
81 return len;
82}
83
84static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
85{
86 return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
87}
88
89static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
90{
91 int val;
92 int rc = kstrtoint(buf, 0, &val);
93
94 if (rc)
95 return rc;
96 if (val == 1)
97 set_bit(LSFL_NODIR, &ls->ls_flags);
98 return len;
99}
100
101static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
102{
103 uint32_t status = dlm_recover_status(ls);
104 return snprintf(buf, PAGE_SIZE, "%x\n", status);
105}
106
107static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
108{
109 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
110}
111
112struct dlm_attr {
113 struct attribute attr;
114 ssize_t (*show)(struct dlm_ls *, char *);
115 ssize_t (*store)(struct dlm_ls *, const char *, size_t);
116};
117
118static struct dlm_attr dlm_attr_control = {
119 .attr = {.name = "control", .mode = S_IWUSR},
120 .store = dlm_control_store
121};
122
123static struct dlm_attr dlm_attr_event = {
124 .attr = {.name = "event_done", .mode = S_IWUSR},
125 .store = dlm_event_store
126};
127
128static struct dlm_attr dlm_attr_id = {
129 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR},
130 .show = dlm_id_show,
131 .store = dlm_id_store
132};
133
134static struct dlm_attr dlm_attr_nodir = {
135 .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
136 .show = dlm_nodir_show,
137 .store = dlm_nodir_store
138};
139
140static struct dlm_attr dlm_attr_recover_status = {
141 .attr = {.name = "recover_status", .mode = S_IRUGO},
142 .show = dlm_recover_status_show
143};
144
145static struct dlm_attr dlm_attr_recover_nodeid = {
146 .attr = {.name = "recover_nodeid", .mode = S_IRUGO},
147 .show = dlm_recover_nodeid_show
148};
149
150static struct attribute *dlm_attrs[] = {
151 &dlm_attr_control.attr,
152 &dlm_attr_event.attr,
153 &dlm_attr_id.attr,
154 &dlm_attr_nodir.attr,
155 &dlm_attr_recover_status.attr,
156 &dlm_attr_recover_nodeid.attr,
157 NULL,
158};
159ATTRIBUTE_GROUPS(dlm);
160
161static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
162 char *buf)
163{
164 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
165 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
166 return a->show ? a->show(ls, buf) : 0;
167}
168
169static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
170 const char *buf, size_t len)
171{
172 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
173 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
174 return a->store ? a->store(ls, buf, len) : len;
175}
176
177static const struct sysfs_ops dlm_attr_ops = {
178 .show = dlm_attr_show,
179 .store = dlm_attr_store,
180};
181
182static struct kobj_type dlm_ktype = {
183 .default_groups = dlm_groups,
184 .sysfs_ops = &dlm_attr_ops,
185};
186
187static struct kset *dlm_kset;
188
189static int do_uevent(struct dlm_ls *ls, int in)
190{
191 if (in)
192 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
193 else
194 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
195
196 log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
197
198 /* dlm_controld will see the uevent, do the necessary group management
199 and then write to sysfs to wake us */
200
201 wait_event(ls->ls_uevent_wait,
202 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
203
204 log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
205
206 return ls->ls_uevent_result;
207}
208
209static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
210{
211 const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
212
213 add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
214 return 0;
215}
216
217static const struct kset_uevent_ops dlm_uevent_ops = {
218 .uevent = dlm_uevent,
219};
220
221int __init dlm_lockspace_init(void)
222{
223 ls_count = 0;
224 mutex_init(&ls_lock);
225 INIT_LIST_HEAD(&lslist);
226 spin_lock_init(&lslist_lock);
227
228 dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
229 if (!dlm_kset) {
230 printk(KERN_WARNING "%s: can not create kset\n", __func__);
231 return -ENOMEM;
232 }
233 return 0;
234}
235
236void dlm_lockspace_exit(void)
237{
238 kset_unregister(dlm_kset);
239}
240
241struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
242{
243 struct dlm_ls *ls;
244
245 spin_lock_bh(&lslist_lock);
246
247 list_for_each_entry(ls, &lslist, ls_list) {
248 if (ls->ls_global_id == id) {
249 atomic_inc(&ls->ls_count);
250 goto out;
251 }
252 }
253 ls = NULL;
254 out:
255 spin_unlock_bh(&lslist_lock);
256 return ls;
257}
258
259struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
260{
261 struct dlm_ls *ls = lockspace;
262
263 atomic_inc(&ls->ls_count);
264 return ls;
265}
266
267struct dlm_ls *dlm_find_lockspace_device(int minor)
268{
269 struct dlm_ls *ls;
270
271 spin_lock_bh(&lslist_lock);
272 list_for_each_entry(ls, &lslist, ls_list) {
273 if (ls->ls_device.minor == minor) {
274 atomic_inc(&ls->ls_count);
275 goto out;
276 }
277 }
278 ls = NULL;
279 out:
280 spin_unlock_bh(&lslist_lock);
281 return ls;
282}
283
284void dlm_put_lockspace(struct dlm_ls *ls)
285{
286 if (atomic_dec_and_test(&ls->ls_count))
287 wake_up(&ls->ls_count_wait);
288}
289
290static void remove_lockspace(struct dlm_ls *ls)
291{
292retry:
293 wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
294
295 spin_lock_bh(&lslist_lock);
296 if (atomic_read(&ls->ls_count) != 0) {
297 spin_unlock_bh(&lslist_lock);
298 goto retry;
299 }
300
301 WARN_ON(ls->ls_create_count != 0);
302 list_del(&ls->ls_list);
303 spin_unlock_bh(&lslist_lock);
304}
305
306static int threads_start(void)
307{
308 int error;
309
310 /* Thread for sending/receiving messages for all lockspace's */
311 error = dlm_midcomms_start();
312 if (error)
313 log_print("cannot start dlm midcomms %d", error);
314
315 return error;
316}
317
318static int lkb_idr_free(struct dlm_lkb *lkb)
319{
320 if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
321 dlm_free_lvb(lkb->lkb_lvbptr);
322
323 dlm_free_lkb(lkb);
324 return 0;
325}
326
327static void rhash_free_rsb(void *ptr, void *arg)
328{
329 struct dlm_rsb *rsb = ptr;
330
331 dlm_free_rsb(rsb);
332}
333
334static void free_lockspace(struct work_struct *work)
335{
336 struct dlm_ls *ls = container_of(work, struct dlm_ls, ls_free_work);
337 struct dlm_lkb *lkb;
338 unsigned long id;
339
340 /*
341 * Free all lkb's in xa
342 */
343 xa_for_each(&ls->ls_lkbxa, id, lkb) {
344 lkb_idr_free(lkb);
345 }
346 xa_destroy(&ls->ls_lkbxa);
347
348 /*
349 * Free all rsb's on rsbtbl
350 */
351 rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);
352
353 kfree(ls);
354}
355
356static int new_lockspace(const char *name, const char *cluster,
357 uint32_t flags, int lvblen,
358 const struct dlm_lockspace_ops *ops, void *ops_arg,
359 int *ops_result, dlm_lockspace_t **lockspace)
360{
361 struct dlm_ls *ls;
362 int namelen = strlen(name);
363 int error;
364
365 if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
366 return -EINVAL;
367
368 if (lvblen % 8)
369 return -EINVAL;
370
371 if (!try_module_get(THIS_MODULE))
372 return -EINVAL;
373
374 if (!dlm_user_daemon_available()) {
375 log_print("dlm user daemon not available");
376 error = -EUNATCH;
377 goto out;
378 }
379
380 if (ops && ops_result) {
381 if (!dlm_config.ci_recover_callbacks)
382 *ops_result = -EOPNOTSUPP;
383 else
384 *ops_result = 0;
385 }
386
387 if (!cluster)
388 log_print("dlm cluster name '%s' is being used without an application provided cluster name",
389 dlm_config.ci_cluster_name);
390
391 if (dlm_config.ci_recover_callbacks && cluster &&
392 strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
393 log_print("dlm cluster name '%s' does not match "
394 "the application cluster name '%s'",
395 dlm_config.ci_cluster_name, cluster);
396 error = -EBADR;
397 goto out;
398 }
399
400 error = 0;
401
402 spin_lock_bh(&lslist_lock);
403 list_for_each_entry(ls, &lslist, ls_list) {
404 WARN_ON(ls->ls_create_count <= 0);
405 if (ls->ls_namelen != namelen)
406 continue;
407 if (memcmp(ls->ls_name, name, namelen))
408 continue;
409 if (flags & DLM_LSFL_NEWEXCL) {
410 error = -EEXIST;
411 break;
412 }
413 ls->ls_create_count++;
414 *lockspace = ls;
415 error = 1;
416 break;
417 }
418 spin_unlock_bh(&lslist_lock);
419
420 if (error)
421 goto out;
422
423 error = -ENOMEM;
424
425 ls = kzalloc(sizeof(*ls), GFP_NOFS);
426 if (!ls)
427 goto out;
428 memcpy(ls->ls_name, name, namelen);
429 ls->ls_namelen = namelen;
430 ls->ls_lvblen = lvblen;
431 atomic_set(&ls->ls_count, 0);
432 init_waitqueue_head(&ls->ls_count_wait);
433 ls->ls_flags = 0;
434
435 if (ops && dlm_config.ci_recover_callbacks) {
436 ls->ls_ops = ops;
437 ls->ls_ops_arg = ops_arg;
438 }
439
440 if (flags & DLM_LSFL_SOFTIRQ)
441 set_bit(LSFL_SOFTIRQ, &ls->ls_flags);
442
443 /* ls_exflags are forced to match among nodes, and we don't
444 * need to require all nodes to have some flags set
445 */
446 ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL |
447 DLM_LSFL_SOFTIRQ));
448
449 INIT_LIST_HEAD(&ls->ls_slow_inactive);
450 INIT_LIST_HEAD(&ls->ls_slow_active);
451 rwlock_init(&ls->ls_rsbtbl_lock);
452
453 error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
454 if (error)
455 goto out_lsfree;
456
457 xa_init_flags(&ls->ls_lkbxa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
458 rwlock_init(&ls->ls_lkbxa_lock);
459
460 INIT_LIST_HEAD(&ls->ls_waiters);
461 spin_lock_init(&ls->ls_waiters_lock);
462 INIT_LIST_HEAD(&ls->ls_orphans);
463 spin_lock_init(&ls->ls_orphans_lock);
464
465 INIT_LIST_HEAD(&ls->ls_nodes);
466 INIT_LIST_HEAD(&ls->ls_nodes_gone);
467 ls->ls_num_nodes = 0;
468 ls->ls_low_nodeid = 0;
469 ls->ls_total_weight = 0;
470 ls->ls_node_array = NULL;
471
472 memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
473 ls->ls_local_rsb.res_ls = ls;
474
475 ls->ls_debug_rsb_dentry = NULL;
476 ls->ls_debug_waiters_dentry = NULL;
477
478 init_waitqueue_head(&ls->ls_uevent_wait);
479 ls->ls_uevent_result = 0;
480 init_completion(&ls->ls_recovery_done);
481 ls->ls_recovery_result = -1;
482
483 spin_lock_init(&ls->ls_cb_lock);
484 INIT_LIST_HEAD(&ls->ls_cb_delay);
485
486 INIT_WORK(&ls->ls_free_work, free_lockspace);
487
488 ls->ls_recoverd_task = NULL;
489 mutex_init(&ls->ls_recoverd_active);
490 spin_lock_init(&ls->ls_recover_lock);
491 spin_lock_init(&ls->ls_rcom_spin);
492 get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
493 ls->ls_recover_status = 0;
494 ls->ls_recover_seq = get_random_u64();
495 ls->ls_recover_args = NULL;
496 init_rwsem(&ls->ls_in_recovery);
497 rwlock_init(&ls->ls_recv_active);
498 INIT_LIST_HEAD(&ls->ls_requestqueue);
499 rwlock_init(&ls->ls_requestqueue_lock);
500 spin_lock_init(&ls->ls_clear_proc_locks);
501
502 /* Due backwards compatibility with 3.1 we need to use maximum
503 * possible dlm message size to be sure the message will fit and
504 * not having out of bounds issues. However on sending side 3.2
505 * might send less.
506 */
507 ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
508 if (!ls->ls_recover_buf) {
509 error = -ENOMEM;
510 goto out_lkbxa;
511 }
512
513 ls->ls_slot = 0;
514 ls->ls_num_slots = 0;
515 ls->ls_slots_size = 0;
516 ls->ls_slots = NULL;
517
518 INIT_LIST_HEAD(&ls->ls_recover_list);
519 spin_lock_init(&ls->ls_recover_list_lock);
520 xa_init_flags(&ls->ls_recover_xa, XA_FLAGS_ALLOC | XA_FLAGS_LOCK_BH);
521 spin_lock_init(&ls->ls_recover_xa_lock);
522 ls->ls_recover_list_count = 0;
523 init_waitqueue_head(&ls->ls_wait_general);
524 INIT_LIST_HEAD(&ls->ls_masters_list);
525 rwlock_init(&ls->ls_masters_lock);
526 INIT_LIST_HEAD(&ls->ls_dir_dump_list);
527 rwlock_init(&ls->ls_dir_dump_lock);
528
529 INIT_LIST_HEAD(&ls->ls_scan_list);
530 spin_lock_init(&ls->ls_scan_lock);
531 timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE);
532
533 spin_lock_bh(&lslist_lock);
534 ls->ls_create_count = 1;
535 list_add(&ls->ls_list, &lslist);
536 spin_unlock_bh(&lslist_lock);
537
538 if (flags & DLM_LSFL_FS)
539 set_bit(LSFL_FS, &ls->ls_flags);
540
541 error = dlm_callback_start(ls);
542 if (error) {
543 log_error(ls, "can't start dlm_callback %d", error);
544 goto out_delist;
545 }
546
547 init_waitqueue_head(&ls->ls_recover_lock_wait);
548
549 /*
550 * Once started, dlm_recoverd first looks for ls in lslist, then
551 * initializes ls_in_recovery as locked in "down" mode. We need
552 * to wait for the wakeup from dlm_recoverd because in_recovery
553 * has to start out in down mode.
554 */
555
556 error = dlm_recoverd_start(ls);
557 if (error) {
558 log_error(ls, "can't start dlm_recoverd %d", error);
559 goto out_callback;
560 }
561
562 wait_event(ls->ls_recover_lock_wait,
563 test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
564
565 ls->ls_kobj.kset = dlm_kset;
566 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
567 "%s", ls->ls_name);
568 if (error)
569 goto out_recoverd;
570 kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
571
572 /* This uevent triggers dlm_controld in userspace to add us to the
573 group of nodes that are members of this lockspace (managed by the
574 cluster infrastructure.) Once it's done that, it tells us who the
575 current lockspace members are (via configfs) and then tells the
576 lockspace to start running (via sysfs) in dlm_ls_start(). */
577
578 error = do_uevent(ls, 1);
579 if (error)
580 goto out_recoverd;
581
582 /* wait until recovery is successful or failed */
583 wait_for_completion(&ls->ls_recovery_done);
584 error = ls->ls_recovery_result;
585 if (error)
586 goto out_members;
587
588 dlm_create_debug_file(ls);
589
590 log_rinfo(ls, "join complete");
591 *lockspace = ls;
592 return 0;
593
594 out_members:
595 do_uevent(ls, 0);
596 dlm_clear_members(ls);
597 kfree(ls->ls_node_array);
598 out_recoverd:
599 dlm_recoverd_stop(ls);
600 out_callback:
601 dlm_callback_stop(ls);
602 out_delist:
603 spin_lock_bh(&lslist_lock);
604 list_del(&ls->ls_list);
605 spin_unlock_bh(&lslist_lock);
606 xa_destroy(&ls->ls_recover_xa);
607 kfree(ls->ls_recover_buf);
608 out_lkbxa:
609 xa_destroy(&ls->ls_lkbxa);
610 rhashtable_destroy(&ls->ls_rsbtbl);
611 out_lsfree:
612 kobject_put(&ls->ls_kobj);
613 kfree(ls);
614 out:
615 module_put(THIS_MODULE);
616 return error;
617}
618
619static int __dlm_new_lockspace(const char *name, const char *cluster,
620 uint32_t flags, int lvblen,
621 const struct dlm_lockspace_ops *ops,
622 void *ops_arg, int *ops_result,
623 dlm_lockspace_t **lockspace)
624{
625 int error = 0;
626
627 mutex_lock(&ls_lock);
628 if (!ls_count)
629 error = threads_start();
630 if (error)
631 goto out;
632
633 error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
634 ops_result, lockspace);
635 if (!error)
636 ls_count++;
637 if (error > 0)
638 error = 0;
639 if (!ls_count) {
640 dlm_midcomms_shutdown();
641 dlm_midcomms_stop();
642 }
643 out:
644 mutex_unlock(&ls_lock);
645 return error;
646}
647
648int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
649 int lvblen, const struct dlm_lockspace_ops *ops,
650 void *ops_arg, int *ops_result,
651 dlm_lockspace_t **lockspace)
652{
653 return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
654 ops, ops_arg, ops_result, lockspace);
655}
656
657int dlm_new_user_lockspace(const char *name, const char *cluster,
658 uint32_t flags, int lvblen,
659 const struct dlm_lockspace_ops *ops,
660 void *ops_arg, int *ops_result,
661 dlm_lockspace_t **lockspace)
662{
663 if (flags & DLM_LSFL_SOFTIRQ)
664 return -EINVAL;
665
666 return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
667 ops_arg, ops_result, lockspace);
668}
669
670/* NOTE: We check the lkbxa here rather than the resource table.
671 This is because there may be LKBs queued as ASTs that have been unlinked
672 from their RSBs and are pending deletion once the AST has been delivered */
673
674static int lockspace_busy(struct dlm_ls *ls, int force)
675{
676 struct dlm_lkb *lkb;
677 unsigned long id;
678 int rv = 0;
679
680 read_lock_bh(&ls->ls_lkbxa_lock);
681 if (force == 0) {
682 xa_for_each(&ls->ls_lkbxa, id, lkb) {
683 rv = 1;
684 break;
685 }
686 } else if (force == 1) {
687 xa_for_each(&ls->ls_lkbxa, id, lkb) {
688 if (lkb->lkb_nodeid == 0 &&
689 lkb->lkb_grmode != DLM_LOCK_IV) {
690 rv = 1;
691 break;
692 }
693 }
694 } else {
695 rv = 0;
696 }
697 read_unlock_bh(&ls->ls_lkbxa_lock);
698 return rv;
699}
700
701static int release_lockspace(struct dlm_ls *ls, int force)
702{
703 int busy, rv;
704
705 busy = lockspace_busy(ls, force);
706
707 spin_lock_bh(&lslist_lock);
708 if (ls->ls_create_count == 1) {
709 if (busy) {
710 rv = -EBUSY;
711 } else {
712 /* remove_lockspace takes ls off lslist */
713 ls->ls_create_count = 0;
714 rv = 0;
715 }
716 } else if (ls->ls_create_count > 1) {
717 rv = --ls->ls_create_count;
718 } else {
719 rv = -EINVAL;
720 }
721 spin_unlock_bh(&lslist_lock);
722
723 if (rv) {
724 log_debug(ls, "release_lockspace no remove %d", rv);
725 return rv;
726 }
727
728 if (ls_count == 1)
729 dlm_midcomms_version_wait();
730
731 dlm_device_deregister(ls);
732
733 if (force < 3 && dlm_user_daemon_available())
734 do_uevent(ls, 0);
735
736 dlm_recoverd_stop(ls);
737
738 /* clear the LSFL_RUNNING flag to fast up
739 * time_shutdown_sync(), we don't care anymore
740 */
741 clear_bit(LSFL_RUNNING, &ls->ls_flags);
742 timer_shutdown_sync(&ls->ls_scan_timer);
743
744 if (ls_count == 1) {
745 dlm_clear_members(ls);
746 dlm_midcomms_shutdown();
747 }
748
749 dlm_callback_stop(ls);
750
751 remove_lockspace(ls);
752
753 dlm_delete_debug_file(ls);
754
755 kobject_put(&ls->ls_kobj);
756
757 xa_destroy(&ls->ls_recover_xa);
758 kfree(ls->ls_recover_buf);
759
760 /*
761 * Free structures on any other lists
762 */
763
764 dlm_purge_requestqueue(ls);
765 kfree(ls->ls_recover_args);
766 dlm_clear_members(ls);
767 dlm_clear_members_gone(ls);
768 kfree(ls->ls_node_array);
769
770 log_rinfo(ls, "%s final free", __func__);
771
772 /* delayed free of data structures see free_lockspace() */
773 queue_work(dlm_wq, &ls->ls_free_work);
774 module_put(THIS_MODULE);
775 return 0;
776}
777
778/*
779 * Called when a system has released all its locks and is not going to use the
780 * lockspace any longer. We free everything we're managing for this lockspace.
781 * Remaining nodes will go through the recovery process as if we'd died. The
782 * lockspace must continue to function as usual, participating in recoveries,
783 * until this returns.
784 *
785 * Force has 4 possible values:
786 * 0 - don't destroy lockspace if it has any LKBs
787 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
788 * 2 - destroy lockspace regardless of LKBs
789 * 3 - destroy lockspace as part of a forced shutdown
790 */
791
792int dlm_release_lockspace(void *lockspace, int force)
793{
794 struct dlm_ls *ls;
795 int error;
796
797 ls = dlm_find_lockspace_local(lockspace);
798 if (!ls)
799 return -EINVAL;
800 dlm_put_lockspace(ls);
801
802 mutex_lock(&ls_lock);
803 error = release_lockspace(ls, force);
804 if (!error)
805 ls_count--;
806 if (!ls_count)
807 dlm_midcomms_stop();
808 mutex_unlock(&ls_lock);
809
810 return error;
811}
812
813void dlm_stop_lockspaces(void)
814{
815 struct dlm_ls *ls;
816 int count;
817
818 restart:
819 count = 0;
820 spin_lock_bh(&lslist_lock);
821 list_for_each_entry(ls, &lslist, ls_list) {
822 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
823 count++;
824 continue;
825 }
826 spin_unlock_bh(&lslist_lock);
827 log_error(ls, "no userland control daemon, stopping lockspace");
828 dlm_ls_stop(ls);
829 goto restart;
830 }
831 spin_unlock_bh(&lslist_lock);
832
833 if (count)
834 log_print("dlm user daemon left %d lockspaces", count);
835}
1/******************************************************************************
2*******************************************************************************
3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved.
6**
7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions
9** of the GNU General Public License v.2.
10**
11*******************************************************************************
12******************************************************************************/
13
14#include "dlm_internal.h"
15#include "lockspace.h"
16#include "member.h"
17#include "recoverd.h"
18#include "dir.h"
19#include "lowcomms.h"
20#include "config.h"
21#include "memory.h"
22#include "lock.h"
23#include "recover.h"
24#include "requestqueue.h"
25#include "user.h"
26#include "ast.h"
27
28static int ls_count;
29static struct mutex ls_lock;
30static struct list_head lslist;
31static spinlock_t lslist_lock;
32static struct task_struct * scand_task;
33
34
35static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36{
37 ssize_t ret = len;
38 int n = simple_strtol(buf, NULL, 0);
39
40 ls = dlm_find_lockspace_local(ls->ls_local_handle);
41 if (!ls)
42 return -EINVAL;
43
44 switch (n) {
45 case 0:
46 dlm_ls_stop(ls);
47 break;
48 case 1:
49 dlm_ls_start(ls);
50 break;
51 default:
52 ret = -EINVAL;
53 }
54 dlm_put_lockspace(ls);
55 return ret;
56}
57
58static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59{
60 ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62 wake_up(&ls->ls_uevent_wait);
63 return len;
64}
65
66static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67{
68 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
69}
70
71static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72{
73 ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74 return len;
75}
76
77static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
78{
79 return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
80}
81
82static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
83{
84 int val = simple_strtoul(buf, NULL, 0);
85 if (val == 1)
86 set_bit(LSFL_NODIR, &ls->ls_flags);
87 return len;
88}
89
90static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
91{
92 uint32_t status = dlm_recover_status(ls);
93 return snprintf(buf, PAGE_SIZE, "%x\n", status);
94}
95
96static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
97{
98 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
99}
100
101struct dlm_attr {
102 struct attribute attr;
103 ssize_t (*show)(struct dlm_ls *, char *);
104 ssize_t (*store)(struct dlm_ls *, const char *, size_t);
105};
106
107static struct dlm_attr dlm_attr_control = {
108 .attr = {.name = "control", .mode = S_IWUSR},
109 .store = dlm_control_store
110};
111
112static struct dlm_attr dlm_attr_event = {
113 .attr = {.name = "event_done", .mode = S_IWUSR},
114 .store = dlm_event_store
115};
116
117static struct dlm_attr dlm_attr_id = {
118 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR},
119 .show = dlm_id_show,
120 .store = dlm_id_store
121};
122
123static struct dlm_attr dlm_attr_nodir = {
124 .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
125 .show = dlm_nodir_show,
126 .store = dlm_nodir_store
127};
128
129static struct dlm_attr dlm_attr_recover_status = {
130 .attr = {.name = "recover_status", .mode = S_IRUGO},
131 .show = dlm_recover_status_show
132};
133
134static struct dlm_attr dlm_attr_recover_nodeid = {
135 .attr = {.name = "recover_nodeid", .mode = S_IRUGO},
136 .show = dlm_recover_nodeid_show
137};
138
139static struct attribute *dlm_attrs[] = {
140 &dlm_attr_control.attr,
141 &dlm_attr_event.attr,
142 &dlm_attr_id.attr,
143 &dlm_attr_nodir.attr,
144 &dlm_attr_recover_status.attr,
145 &dlm_attr_recover_nodeid.attr,
146 NULL,
147};
148
149static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
150 char *buf)
151{
152 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
153 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
154 return a->show ? a->show(ls, buf) : 0;
155}
156
157static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
158 const char *buf, size_t len)
159{
160 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
161 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
162 return a->store ? a->store(ls, buf, len) : len;
163}
164
165static void lockspace_kobj_release(struct kobject *k)
166{
167 struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj);
168 kfree(ls);
169}
170
171static const struct sysfs_ops dlm_attr_ops = {
172 .show = dlm_attr_show,
173 .store = dlm_attr_store,
174};
175
176static struct kobj_type dlm_ktype = {
177 .default_attrs = dlm_attrs,
178 .sysfs_ops = &dlm_attr_ops,
179 .release = lockspace_kobj_release,
180};
181
182static struct kset *dlm_kset;
183
184static int do_uevent(struct dlm_ls *ls, int in)
185{
186 int error;
187
188 if (in)
189 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
190 else
191 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
192
193 log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
194
195 /* dlm_controld will see the uevent, do the necessary group management
196 and then write to sysfs to wake us */
197
198 error = wait_event_interruptible(ls->ls_uevent_wait,
199 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
200
201 log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
202
203 if (error)
204 goto out;
205
206 error = ls->ls_uevent_result;
207 out:
208 if (error)
209 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
210 error, ls->ls_uevent_result);
211 return error;
212}
213
214static int dlm_uevent(struct kset *kset, struct kobject *kobj,
215 struct kobj_uevent_env *env)
216{
217 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
218
219 add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
220 return 0;
221}
222
223static struct kset_uevent_ops dlm_uevent_ops = {
224 .uevent = dlm_uevent,
225};
226
227int __init dlm_lockspace_init(void)
228{
229 ls_count = 0;
230 mutex_init(&ls_lock);
231 INIT_LIST_HEAD(&lslist);
232 spin_lock_init(&lslist_lock);
233
234 dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
235 if (!dlm_kset) {
236 printk(KERN_WARNING "%s: can not create kset\n", __func__);
237 return -ENOMEM;
238 }
239 return 0;
240}
241
242void dlm_lockspace_exit(void)
243{
244 kset_unregister(dlm_kset);
245}
246
247static struct dlm_ls *find_ls_to_scan(void)
248{
249 struct dlm_ls *ls;
250
251 spin_lock(&lslist_lock);
252 list_for_each_entry(ls, &lslist, ls_list) {
253 if (time_after_eq(jiffies, ls->ls_scan_time +
254 dlm_config.ci_scan_secs * HZ)) {
255 spin_unlock(&lslist_lock);
256 return ls;
257 }
258 }
259 spin_unlock(&lslist_lock);
260 return NULL;
261}
262
263static int dlm_scand(void *data)
264{
265 struct dlm_ls *ls;
266
267 while (!kthread_should_stop()) {
268 ls = find_ls_to_scan();
269 if (ls) {
270 if (dlm_lock_recovery_try(ls)) {
271 ls->ls_scan_time = jiffies;
272 dlm_scan_rsbs(ls);
273 dlm_scan_timeout(ls);
274 dlm_scan_waiters(ls);
275 dlm_unlock_recovery(ls);
276 } else {
277 ls->ls_scan_time += HZ;
278 }
279 continue;
280 }
281 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
282 }
283 return 0;
284}
285
286static int dlm_scand_start(void)
287{
288 struct task_struct *p;
289 int error = 0;
290
291 p = kthread_run(dlm_scand, NULL, "dlm_scand");
292 if (IS_ERR(p))
293 error = PTR_ERR(p);
294 else
295 scand_task = p;
296 return error;
297}
298
299static void dlm_scand_stop(void)
300{
301 kthread_stop(scand_task);
302}
303
304struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
305{
306 struct dlm_ls *ls;
307
308 spin_lock(&lslist_lock);
309
310 list_for_each_entry(ls, &lslist, ls_list) {
311 if (ls->ls_global_id == id) {
312 ls->ls_count++;
313 goto out;
314 }
315 }
316 ls = NULL;
317 out:
318 spin_unlock(&lslist_lock);
319 return ls;
320}
321
322struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
323{
324 struct dlm_ls *ls;
325
326 spin_lock(&lslist_lock);
327 list_for_each_entry(ls, &lslist, ls_list) {
328 if (ls->ls_local_handle == lockspace) {
329 ls->ls_count++;
330 goto out;
331 }
332 }
333 ls = NULL;
334 out:
335 spin_unlock(&lslist_lock);
336 return ls;
337}
338
339struct dlm_ls *dlm_find_lockspace_device(int minor)
340{
341 struct dlm_ls *ls;
342
343 spin_lock(&lslist_lock);
344 list_for_each_entry(ls, &lslist, ls_list) {
345 if (ls->ls_device.minor == minor) {
346 ls->ls_count++;
347 goto out;
348 }
349 }
350 ls = NULL;
351 out:
352 spin_unlock(&lslist_lock);
353 return ls;
354}
355
356void dlm_put_lockspace(struct dlm_ls *ls)
357{
358 spin_lock(&lslist_lock);
359 ls->ls_count--;
360 spin_unlock(&lslist_lock);
361}
362
363static void remove_lockspace(struct dlm_ls *ls)
364{
365 for (;;) {
366 spin_lock(&lslist_lock);
367 if (ls->ls_count == 0) {
368 WARN_ON(ls->ls_create_count != 0);
369 list_del(&ls->ls_list);
370 spin_unlock(&lslist_lock);
371 return;
372 }
373 spin_unlock(&lslist_lock);
374 ssleep(1);
375 }
376}
377
378static int threads_start(void)
379{
380 int error;
381
382 error = dlm_scand_start();
383 if (error) {
384 log_print("cannot start dlm_scand thread %d", error);
385 goto fail;
386 }
387
388 /* Thread for sending/receiving messages for all lockspace's */
389 error = dlm_lowcomms_start();
390 if (error) {
391 log_print("cannot start dlm lowcomms %d", error);
392 goto scand_fail;
393 }
394
395 return 0;
396
397 scand_fail:
398 dlm_scand_stop();
399 fail:
400 return error;
401}
402
403static void threads_stop(void)
404{
405 dlm_scand_stop();
406 dlm_lowcomms_stop();
407}
408
409static int new_lockspace(const char *name, const char *cluster,
410 uint32_t flags, int lvblen,
411 const struct dlm_lockspace_ops *ops, void *ops_arg,
412 int *ops_result, dlm_lockspace_t **lockspace)
413{
414 struct dlm_ls *ls;
415 int i, size, error;
416 int do_unreg = 0;
417 int namelen = strlen(name);
418
419 if (namelen > DLM_LOCKSPACE_LEN)
420 return -EINVAL;
421
422 if (!lvblen || (lvblen % 8))
423 return -EINVAL;
424
425 if (!try_module_get(THIS_MODULE))
426 return -EINVAL;
427
428 if (!dlm_user_daemon_available()) {
429 log_print("dlm user daemon not available");
430 error = -EUNATCH;
431 goto out;
432 }
433
434 if (ops && ops_result) {
435 if (!dlm_config.ci_recover_callbacks)
436 *ops_result = -EOPNOTSUPP;
437 else
438 *ops_result = 0;
439 }
440
441 if (dlm_config.ci_recover_callbacks && cluster &&
442 strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
443 log_print("dlm cluster name %s mismatch %s",
444 dlm_config.ci_cluster_name, cluster);
445 error = -EBADR;
446 goto out;
447 }
448
449 error = 0;
450
451 spin_lock(&lslist_lock);
452 list_for_each_entry(ls, &lslist, ls_list) {
453 WARN_ON(ls->ls_create_count <= 0);
454 if (ls->ls_namelen != namelen)
455 continue;
456 if (memcmp(ls->ls_name, name, namelen))
457 continue;
458 if (flags & DLM_LSFL_NEWEXCL) {
459 error = -EEXIST;
460 break;
461 }
462 ls->ls_create_count++;
463 *lockspace = ls;
464 error = 1;
465 break;
466 }
467 spin_unlock(&lslist_lock);
468
469 if (error)
470 goto out;
471
472 error = -ENOMEM;
473
474 ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
475 if (!ls)
476 goto out;
477 memcpy(ls->ls_name, name, namelen);
478 ls->ls_namelen = namelen;
479 ls->ls_lvblen = lvblen;
480 ls->ls_count = 0;
481 ls->ls_flags = 0;
482 ls->ls_scan_time = jiffies;
483
484 if (ops && dlm_config.ci_recover_callbacks) {
485 ls->ls_ops = ops;
486 ls->ls_ops_arg = ops_arg;
487 }
488
489 if (flags & DLM_LSFL_TIMEWARN)
490 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
491
492 /* ls_exflags are forced to match among nodes, and we don't
493 need to require all nodes to have some flags set */
494 ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
495 DLM_LSFL_NEWEXCL));
496
497 size = dlm_config.ci_rsbtbl_size;
498 ls->ls_rsbtbl_size = size;
499
500 ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
501 if (!ls->ls_rsbtbl)
502 goto out_lsfree;
503 for (i = 0; i < size; i++) {
504 ls->ls_rsbtbl[i].keep.rb_node = NULL;
505 ls->ls_rsbtbl[i].toss.rb_node = NULL;
506 spin_lock_init(&ls->ls_rsbtbl[i].lock);
507 }
508
509 spin_lock_init(&ls->ls_remove_spin);
510
511 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
512 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
513 GFP_KERNEL);
514 if (!ls->ls_remove_names[i])
515 goto out_rsbtbl;
516 }
517
518 idr_init(&ls->ls_lkbidr);
519 spin_lock_init(&ls->ls_lkbidr_spin);
520
521 INIT_LIST_HEAD(&ls->ls_waiters);
522 mutex_init(&ls->ls_waiters_mutex);
523 INIT_LIST_HEAD(&ls->ls_orphans);
524 mutex_init(&ls->ls_orphans_mutex);
525 INIT_LIST_HEAD(&ls->ls_timeout);
526 mutex_init(&ls->ls_timeout_mutex);
527
528 INIT_LIST_HEAD(&ls->ls_new_rsb);
529 spin_lock_init(&ls->ls_new_rsb_spin);
530
531 INIT_LIST_HEAD(&ls->ls_nodes);
532 INIT_LIST_HEAD(&ls->ls_nodes_gone);
533 ls->ls_num_nodes = 0;
534 ls->ls_low_nodeid = 0;
535 ls->ls_total_weight = 0;
536 ls->ls_node_array = NULL;
537
538 memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
539 ls->ls_stub_rsb.res_ls = ls;
540
541 ls->ls_debug_rsb_dentry = NULL;
542 ls->ls_debug_waiters_dentry = NULL;
543
544 init_waitqueue_head(&ls->ls_uevent_wait);
545 ls->ls_uevent_result = 0;
546 init_completion(&ls->ls_members_done);
547 ls->ls_members_result = -1;
548
549 mutex_init(&ls->ls_cb_mutex);
550 INIT_LIST_HEAD(&ls->ls_cb_delay);
551
552 ls->ls_recoverd_task = NULL;
553 mutex_init(&ls->ls_recoverd_active);
554 spin_lock_init(&ls->ls_recover_lock);
555 spin_lock_init(&ls->ls_rcom_spin);
556 get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
557 ls->ls_recover_status = 0;
558 ls->ls_recover_seq = 0;
559 ls->ls_recover_args = NULL;
560 init_rwsem(&ls->ls_in_recovery);
561 init_rwsem(&ls->ls_recv_active);
562 INIT_LIST_HEAD(&ls->ls_requestqueue);
563 mutex_init(&ls->ls_requestqueue_mutex);
564 mutex_init(&ls->ls_clear_proc_locks);
565
566 ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
567 if (!ls->ls_recover_buf)
568 goto out_lkbidr;
569
570 ls->ls_slot = 0;
571 ls->ls_num_slots = 0;
572 ls->ls_slots_size = 0;
573 ls->ls_slots = NULL;
574
575 INIT_LIST_HEAD(&ls->ls_recover_list);
576 spin_lock_init(&ls->ls_recover_list_lock);
577 idr_init(&ls->ls_recover_idr);
578 spin_lock_init(&ls->ls_recover_idr_lock);
579 ls->ls_recover_list_count = 0;
580 ls->ls_local_handle = ls;
581 init_waitqueue_head(&ls->ls_wait_general);
582 INIT_LIST_HEAD(&ls->ls_root_list);
583 init_rwsem(&ls->ls_root_sem);
584
585 spin_lock(&lslist_lock);
586 ls->ls_create_count = 1;
587 list_add(&ls->ls_list, &lslist);
588 spin_unlock(&lslist_lock);
589
590 if (flags & DLM_LSFL_FS) {
591 error = dlm_callback_start(ls);
592 if (error) {
593 log_error(ls, "can't start dlm_callback %d", error);
594 goto out_delist;
595 }
596 }
597
598 init_waitqueue_head(&ls->ls_recover_lock_wait);
599
600 /*
601 * Once started, dlm_recoverd first looks for ls in lslist, then
602 * initializes ls_in_recovery as locked in "down" mode. We need
603 * to wait for the wakeup from dlm_recoverd because in_recovery
604 * has to start out in down mode.
605 */
606
607 error = dlm_recoverd_start(ls);
608 if (error) {
609 log_error(ls, "can't start dlm_recoverd %d", error);
610 goto out_callback;
611 }
612
613 wait_event(ls->ls_recover_lock_wait,
614 test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
615
616 ls->ls_kobj.kset = dlm_kset;
617 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
618 "%s", ls->ls_name);
619 if (error)
620 goto out_recoverd;
621 kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
622
623 /* let kobject handle freeing of ls if there's an error */
624 do_unreg = 1;
625
626 /* This uevent triggers dlm_controld in userspace to add us to the
627 group of nodes that are members of this lockspace (managed by the
628 cluster infrastructure.) Once it's done that, it tells us who the
629 current lockspace members are (via configfs) and then tells the
630 lockspace to start running (via sysfs) in dlm_ls_start(). */
631
632 error = do_uevent(ls, 1);
633 if (error)
634 goto out_recoverd;
635
636 wait_for_completion(&ls->ls_members_done);
637 error = ls->ls_members_result;
638 if (error)
639 goto out_members;
640
641 dlm_create_debug_file(ls);
642
643 log_rinfo(ls, "join complete");
644 *lockspace = ls;
645 return 0;
646
647 out_members:
648 do_uevent(ls, 0);
649 dlm_clear_members(ls);
650 kfree(ls->ls_node_array);
651 out_recoverd:
652 dlm_recoverd_stop(ls);
653 out_callback:
654 dlm_callback_stop(ls);
655 out_delist:
656 spin_lock(&lslist_lock);
657 list_del(&ls->ls_list);
658 spin_unlock(&lslist_lock);
659 idr_destroy(&ls->ls_recover_idr);
660 kfree(ls->ls_recover_buf);
661 out_lkbidr:
662 idr_destroy(&ls->ls_lkbidr);
663 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
664 if (ls->ls_remove_names[i])
665 kfree(ls->ls_remove_names[i]);
666 }
667 out_rsbtbl:
668 vfree(ls->ls_rsbtbl);
669 out_lsfree:
670 if (do_unreg)
671 kobject_put(&ls->ls_kobj);
672 else
673 kfree(ls);
674 out:
675 module_put(THIS_MODULE);
676 return error;
677}
678
679int dlm_new_lockspace(const char *name, const char *cluster,
680 uint32_t flags, int lvblen,
681 const struct dlm_lockspace_ops *ops, void *ops_arg,
682 int *ops_result, dlm_lockspace_t **lockspace)
683{
684 int error = 0;
685
686 mutex_lock(&ls_lock);
687 if (!ls_count)
688 error = threads_start();
689 if (error)
690 goto out;
691
692 error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
693 ops_result, lockspace);
694 if (!error)
695 ls_count++;
696 if (error > 0)
697 error = 0;
698 if (!ls_count)
699 threads_stop();
700 out:
701 mutex_unlock(&ls_lock);
702 return error;
703}
704
705static int lkb_idr_is_local(int id, void *p, void *data)
706{
707 struct dlm_lkb *lkb = p;
708
709 return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
710}
711
712static int lkb_idr_is_any(int id, void *p, void *data)
713{
714 return 1;
715}
716
717static int lkb_idr_free(int id, void *p, void *data)
718{
719 struct dlm_lkb *lkb = p;
720
721 if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
722 dlm_free_lvb(lkb->lkb_lvbptr);
723
724 dlm_free_lkb(lkb);
725 return 0;
726}
727
728/* NOTE: We check the lkbidr here rather than the resource table.
729 This is because there may be LKBs queued as ASTs that have been unlinked
730 from their RSBs and are pending deletion once the AST has been delivered */
731
732static int lockspace_busy(struct dlm_ls *ls, int force)
733{
734 int rv;
735
736 spin_lock(&ls->ls_lkbidr_spin);
737 if (force == 0) {
738 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
739 } else if (force == 1) {
740 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
741 } else {
742 rv = 0;
743 }
744 spin_unlock(&ls->ls_lkbidr_spin);
745 return rv;
746}
747
748static int release_lockspace(struct dlm_ls *ls, int force)
749{
750 struct dlm_rsb *rsb;
751 struct rb_node *n;
752 int i, busy, rv;
753
754 busy = lockspace_busy(ls, force);
755
756 spin_lock(&lslist_lock);
757 if (ls->ls_create_count == 1) {
758 if (busy) {
759 rv = -EBUSY;
760 } else {
761 /* remove_lockspace takes ls off lslist */
762 ls->ls_create_count = 0;
763 rv = 0;
764 }
765 } else if (ls->ls_create_count > 1) {
766 rv = --ls->ls_create_count;
767 } else {
768 rv = -EINVAL;
769 }
770 spin_unlock(&lslist_lock);
771
772 if (rv) {
773 log_debug(ls, "release_lockspace no remove %d", rv);
774 return rv;
775 }
776
777 dlm_device_deregister(ls);
778
779 if (force < 3 && dlm_user_daemon_available())
780 do_uevent(ls, 0);
781
782 dlm_recoverd_stop(ls);
783
784 dlm_callback_stop(ls);
785
786 remove_lockspace(ls);
787
788 dlm_delete_debug_file(ls);
789
790 kfree(ls->ls_recover_buf);
791
792 /*
793 * Free all lkb's in idr
794 */
795
796 idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
797 idr_destroy(&ls->ls_lkbidr);
798
799 /*
800 * Free all rsb's on rsbtbl[] lists
801 */
802
803 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
804 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
805 rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
806 rb_erase(n, &ls->ls_rsbtbl[i].keep);
807 dlm_free_rsb(rsb);
808 }
809
810 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
811 rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
812 rb_erase(n, &ls->ls_rsbtbl[i].toss);
813 dlm_free_rsb(rsb);
814 }
815 }
816
817 vfree(ls->ls_rsbtbl);
818
819 for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
820 kfree(ls->ls_remove_names[i]);
821
822 while (!list_empty(&ls->ls_new_rsb)) {
823 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
824 res_hashchain);
825 list_del(&rsb->res_hashchain);
826 dlm_free_rsb(rsb);
827 }
828
829 /*
830 * Free structures on any other lists
831 */
832
833 dlm_purge_requestqueue(ls);
834 kfree(ls->ls_recover_args);
835 dlm_clear_members(ls);
836 dlm_clear_members_gone(ls);
837 kfree(ls->ls_node_array);
838 log_rinfo(ls, "release_lockspace final free");
839 kobject_put(&ls->ls_kobj);
840 /* The ls structure will be freed when the kobject is done with */
841
842 module_put(THIS_MODULE);
843 return 0;
844}
845
846/*
847 * Called when a system has released all its locks and is not going to use the
848 * lockspace any longer. We free everything we're managing for this lockspace.
849 * Remaining nodes will go through the recovery process as if we'd died. The
850 * lockspace must continue to function as usual, participating in recoveries,
851 * until this returns.
852 *
853 * Force has 4 possible values:
854 * 0 - don't destroy locksapce if it has any LKBs
855 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
856 * 2 - destroy lockspace regardless of LKBs
857 * 3 - destroy lockspace as part of a forced shutdown
858 */
859
860int dlm_release_lockspace(void *lockspace, int force)
861{
862 struct dlm_ls *ls;
863 int error;
864
865 ls = dlm_find_lockspace_local(lockspace);
866 if (!ls)
867 return -EINVAL;
868 dlm_put_lockspace(ls);
869
870 mutex_lock(&ls_lock);
871 error = release_lockspace(ls, force);
872 if (!error)
873 ls_count--;
874 if (!ls_count)
875 threads_stop();
876 mutex_unlock(&ls_lock);
877
878 return error;
879}
880
881void dlm_stop_lockspaces(void)
882{
883 struct dlm_ls *ls;
884 int count;
885
886 restart:
887 count = 0;
888 spin_lock(&lslist_lock);
889 list_for_each_entry(ls, &lslist, ls_list) {
890 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
891 count++;
892 continue;
893 }
894 spin_unlock(&lslist_lock);
895 log_error(ls, "no userland control daemon, stopping lockspace");
896 dlm_ls_stop(ls);
897 goto restart;
898 }
899 spin_unlock(&lslist_lock);
900
901 if (count)
902 log_print("dlm user daemon left %d lockspaces", count);
903}
904