1 /******************************************************************************
2 *******************************************************************************
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
11 *******************************************************************************
12 ******************************************************************************/
14 #include "dlm_internal.h"
15 #include "lockspace.h"
25 #include "requestqueue.h"
/*
 * File-scope state shared by the lockspace code in this file:
 *  - ls_lock is held around threads_start()/new_lockspace() in
 *    dlm_new_lockspace();
 *  - lslist is the list lockspaces are linked onto through ls_list, and
 *    lslist_lock is the spinlock the lookup helpers take while walking it;
 *  - scand_task is the task handed to kthread_stop() in dlm_scand_stop().
 */
28 static struct mutex ls_lock;
29 static struct list_head lslist;
30 static spinlock_t lslist_lock;
31 static struct task_struct * scand_task;
/*
 * Store handler wired to the "control" attribute (see dlm_attr_control).
 * Parses buf as an integer (simple_strtol, base argument 0), re-looks the
 * lockspace up through its own ls_local_handle, and later drops it again
 * with dlm_put_lockspace().
 * NOTE(review): how the parsed value n is acted on is outside the visible
 * lines of this excerpt.
 */
34 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
37 int n = simple_strtol(buf, NULL, 0);
39 ls = dlm_find_lockspace_local(ls->ls_local_handle);
53 dlm_put_lockspace(ls);
/*
 * Store handler wired to the "event_done" attribute (see dlm_attr_event).
 * Records the integer parsed from buf in ls_uevent_result, sets the
 * LSFL_UEVENT_WAIT flag and wakes ls_uevent_wait, which is the wait queue
 * and flag that do_uevent() sleeps on.
 */
57 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
60 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
61 wake_up(&ls->ls_uevent_wait);
/*
 * Formats ls_global_id as an unsigned decimal followed by a newline into
 * buf (bounded by PAGE_SIZE) and returns snprintf()'s result.
 */
65 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
/*
 * Store handler wired to the "id" attribute (see dlm_attr_id): sets
 * ls_global_id to the unsigned value parsed from buf (simple_strtoul, base
 * argument 0).
 */
70 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 ls->ls_global_id = simple_strtoul(buf, NULL, 0);
/*
 * Show handler for "recover_status": prints the value returned by
 * dlm_recover_status(ls) in hexadecimal, followed by a newline.
 */
76 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
78 uint32_t status = dlm_recover_status(ls);
79 return snprintf(buf, PAGE_SIZE, "%x\n", status);
/*
 * Show handler for "recover_nodeid": prints ls_recover_nodeid as a signed
 * decimal followed by a newline.
 */
82 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
84 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
/*
 * Members of the per-attribute descriptor used below as struct dlm_attr:
 * the embedded generic attribute (recovered with container_of in
 * dlm_attr_show/dlm_attr_store) plus optional show/store callbacks that
 * take the owning lockspace instead of a raw kobject.
 */
88 struct attribute attr;
89 ssize_t (*show)(struct dlm_ls *, char *);
90 ssize_t (*store)(struct dlm_ls *, const char *, size_t);
/* "control" attribute: mode S_IWUSR, handled by dlm_control_store(). */
93 static struct dlm_attr dlm_attr_control = {
94 .attr = {.name = "control", .mode = S_IWUSR},
95 .store = dlm_control_store
/* "event_done" attribute: mode S_IWUSR, handled by dlm_event_store(). */
98 static struct dlm_attr dlm_attr_event = {
99 .attr = {.name = "event_done", .mode = S_IWUSR},
100 .store = dlm_event_store
/*
 * "id" attribute: mode S_IRUGO | S_IWUSR, writes handled by dlm_id_store().
 * NOTE(review): the matching .show initialiser is not among the visible
 * lines; dlm_id_show() is the likely candidate - confirm.
 */
103 static struct dlm_attr dlm_attr_id = {
104 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR},
106 .store = dlm_id_store
/* "recover_status" attribute: mode S_IRUGO, served by dlm_recover_status_show(). */
109 static struct dlm_attr dlm_attr_recover_status = {
110 .attr = {.name = "recover_status", .mode = S_IRUGO},
111 .show = dlm_recover_status_show
/* "recover_nodeid" attribute: mode S_IRUGO, served by dlm_recover_nodeid_show(). */
114 static struct dlm_attr dlm_attr_recover_nodeid = {
115 .attr = {.name = "recover_nodeid", .mode = S_IRUGO},
116 .show = dlm_recover_nodeid_show
/*
 * Table of the embedded struct attribute pointers; installed as
 * dlm_ktype.default_attrs.
 * NOTE(review): only the entries visible in this excerpt are listed here;
 * any further entries or a terminator are outside the visible lines.
 */
119 static struct attribute *dlm_attrs[] = {
120 &dlm_attr_control.attr,
121 &dlm_attr_event.attr,
123 &dlm_attr_recover_status.attr,
124 &dlm_attr_recover_nodeid.attr,
/*
 * Generic show entry point (installed as dlm_attr_ops.show). Recovers the
 * owning lockspace from the embedded ls_kobj and the dlm_attr from the
 * embedded attribute, then forwards to the attribute's show callback.
 * Returns 0 when the attribute has no show callback.
 */
128 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
131 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
132 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
133 return a->show ? a->show(ls, buf) : 0;
/*
 * Generic store entry point (installed as dlm_attr_ops.store). Recovers the
 * owning lockspace and the dlm_attr the same way dlm_attr_show() does, then
 * forwards to the attribute's store callback. When the attribute has no
 * store callback, len is returned unchanged.
 */
136 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
137 const char *buf, size_t len)
139 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
140 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
141 return a->store ? a->store(ls, buf, len) : len;
/*
 * kobject release callback (installed as dlm_ktype.release). Recovers the
 * enclosing dlm_ls from the embedded ls_kobj.
 * NOTE(review): what is then done with ls is outside the visible lines; a
 * comment in release_lockspace() states the ls structure is freed when the
 * kobject is done with - confirm the free happens here.
 */
144 static void lockspace_kobj_release(struct kobject *k)
146 struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj);
/* sysfs operations for lockspace kobjects: route reads/writes through the
   generic dlm_attr_show()/dlm_attr_store() dispatchers. */
150 static struct sysfs_ops dlm_attr_ops = {
151 .show = dlm_attr_show,
152 .store = dlm_attr_store,
/* kobject type passed to kobject_init_and_add() in new_lockspace(): ties
   together the default attribute table, the sysfs dispatchers and the
   release callback. */
155 static struct kobj_type dlm_ktype = {
156 .default_attrs = dlm_attrs,
157 .sysfs_ops = &dlm_attr_ops,
158 .release = lockspace_kobj_release,
/* kset created as "dlm" under kernel_kobj in dlm_lockspace_init(),
   unregistered in dlm_lockspace_exit(), and assigned to each lockspace's
   ls_kobj.kset in new_lockspace(). */
161 static struct kset *dlm_kset;
/*
 * Emit a uevent on the lockspace kobject and wait for the reply.
 *
 * @ls: lockspace whose ls_kobj carries the event
 * @in: non-zero is logged as "joining"/"join", zero as "leaving"/"leave"
 *
 * One of KOBJ_ONLINE / KOBJ_OFFLINE is sent (the selecting condition is
 * outside the visible lines). The caller then sleeps interruptibly on
 * ls_uevent_wait until LSFL_UEVENT_WAIT is set - dlm_event_store() sets
 * that bit and issues the wake-up - and picks the outcome up from
 * ls_uevent_result.
 */
163 static int do_uevent(struct dlm_ls *ls, int in)
168 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
170 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
172 log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
174 /* dlm_controld will see the uevent, do the necessary group management
175 and then write to sysfs to wake us */
/* test_and_clear_bit() consumes the flag, so each wake-up is seen once. */
177 error = wait_event_interruptible(ls->ls_uevent_wait,
178 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
180 log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
185 error = ls->ls_uevent_result;
188 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
189 error, ls->ls_uevent_result);
/*
 * Init-time setup for this file: initialises ls_lock, the lslist head and
 * lslist_lock, then creates the "dlm" kset under kernel_kobj.
 * NOTE(review): the condition guarding the warning and the return
 * statements are outside the visible lines; presumably the warning is the
 * kset-creation failure path.
 */
194 int __init dlm_lockspace_init(void)
197 mutex_init(&ls_lock);
198 INIT_LIST_HEAD(&lslist);
199 spin_lock_init(&lslist_lock);
201 dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
203 printk(KERN_WARNING "%s: can not create kset\n", __FUNCTION__);
/* Undo dlm_lockspace_init(): unregister the "dlm" kset. */
209 void dlm_lockspace_exit(void)
211 kset_unregister(dlm_kset);
/*
 * Body of the "dlm_scand" kernel thread (started by dlm_scand_start()).
 * Until kthread_should_stop() is true it walks lslist and, for every
 * lockspace for which dlm_lock_recovery_try() succeeds, calls
 * dlm_scan_timeout() and then dlm_unlock_recovery(). Between passes it
 * sleeps interruptibly for dlm_config.ci_scan_secs seconds.
 * NOTE(review): no lslist_lock acquisition is visible around the list walk
 * in this excerpt - confirm whether the walk is otherwise protected against
 * concurrent list_add()/list_del() on lslist.
 */
214 static int dlm_scand(void *data)
218 while (!kthread_should_stop()) {
219 list_for_each_entry(ls, &lslist, ls_list) {
220 if (dlm_lock_recovery_try(ls)) {
222 dlm_scan_timeout(ls);
223 dlm_unlock_recovery(ls);
226 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
/*
 * Starts the scanning thread: runs dlm_scand() with no argument in a
 * kthread named "dlm_scand".
 * NOTE(review): the handling of kthread_run()'s result (error check, store
 * into scand_task) is outside the visible lines.
 */
231 static int dlm_scand_start(void)
233 struct task_struct *p;
236 p = kthread_run(dlm_scand, NULL, "dlm_scand");
/* Stops the scanning thread recorded in scand_task via kthread_stop(). */
244 static void dlm_scand_stop(void)
246 kthread_stop(scand_task);
/*
 * Looks a lockspace up by name. Under lslist_lock, walks lslist for an
 * entry whose ls_namelen equals namelen and whose ls_name matches the
 * first namelen bytes of name (memcmp, so name need not be terminated).
 * NOTE(review): the match action and the return statement are outside the
 * visible lines.
 */
249 static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
253 spin_lock(&lslist_lock);
255 list_for_each_entry(ls, &lslist, ls_list) {
256 if (ls->ls_namelen == namelen &&
257 memcmp(ls->ls_name, name, namelen) == 0)
262 spin_unlock(&lslist_lock);
/*
 * Looks a lockspace up by global id: under lslist_lock, walks lslist for an
 * entry whose ls_global_id equals id.
 * NOTE(review): what happens on a match (and the return value) is outside
 * the visible lines.
 */
266 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
270 spin_lock(&lslist_lock);
272 list_for_each_entry(ls, &lslist, ls_list) {
273 if (ls->ls_global_id == id) {
280 spin_unlock(&lslist_lock);
/*
 * Looks a lockspace up by its local handle: under lslist_lock, walks lslist
 * for an entry whose ls_local_handle equals the given pointer
 * (new_lockspace() sets ls_local_handle to the ls itself).
 * NOTE(review): what happens on a match (and the return value) is outside
 * the visible lines.
 */
284 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
288 spin_lock(&lslist_lock);
289 list_for_each_entry(ls, &lslist, ls_list) {
290 if (ls->ls_local_handle == lockspace) {
297 spin_unlock(&lslist_lock);
/*
 * Looks a lockspace up by device minor number: under lslist_lock, walks
 * lslist for an entry whose ls_device.minor equals minor.
 * NOTE(review): what happens on a match (and the return value) is outside
 * the visible lines.
 */
301 struct dlm_ls *dlm_find_lockspace_device(int minor)
305 spin_lock(&lslist_lock);
306 list_for_each_entry(ls, &lslist, ls_list) {
307 if (ls->ls_device.minor == minor) {
314 spin_unlock(&lslist_lock);
/*
 * Releases a lockspace obtained from one of the dlm_find_lockspace_*()
 * helpers (see the find/put pairing in dlm_control_store()). The update is
 * made while holding lslist_lock.
 * NOTE(review): the statement inside the locked region is outside the
 * visible lines; presumably it drops ls_count, which remove_lockspace()
 * tests against zero - confirm.
 */
318 void dlm_put_lockspace(struct dlm_ls *ls)
320 spin_lock(&lslist_lock);
322 spin_unlock(&lslist_lock);
/*
 * Unlinks ls from lslist once nobody holds it: under lslist_lock, when
 * ls_count is zero the entry is removed with list_del() and the lock is
 * dropped; otherwise the lock is dropped without unlinking.
 * NOTE(review): the surrounding control flow (loop/return/wait on the
 * non-zero path) is outside the visible lines.
 */
325 static void remove_lockspace(struct dlm_ls *ls)
328 spin_lock(&lslist_lock);
329 if (ls->ls_count == 0) {
330 list_del(&ls->ls_list);
331 spin_unlock(&lslist_lock);
334 spin_unlock(&lslist_lock);
/*
 * Starts the helpers shared by all lockspaces, in this order: dlm_astd,
 * dlm_scand, then lowcomms. Each start result is kept in error and a
 * "cannot start ..." message is logged for the corresponding step.
 * NOTE(review): the error checks, unwinding and return statements are
 * outside the visible lines.
 */
339 static int threads_start(void)
343 /* Thread which processes lock requests for all lockspaces */
344 error = dlm_astd_start();
346 log_print("cannot start dlm_astd thread %d", error);
350 error = dlm_scand_start();
352 log_print("cannot start dlm_scand thread %d", error);
356 /* Thread for sending/receiving messages for all lockspaces */
357 error = dlm_lowcomms_start();
359 log_print("cannot start dlm lowcomms %d", error);
/* NOTE(review): body not visible in this excerpt; presumably the
   counterpart of threads_start() - confirm against the full file. */
373 static void threads_stop(void)
/*
 * Create and join a lockspace.
 *
 * @name/@namelen: lockspace name; namelen is checked against
 *                 DLM_LOCKSPACE_LEN and the name is copied by length
 * @lockspace:     out parameter (its assignment is outside the visible lines)
 * @flags:         DLM_LSFL_* bits; TIMEWARN and FS are consumed locally, the
 *                 remainder is kept in ls_exflags
 * @lvblen:        must be non-zero and a multiple of 8
 *
 * Visible sequence: validate arguments, pin the module, look for an
 * existing lockspace of the same name, allocate and initialise a zeroed
 * struct dlm_ls (with namelen extra bytes for the name), allocate the
 * rsb/lkb/directory tables and the recovery buffer, take ls_in_recovery
 * for writing, link the lockspace onto lslist, start dlm_recoverd, register
 * the kobject, signal userspace through do_uevent(ls, 1), wait for
 * ls_members_done and finally create the debug file.
 * NOTE(review): branch conditions, goto labels and return statements are
 * outside the visible lines of this excerpt.
 */
380 static int new_lockspace(char *name, int namelen, void **lockspace,
381 uint32_t flags, int lvblen)
384 int i, size, error = -ENOMEM;
/* Argument validation. */
387 if (namelen > DLM_LOCKSPACE_LEN)
390 if (!lvblen || (lvblen % 8))
/* Hold a module reference for the lifetime of the lockspace; it is dropped
   again with module_put() below and in release_lockspace(). */
393 if (!try_module_get(THIS_MODULE))
396 ls = dlm_find_lockspace_name(name, namelen);
399 module_put(THIS_MODULE);
/* The name is stored in the namelen bytes allocated past the struct. */
403 ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
406 memcpy(ls->ls_name, name, namelen);
407 ls->ls_namelen = namelen;
408 ls->ls_lvblen = lvblen;
412 if (flags & DLM_LSFL_TIMEWARN)
413 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
415 if (flags & DLM_LSFL_FS)
416 ls->ls_allocation = GFP_NOFS;
418 ls->ls_allocation = GFP_KERNEL;
420 /* ls_exflags are forced to match among nodes, and we don't
421 need to require all nodes to have TIMEWARN or FS set */
422 ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS));
/* Resource (rsb) table: ci_rsbtbl_size buckets, each with an active list,
   a toss list and a rwlock.
   NOTE(review): sizeof * size is not overflow-checked in the visible
   lines; size comes from dlm_config - confirm it is bounded elsewhere. */
424 size = dlm_config.ci_rsbtbl_size;
425 ls->ls_rsbtbl_size = size;
427 ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
430 for (i = 0; i < size; i++) {
431 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
432 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
433 rwlock_init(&ls->ls_rsbtbl[i].lock);
/* Lock (lkb) id table: ci_lkbtbl_size buckets, per-bucket counter starts
   at 1. */
436 size = dlm_config.ci_lkbtbl_size;
437 ls->ls_lkbtbl_size = size;
439 ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
442 for (i = 0; i < size; i++) {
443 INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
444 rwlock_init(&ls->ls_lkbtbl[i].lock);
445 ls->ls_lkbtbl[i].counter = 1;
/* Directory table: ci_dirtbl_size buckets. */
448 size = dlm_config.ci_dirtbl_size;
449 ls->ls_dirtbl_size = size;
451 ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
454 for (i = 0; i < size; i++) {
455 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
456 rwlock_init(&ls->ls_dirtbl[i].lock);
/* Remaining per-lockspace lists, locks and bookkeeping fields. */
459 INIT_LIST_HEAD(&ls->ls_waiters);
460 mutex_init(&ls->ls_waiters_mutex);
461 INIT_LIST_HEAD(&ls->ls_orphans);
462 mutex_init(&ls->ls_orphans_mutex);
463 INIT_LIST_HEAD(&ls->ls_timeout);
464 mutex_init(&ls->ls_timeout_mutex);
466 INIT_LIST_HEAD(&ls->ls_nodes);
467 INIT_LIST_HEAD(&ls->ls_nodes_gone);
468 ls->ls_num_nodes = 0;
469 ls->ls_low_nodeid = 0;
470 ls->ls_total_weight = 0;
471 ls->ls_node_array = NULL;
473 memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
474 ls->ls_stub_rsb.res_ls = ls;
476 ls->ls_debug_rsb_dentry = NULL;
477 ls->ls_debug_waiters_dentry = NULL;
479 init_waitqueue_head(&ls->ls_uevent_wait);
480 ls->ls_uevent_result = 0;
481 init_completion(&ls->ls_members_done);
482 ls->ls_members_result = -1;
484 ls->ls_recoverd_task = NULL;
485 mutex_init(&ls->ls_recoverd_active);
486 spin_lock_init(&ls->ls_recover_lock);
487 spin_lock_init(&ls->ls_rcom_spin);
/* ls_rcom_seq starts from a random 64-bit value. */
488 get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
489 ls->ls_recover_status = 0;
490 ls->ls_recover_seq = 0;
491 ls->ls_recover_args = NULL;
492 init_rwsem(&ls->ls_in_recovery);
493 init_rwsem(&ls->ls_recv_active);
494 INIT_LIST_HEAD(&ls->ls_requestqueue);
495 mutex_init(&ls->ls_requestqueue_mutex);
496 mutex_init(&ls->ls_clear_proc_locks);
498 ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
499 if (!ls->ls_recover_buf)
502 INIT_LIST_HEAD(&ls->ls_recover_list);
503 spin_lock_init(&ls->ls_recover_list_lock);
504 ls->ls_recover_list_count = 0;
/* The local handle is the lockspace pointer itself; this is the value
   dlm_find_lockspace_local() compares against. */
505 ls->ls_local_handle = ls;
506 init_waitqueue_head(&ls->ls_wait_general);
507 INIT_LIST_HEAD(&ls->ls_root_list);
508 init_rwsem(&ls->ls_root_sem);
/* The new lockspace is created with ls_in_recovery held for writing. */
510 down_write(&ls->ls_in_recovery);
512 spin_lock(&lslist_lock);
513 list_add(&ls->ls_list, &lslist);
514 spin_unlock(&lslist_lock);
516 /* needs to find ls in lslist */
517 error = dlm_recoverd_start(ls);
519 log_error(ls, "can't start dlm_recoverd %d", error);
523 ls->ls_kobj.kset = dlm_kset;
524 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
528 kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
530 /* let kobject handle freeing of ls if there's an error */
533 /* This uevent triggers dlm_controld in userspace to add us to the
534 group of nodes that are members of this lockspace (managed by the
535 cluster infrastructure.) Once it's done that, it tells us who the
536 current lockspace members are (via configfs) and then tells the
537 lockspace to start running (via sysfs) in dlm_ls_start(). */
539 error = do_uevent(ls, 1);
543 wait_for_completion(&ls->ls_members_done);
544 error = ls->ls_members_result;
548 dlm_create_debug_file(ls);
550 log_debug(ls, "join complete");
/* Teardown statements follow: clear members, stop recoverd, unlink from
   lslist, free the buffers and tables, drop the kobject reference and the
   module reference.
   NOTE(review): the labels that decide where each failure enters this
   sequence are outside the visible lines. */
557 dlm_clear_members(ls);
558 kfree(ls->ls_node_array);
560 dlm_recoverd_stop(ls);
562 spin_lock(&lslist_lock);
563 list_del(&ls->ls_list);
564 spin_unlock(&lslist_lock);
565 kfree(ls->ls_recover_buf);
567 kfree(ls->ls_dirtbl);
569 kfree(ls->ls_lkbtbl);
571 kfree(ls->ls_rsbtbl);
574 kobject_put(&ls->ls_kobj);
578 module_put(THIS_MODULE);
/*
 * Public entry point for creating a lockspace. With ls_lock held, it calls
 * threads_start() and then new_lockspace() with the caller's arguments
 * passed through unchanged.
 * NOTE(review): the error checks between the two calls and the return
 * statement are outside the visible lines.
 */
582 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
583 uint32_t flags, int lvblen)
587 mutex_lock(&ls_lock);
589 error = threads_start();
593 error = new_lockspace(name, namelen, lockspace, flags, lvblen);
599 mutex_unlock(&ls_lock);
/*
 * lockspace_busy() scans every bucket of ls_lkbtbl while holding that
 * bucket's read lock. For a non-empty bucket it walks the lkb list; on an
 * lkb whose lkb_nodeid is zero the bucket lock is dropped early (the
 * statement that follows is outside the visible lines). The result is
 * consumed by release_lockspace().
 */
603 /* Return 1 if the lockspace still has active remote locks,
604 * 2 if the lockspace still has active local locks.
606 static int lockspace_busy(struct dlm_ls *ls)
608 int i, lkb_found = 0;
611 /* NOTE: We check the lockidtbl here rather than the resource table.
612 This is because there may be LKBs queued as ASTs that have been
613 unlinked from their RSBs and are pending deletion once the AST has
616 for (i = 0; i < ls->ls_lkbtbl_size; i++) {
617 read_lock(&ls->ls_lkbtbl[i].lock);
618 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
620 list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
622 if (!lkb->lkb_nodeid) {
623 read_unlock(&ls->ls_lkbtbl[i].lock);
628 read_unlock(&ls->ls_lkbtbl[i].lock);
// Tear down a lockspace (called from dlm_release_lockspace()).
//
// Visible steps, in order: query lockspace_busy(), stop dlm_recoverd,
// unlink the lockspace with remove_lockspace(), delete the debug file,
// free the recovery buffer, free the directory table, drain and free the
// lkb table, drain and free the rsb table (both the active and the toss
// list of every bucket), purge the request queue and the member lists,
// drop the kobject reference, and finally drop the module reference.
//
// NOTE(review): how `busy` is compared with `force`, the per-entry free
// calls inside the drain loops, the statements executed under ls_lock and
// the return value are outside the visible lines of this excerpt.
633 static int release_lockspace(struct dlm_ls *ls, int force)
637 struct list_head *head;
639 int busy = lockspace_busy(ls);
647 dlm_recoverd_stop(ls);
649 remove_lockspace(ls);
651 dlm_delete_debug_file(ls);
655 kfree(ls->ls_recover_buf);
658 * Free direntry structs.
662 kfree(ls->ls_dirtbl);
665 * Free all lkb's on lkbtbl[] lists.
668 for (i = 0; i < ls->ls_lkbtbl_size; i++) {
669 head = &ls->ls_lkbtbl[i].list;
670 while (!list_empty(head)) {
671 lkb = list_entry(head->next, struct dlm_lkb,
674 list_del(&lkb->lkb_idtbl_list);
// The lvb is released here only when DLM_IFL_MSTCPY is set on the lkb.
678 if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
679 dlm_free_lvb(lkb->lkb_lvbptr);
686 kfree(ls->ls_lkbtbl);
689 * Free all rsb's on rsbtbl[] lists
692 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
693 head = &ls->ls_rsbtbl[i].list;
694 while (!list_empty(head)) {
695 rsb = list_entry(head->next, struct dlm_rsb,
698 list_del(&rsb->res_hashchain);
702 head = &ls->ls_rsbtbl[i].toss;
703 while (!list_empty(head)) {
704 rsb = list_entry(head->next, struct dlm_rsb,
706 list_del(&rsb->res_hashchain);
711 kfree(ls->ls_rsbtbl);
714 * Free structures on any other lists
717 dlm_purge_requestqueue(ls);
718 kfree(ls->ls_recover_args);
719 dlm_clear_free_entries(ls);
720 dlm_clear_members(ls);
721 dlm_clear_members_gone(ls);
722 kfree(ls->ls_node_array);
723 kobject_put(&ls->ls_kobj);
724 /* The ls structure will be freed when the kobject is done with */
726 mutex_lock(&ls_lock);
730 mutex_unlock(&ls_lock);
732 module_put(THIS_MODULE);
737 * Called when a system has released all its locks and is not going to use the
738 * lockspace any longer. We free everything we're managing for this lockspace.
739 * Remaining nodes will go through the recovery process as if we'd died. The
740 * lockspace must continue to function as usual, participating in recoveries,
741 * until this returns.
743 * Force has 4 possible values:
744 * 0 - don't destroy lockspace if it has any LKBs
745 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
746 * 2 - destroy lockspace regardless of LKBs
747 * 3 - destroy lockspace as part of a forced shutdown
/*
 * Public entry point for releasing a lockspace. Resolves the opaque handle
 * with dlm_find_lockspace_local(), immediately gives that lookup back with
 * dlm_put_lockspace(), and returns the result of
 * release_lockspace(ls, force).
 * NOTE(review): the handling of a failed lookup is outside the visible
 * lines.
 */
750 int dlm_release_lockspace(void *lockspace, int force)
754 ls = dlm_find_lockspace_local(lockspace);
757 dlm_put_lockspace(ls);
758 return release_lockspace(ls, force);