1 /******************************************************************************
2 *******************************************************************************
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2004-2008 Red Hat, Inc. All rights reserved.
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
11 *******************************************************************************
12 ******************************************************************************/
14 #include "dlm_internal.h"
15 #include "lockspace.h"
25 #include "requestqueue.h"
29 static struct mutex ls_lock;
30 static struct list_head lslist;
31 static spinlock_t lslist_lock;
32 static struct task_struct * scand_task;
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
38 int n = simple_strtol(buf, NULL, 0);
40 ls = dlm_find_lockspace_local(ls->ls_local_handle);
54 dlm_put_lockspace(ls);
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
60 ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62 wake_up(&ls->ls_uevent_wait);
66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
68 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
73 ls->ls_global_id = simple_strtoul(buf, NULL, 0);
77 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
79 uint32_t status = dlm_recover_status(ls);
80 return snprintf(buf, PAGE_SIZE, "%x\n", status);
83 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
85 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
89 struct attribute attr;
90 ssize_t (*show)(struct dlm_ls *, char *);
91 ssize_t (*store)(struct dlm_ls *, const char *, size_t);
94 static struct dlm_attr dlm_attr_control = {
95 .attr = {.name = "control", .mode = S_IWUSR},
96 .store = dlm_control_store
99 static struct dlm_attr dlm_attr_event = {
100 .attr = {.name = "event_done", .mode = S_IWUSR},
101 .store = dlm_event_store
104 static struct dlm_attr dlm_attr_id = {
105 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR},
107 .store = dlm_id_store
110 static struct dlm_attr dlm_attr_recover_status = {
111 .attr = {.name = "recover_status", .mode = S_IRUGO},
112 .show = dlm_recover_status_show
115 static struct dlm_attr dlm_attr_recover_nodeid = {
116 .attr = {.name = "recover_nodeid", .mode = S_IRUGO},
117 .show = dlm_recover_nodeid_show
120 static struct attribute *dlm_attrs[] = {
121 &dlm_attr_control.attr,
122 &dlm_attr_event.attr,
124 &dlm_attr_recover_status.attr,
125 &dlm_attr_recover_nodeid.attr,
129 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
132 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
133 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
134 return a->show ? a->show(ls, buf) : 0;
137 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
138 const char *buf, size_t len)
140 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
141 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
142 return a->store ? a->store(ls, buf, len) : len;
145 static void lockspace_kobj_release(struct kobject *k)
147 struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj);
151 static struct sysfs_ops dlm_attr_ops = {
152 .show = dlm_attr_show,
153 .store = dlm_attr_store,
156 static struct kobj_type dlm_ktype = {
157 .default_attrs = dlm_attrs,
158 .sysfs_ops = &dlm_attr_ops,
159 .release = lockspace_kobj_release,
162 static struct kset *dlm_kset;
164 static int do_uevent(struct dlm_ls *ls, int in)
169 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
171 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
173 log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
175 /* dlm_controld will see the uevent, do the necessary group management
176 and then write to sysfs to wake us */
178 error = wait_event_interruptible(ls->ls_uevent_wait,
179 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
181 log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
186 error = ls->ls_uevent_result;
189 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
190 error, ls->ls_uevent_result);
195 int __init dlm_lockspace_init(void)
198 mutex_init(&ls_lock);
199 INIT_LIST_HEAD(&lslist);
200 spin_lock_init(&lslist_lock);
202 dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
204 printk(KERN_WARNING "%s: can not create kset\n", __func__);
210 void dlm_lockspace_exit(void)
212 kset_unregister(dlm_kset);
215 static int dlm_scand(void *data)
219 while (!kthread_should_stop()) {
220 list_for_each_entry(ls, &lslist, ls_list) {
221 if (dlm_lock_recovery_try(ls)) {
223 dlm_scan_timeout(ls);
224 dlm_unlock_recovery(ls);
227 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
232 static int dlm_scand_start(void)
234 struct task_struct *p;
237 p = kthread_run(dlm_scand, NULL, "dlm_scand");
245 static void dlm_scand_stop(void)
247 kthread_stop(scand_task);
250 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
254 spin_lock(&lslist_lock);
256 list_for_each_entry(ls, &lslist, ls_list) {
257 if (ls->ls_global_id == id) {
264 spin_unlock(&lslist_lock);
268 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
272 spin_lock(&lslist_lock);
273 list_for_each_entry(ls, &lslist, ls_list) {
274 if (ls->ls_local_handle == lockspace) {
281 spin_unlock(&lslist_lock);
285 struct dlm_ls *dlm_find_lockspace_device(int minor)
289 spin_lock(&lslist_lock);
290 list_for_each_entry(ls, &lslist, ls_list) {
291 if (ls->ls_device.minor == minor) {
298 spin_unlock(&lslist_lock);
302 void dlm_put_lockspace(struct dlm_ls *ls)
304 spin_lock(&lslist_lock);
306 spin_unlock(&lslist_lock);
309 static void remove_lockspace(struct dlm_ls *ls)
312 spin_lock(&lslist_lock);
313 if (ls->ls_count == 0) {
314 WARN_ON(ls->ls_create_count != 0);
315 list_del(&ls->ls_list);
316 spin_unlock(&lslist_lock);
319 spin_unlock(&lslist_lock);
324 static int threads_start(void)
328 /* Thread which process lock requests for all lockspace's */
329 error = dlm_astd_start();
331 log_print("cannot start dlm_astd thread %d", error);
335 error = dlm_scand_start();
337 log_print("cannot start dlm_scand thread %d", error);
341 /* Thread for sending/receiving messages for all lockspace's */
342 error = dlm_lowcomms_start();
344 log_print("cannot start dlm lowcomms %d", error);
358 static void threads_stop(void)
365 static int new_lockspace(char *name, int namelen, void **lockspace,
366 uint32_t flags, int lvblen)
372 if (namelen > DLM_LOCKSPACE_LEN)
375 if (!lvblen || (lvblen % 8))
378 if (!try_module_get(THIS_MODULE))
383 spin_lock(&lslist_lock);
384 list_for_each_entry(ls, &lslist, ls_list) {
385 WARN_ON(ls->ls_create_count <= 0);
386 if (ls->ls_namelen != namelen)
388 if (memcmp(ls->ls_name, name, namelen))
390 if (flags & DLM_LSFL_NEWEXCL) {
394 ls->ls_create_count++;
395 module_put(THIS_MODULE);
396 error = 1; /* not an error, return 0 */
399 spin_unlock(&lslist_lock);
408 ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
411 memcpy(ls->ls_name, name, namelen);
412 ls->ls_namelen = namelen;
413 ls->ls_lvblen = lvblen;
417 if (flags & DLM_LSFL_TIMEWARN)
418 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
420 if (flags & DLM_LSFL_FS)
421 ls->ls_allocation = GFP_NOFS;
423 ls->ls_allocation = GFP_KERNEL;
425 /* ls_exflags are forced to match among nodes, and we don't
426 need to require all nodes to have some flags set */
427 ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
430 size = dlm_config.ci_rsbtbl_size;
431 ls->ls_rsbtbl_size = size;
433 ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
436 for (i = 0; i < size; i++) {
437 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
438 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
439 rwlock_init(&ls->ls_rsbtbl[i].lock);
442 size = dlm_config.ci_lkbtbl_size;
443 ls->ls_lkbtbl_size = size;
445 ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
448 for (i = 0; i < size; i++) {
449 INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
450 rwlock_init(&ls->ls_lkbtbl[i].lock);
451 ls->ls_lkbtbl[i].counter = 1;
454 size = dlm_config.ci_dirtbl_size;
455 ls->ls_dirtbl_size = size;
457 ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
460 for (i = 0; i < size; i++) {
461 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
462 rwlock_init(&ls->ls_dirtbl[i].lock);
465 INIT_LIST_HEAD(&ls->ls_waiters);
466 mutex_init(&ls->ls_waiters_mutex);
467 INIT_LIST_HEAD(&ls->ls_orphans);
468 mutex_init(&ls->ls_orphans_mutex);
469 INIT_LIST_HEAD(&ls->ls_timeout);
470 mutex_init(&ls->ls_timeout_mutex);
472 INIT_LIST_HEAD(&ls->ls_nodes);
473 INIT_LIST_HEAD(&ls->ls_nodes_gone);
474 ls->ls_num_nodes = 0;
475 ls->ls_low_nodeid = 0;
476 ls->ls_total_weight = 0;
477 ls->ls_node_array = NULL;
479 memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
480 ls->ls_stub_rsb.res_ls = ls;
482 ls->ls_debug_rsb_dentry = NULL;
483 ls->ls_debug_waiters_dentry = NULL;
485 init_waitqueue_head(&ls->ls_uevent_wait);
486 ls->ls_uevent_result = 0;
487 init_completion(&ls->ls_members_done);
488 ls->ls_members_result = -1;
490 ls->ls_recoverd_task = NULL;
491 mutex_init(&ls->ls_recoverd_active);
492 spin_lock_init(&ls->ls_recover_lock);
493 spin_lock_init(&ls->ls_rcom_spin);
494 get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
495 ls->ls_recover_status = 0;
496 ls->ls_recover_seq = 0;
497 ls->ls_recover_args = NULL;
498 init_rwsem(&ls->ls_in_recovery);
499 init_rwsem(&ls->ls_recv_active);
500 INIT_LIST_HEAD(&ls->ls_requestqueue);
501 mutex_init(&ls->ls_requestqueue_mutex);
502 mutex_init(&ls->ls_clear_proc_locks);
504 ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
505 if (!ls->ls_recover_buf)
508 INIT_LIST_HEAD(&ls->ls_recover_list);
509 spin_lock_init(&ls->ls_recover_list_lock);
510 ls->ls_recover_list_count = 0;
511 ls->ls_local_handle = ls;
512 init_waitqueue_head(&ls->ls_wait_general);
513 INIT_LIST_HEAD(&ls->ls_root_list);
514 init_rwsem(&ls->ls_root_sem);
516 down_write(&ls->ls_in_recovery);
518 spin_lock(&lslist_lock);
519 ls->ls_create_count = 1;
520 list_add(&ls->ls_list, &lslist);
521 spin_unlock(&lslist_lock);
523 /* needs to find ls in lslist */
524 error = dlm_recoverd_start(ls);
526 log_error(ls, "can't start dlm_recoverd %d", error);
530 ls->ls_kobj.kset = dlm_kset;
531 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
535 kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
537 /* let kobject handle freeing of ls if there's an error */
540 /* This uevent triggers dlm_controld in userspace to add us to the
541 group of nodes that are members of this lockspace (managed by the
542 cluster infrastructure.) Once it's done that, it tells us who the
543 current lockspace members are (via configfs) and then tells the
544 lockspace to start running (via sysfs) in dlm_ls_start(). */
546 error = do_uevent(ls, 1);
550 wait_for_completion(&ls->ls_members_done);
551 error = ls->ls_members_result;
555 dlm_create_debug_file(ls);
557 log_debug(ls, "join complete");
564 dlm_clear_members(ls);
565 kfree(ls->ls_node_array);
567 dlm_recoverd_stop(ls);
569 spin_lock(&lslist_lock);
570 list_del(&ls->ls_list);
571 spin_unlock(&lslist_lock);
572 kfree(ls->ls_recover_buf);
574 kfree(ls->ls_dirtbl);
576 kfree(ls->ls_lkbtbl);
578 kfree(ls->ls_rsbtbl);
581 kobject_put(&ls->ls_kobj);
585 module_put(THIS_MODULE);
589 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
590 uint32_t flags, int lvblen)
594 mutex_lock(&ls_lock);
596 error = threads_start();
600 error = new_lockspace(name, namelen, lockspace, flags, lvblen);
606 mutex_unlock(&ls_lock);
610 /* Return 1 if the lockspace still has active remote locks,
611 * 2 if the lockspace still has active local locks.
613 static int lockspace_busy(struct dlm_ls *ls)
615 int i, lkb_found = 0;
618 /* NOTE: We check the lockidtbl here rather than the resource table.
619 This is because there may be LKBs queued as ASTs that have been
620 unlinked from their RSBs and are pending deletion once the AST has
623 for (i = 0; i < ls->ls_lkbtbl_size; i++) {
624 read_lock(&ls->ls_lkbtbl[i].lock);
625 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
627 list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
629 if (!lkb->lkb_nodeid) {
630 read_unlock(&ls->ls_lkbtbl[i].lock);
635 read_unlock(&ls->ls_lkbtbl[i].lock);
640 static int release_lockspace(struct dlm_ls *ls, int force)
644 struct list_head *head;
647 busy = lockspace_busy(ls);
649 spin_lock(&lslist_lock);
650 if (ls->ls_create_count == 1) {
654 /* remove_lockspace takes ls off lslist */
655 ls->ls_create_count = 0;
658 } else if (ls->ls_create_count > 1) {
659 rv = --ls->ls_create_count;
663 spin_unlock(&lslist_lock);
666 log_debug(ls, "release_lockspace no remove %d", rv);
670 dlm_device_deregister(ls);
675 dlm_recoverd_stop(ls);
677 remove_lockspace(ls);
679 dlm_delete_debug_file(ls);
683 kfree(ls->ls_recover_buf);
686 * Free direntry structs.
690 kfree(ls->ls_dirtbl);
693 * Free all lkb's on lkbtbl[] lists.
696 for (i = 0; i < ls->ls_lkbtbl_size; i++) {
697 head = &ls->ls_lkbtbl[i].list;
698 while (!list_empty(head)) {
699 lkb = list_entry(head->next, struct dlm_lkb,
702 list_del(&lkb->lkb_idtbl_list);
706 if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
707 dlm_free_lvb(lkb->lkb_lvbptr);
714 kfree(ls->ls_lkbtbl);
717 * Free all rsb's on rsbtbl[] lists
720 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
721 head = &ls->ls_rsbtbl[i].list;
722 while (!list_empty(head)) {
723 rsb = list_entry(head->next, struct dlm_rsb,
726 list_del(&rsb->res_hashchain);
730 head = &ls->ls_rsbtbl[i].toss;
731 while (!list_empty(head)) {
732 rsb = list_entry(head->next, struct dlm_rsb,
734 list_del(&rsb->res_hashchain);
739 kfree(ls->ls_rsbtbl);
742 * Free structures on any other lists
745 dlm_purge_requestqueue(ls);
746 kfree(ls->ls_recover_args);
747 dlm_clear_free_entries(ls);
748 dlm_clear_members(ls);
749 dlm_clear_members_gone(ls);
750 kfree(ls->ls_node_array);
751 log_debug(ls, "release_lockspace final free");
752 kobject_put(&ls->ls_kobj);
753 /* The ls structure will be freed when the kobject is done with */
755 module_put(THIS_MODULE);
760 * Called when a system has released all its locks and is not going to use the
761 * lockspace any longer. We free everything we're managing for this lockspace.
762 * Remaining nodes will go through the recovery process as if we'd died. The
763 * lockspace must continue to function as usual, participating in recoveries,
764 * until this returns.
766 * Force has 4 possible values:
767 * 0 - don't destroy locksapce if it has any LKBs
768 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
769 * 2 - destroy lockspace regardless of LKBs
770 * 3 - destroy lockspace as part of a forced shutdown
773 int dlm_release_lockspace(void *lockspace, int force)
778 ls = dlm_find_lockspace_local(lockspace);
781 dlm_put_lockspace(ls);
783 mutex_lock(&ls_lock);
784 error = release_lockspace(ls, force);
789 mutex_unlock(&ls_lock);