]> www.pilppa.org Git - linux-2.6-omap-h63xx.git/blob - fs/dlm/lockspace.c
Merge branch 'r6040' of git://git.kernel.org/pub/scm/linux/kernel/git/romieu/netdev...
[linux-2.6-omap-h63xx.git] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26
27 static int                      ls_count;
28 static struct mutex             ls_lock;
29 static struct list_head         lslist;
30 static spinlock_t               lslist_lock;
31 static struct task_struct *     scand_task;
32
33
34 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
35 {
36         ssize_t ret = len;
37         int n = simple_strtol(buf, NULL, 0);
38
39         ls = dlm_find_lockspace_local(ls->ls_local_handle);
40         if (!ls)
41                 return -EINVAL;
42
43         switch (n) {
44         case 0:
45                 dlm_ls_stop(ls);
46                 break;
47         case 1:
48                 dlm_ls_start(ls);
49                 break;
50         default:
51                 ret = -EINVAL;
52         }
53         dlm_put_lockspace(ls);
54         return ret;
55 }
56
57 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
58 {
59         ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
60         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
61         wake_up(&ls->ls_uevent_wait);
62         return len;
63 }
64
65 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
66 {
67         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
68 }
69
70 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
71 {
72         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
73         return len;
74 }
75
76 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
77 {
78         uint32_t status = dlm_recover_status(ls);
79         return snprintf(buf, PAGE_SIZE, "%x\n", status);
80 }
81
82 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
83 {
84         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
85 }
86
87 struct dlm_attr {
88         struct attribute attr;
89         ssize_t (*show)(struct dlm_ls *, char *);
90         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
91 };
92
93 static struct dlm_attr dlm_attr_control = {
94         .attr  = {.name = "control", .mode = S_IWUSR},
95         .store = dlm_control_store
96 };
97
98 static struct dlm_attr dlm_attr_event = {
99         .attr  = {.name = "event_done", .mode = S_IWUSR},
100         .store = dlm_event_store
101 };
102
103 static struct dlm_attr dlm_attr_id = {
104         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
105         .show  = dlm_id_show,
106         .store = dlm_id_store
107 };
108
109 static struct dlm_attr dlm_attr_recover_status = {
110         .attr  = {.name = "recover_status", .mode = S_IRUGO},
111         .show  = dlm_recover_status_show
112 };
113
114 static struct dlm_attr dlm_attr_recover_nodeid = {
115         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
116         .show  = dlm_recover_nodeid_show
117 };
118
119 static struct attribute *dlm_attrs[] = {
120         &dlm_attr_control.attr,
121         &dlm_attr_event.attr,
122         &dlm_attr_id.attr,
123         &dlm_attr_recover_status.attr,
124         &dlm_attr_recover_nodeid.attr,
125         NULL,
126 };
127
128 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
129                              char *buf)
130 {
131         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
132         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
133         return a->show ? a->show(ls, buf) : 0;
134 }
135
136 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
137                               const char *buf, size_t len)
138 {
139         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
140         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
141         return a->store ? a->store(ls, buf, len) : len;
142 }
143
144 static void lockspace_kobj_release(struct kobject *k)
145 {
146         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
147         kfree(ls);
148 }
149
150 static struct sysfs_ops dlm_attr_ops = {
151         .show  = dlm_attr_show,
152         .store = dlm_attr_store,
153 };
154
155 static struct kobj_type dlm_ktype = {
156         .default_attrs = dlm_attrs,
157         .sysfs_ops     = &dlm_attr_ops,
158         .release       = lockspace_kobj_release,
159 };
160
161 static struct kset *dlm_kset;
162
163 static int do_uevent(struct dlm_ls *ls, int in)
164 {
165         int error;
166
167         if (in)
168                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
169         else
170                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
171
172         log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
173
174         /* dlm_controld will see the uevent, do the necessary group management
175            and then write to sysfs to wake us */
176
177         error = wait_event_interruptible(ls->ls_uevent_wait,
178                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
179
180         log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
181
182         if (error)
183                 goto out;
184
185         error = ls->ls_uevent_result;
186  out:
187         if (error)
188                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
189                           error, ls->ls_uevent_result);
190         return error;
191 }
192
193
194 int __init dlm_lockspace_init(void)
195 {
196         ls_count = 0;
197         mutex_init(&ls_lock);
198         INIT_LIST_HEAD(&lslist);
199         spin_lock_init(&lslist_lock);
200
201         dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
202         if (!dlm_kset) {
203                 printk(KERN_WARNING "%s: can not create kset\n", __FUNCTION__);
204                 return -ENOMEM;
205         }
206         return 0;
207 }
208
209 void dlm_lockspace_exit(void)
210 {
211         kset_unregister(dlm_kset);
212 }
213
214 static int dlm_scand(void *data)
215 {
216         struct dlm_ls *ls;
217
218         while (!kthread_should_stop()) {
219                 list_for_each_entry(ls, &lslist, ls_list) {
220                         if (dlm_lock_recovery_try(ls)) {
221                                 dlm_scan_rsbs(ls);
222                                 dlm_scan_timeout(ls);
223                                 dlm_unlock_recovery(ls);
224                         }
225                 }
226                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
227         }
228         return 0;
229 }
230
231 static int dlm_scand_start(void)
232 {
233         struct task_struct *p;
234         int error = 0;
235
236         p = kthread_run(dlm_scand, NULL, "dlm_scand");
237         if (IS_ERR(p))
238                 error = PTR_ERR(p);
239         else
240                 scand_task = p;
241         return error;
242 }
243
244 static void dlm_scand_stop(void)
245 {
246         kthread_stop(scand_task);
247 }
248
249 static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
250 {
251         struct dlm_ls *ls;
252
253         spin_lock(&lslist_lock);
254
255         list_for_each_entry(ls, &lslist, ls_list) {
256                 if (ls->ls_namelen == namelen &&
257                     memcmp(ls->ls_name, name, namelen) == 0)
258                         goto out;
259         }
260         ls = NULL;
261  out:
262         spin_unlock(&lslist_lock);
263         return ls;
264 }
265
266 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
267 {
268         struct dlm_ls *ls;
269
270         spin_lock(&lslist_lock);
271
272         list_for_each_entry(ls, &lslist, ls_list) {
273                 if (ls->ls_global_id == id) {
274                         ls->ls_count++;
275                         goto out;
276                 }
277         }
278         ls = NULL;
279  out:
280         spin_unlock(&lslist_lock);
281         return ls;
282 }
283
284 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
285 {
286         struct dlm_ls *ls;
287
288         spin_lock(&lslist_lock);
289         list_for_each_entry(ls, &lslist, ls_list) {
290                 if (ls->ls_local_handle == lockspace) {
291                         ls->ls_count++;
292                         goto out;
293                 }
294         }
295         ls = NULL;
296  out:
297         spin_unlock(&lslist_lock);
298         return ls;
299 }
300
301 struct dlm_ls *dlm_find_lockspace_device(int minor)
302 {
303         struct dlm_ls *ls;
304
305         spin_lock(&lslist_lock);
306         list_for_each_entry(ls, &lslist, ls_list) {
307                 if (ls->ls_device.minor == minor) {
308                         ls->ls_count++;
309                         goto out;
310                 }
311         }
312         ls = NULL;
313  out:
314         spin_unlock(&lslist_lock);
315         return ls;
316 }
317
318 void dlm_put_lockspace(struct dlm_ls *ls)
319 {
320         spin_lock(&lslist_lock);
321         ls->ls_count--;
322         spin_unlock(&lslist_lock);
323 }
324
325 static void remove_lockspace(struct dlm_ls *ls)
326 {
327         for (;;) {
328                 spin_lock(&lslist_lock);
329                 if (ls->ls_count == 0) {
330                         list_del(&ls->ls_list);
331                         spin_unlock(&lslist_lock);
332                         return;
333                 }
334                 spin_unlock(&lslist_lock);
335                 ssleep(1);
336         }
337 }
338
339 static int threads_start(void)
340 {
341         int error;
342
343         /* Thread which process lock requests for all lockspace's */
344         error = dlm_astd_start();
345         if (error) {
346                 log_print("cannot start dlm_astd thread %d", error);
347                 goto fail;
348         }
349
350         error = dlm_scand_start();
351         if (error) {
352                 log_print("cannot start dlm_scand thread %d", error);
353                 goto astd_fail;
354         }
355
356         /* Thread for sending/receiving messages for all lockspace's */
357         error = dlm_lowcomms_start();
358         if (error) {
359                 log_print("cannot start dlm lowcomms %d", error);
360                 goto scand_fail;
361         }
362
363         return 0;
364
365  scand_fail:
366         dlm_scand_stop();
367  astd_fail:
368         dlm_astd_stop();
369  fail:
370         return error;
371 }
372
373 static void threads_stop(void)
374 {
375         dlm_scand_stop();
376         dlm_lowcomms_stop();
377         dlm_astd_stop();
378 }
379
380 static int new_lockspace(char *name, int namelen, void **lockspace,
381                          uint32_t flags, int lvblen)
382 {
383         struct dlm_ls *ls;
384         int i, size, error = -ENOMEM;
385         int do_unreg = 0;
386
387         if (namelen > DLM_LOCKSPACE_LEN)
388                 return -EINVAL;
389
390         if (!lvblen || (lvblen % 8))
391                 return -EINVAL;
392
393         if (!try_module_get(THIS_MODULE))
394                 return -EINVAL;
395
396         ls = dlm_find_lockspace_name(name, namelen);
397         if (ls) {
398                 *lockspace = ls;
399                 module_put(THIS_MODULE);
400                 return -EEXIST;
401         }
402
403         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
404         if (!ls)
405                 goto out;
406         memcpy(ls->ls_name, name, namelen);
407         ls->ls_namelen = namelen;
408         ls->ls_lvblen = lvblen;
409         ls->ls_count = 0;
410         ls->ls_flags = 0;
411
412         if (flags & DLM_LSFL_TIMEWARN)
413                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
414
415         if (flags & DLM_LSFL_FS)
416                 ls->ls_allocation = GFP_NOFS;
417         else
418                 ls->ls_allocation = GFP_KERNEL;
419
420         /* ls_exflags are forced to match among nodes, and we don't
421            need to require all nodes to have TIMEWARN or FS set */
422         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS));
423
424         size = dlm_config.ci_rsbtbl_size;
425         ls->ls_rsbtbl_size = size;
426
427         ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
428         if (!ls->ls_rsbtbl)
429                 goto out_lsfree;
430         for (i = 0; i < size; i++) {
431                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
432                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
433                 rwlock_init(&ls->ls_rsbtbl[i].lock);
434         }
435
436         size = dlm_config.ci_lkbtbl_size;
437         ls->ls_lkbtbl_size = size;
438
439         ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
440         if (!ls->ls_lkbtbl)
441                 goto out_rsbfree;
442         for (i = 0; i < size; i++) {
443                 INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
444                 rwlock_init(&ls->ls_lkbtbl[i].lock);
445                 ls->ls_lkbtbl[i].counter = 1;
446         }
447
448         size = dlm_config.ci_dirtbl_size;
449         ls->ls_dirtbl_size = size;
450
451         ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
452         if (!ls->ls_dirtbl)
453                 goto out_lkbfree;
454         for (i = 0; i < size; i++) {
455                 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
456                 rwlock_init(&ls->ls_dirtbl[i].lock);
457         }
458
459         INIT_LIST_HEAD(&ls->ls_waiters);
460         mutex_init(&ls->ls_waiters_mutex);
461         INIT_LIST_HEAD(&ls->ls_orphans);
462         mutex_init(&ls->ls_orphans_mutex);
463         INIT_LIST_HEAD(&ls->ls_timeout);
464         mutex_init(&ls->ls_timeout_mutex);
465
466         INIT_LIST_HEAD(&ls->ls_nodes);
467         INIT_LIST_HEAD(&ls->ls_nodes_gone);
468         ls->ls_num_nodes = 0;
469         ls->ls_low_nodeid = 0;
470         ls->ls_total_weight = 0;
471         ls->ls_node_array = NULL;
472
473         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
474         ls->ls_stub_rsb.res_ls = ls;
475
476         ls->ls_debug_rsb_dentry = NULL;
477         ls->ls_debug_waiters_dentry = NULL;
478
479         init_waitqueue_head(&ls->ls_uevent_wait);
480         ls->ls_uevent_result = 0;
481         init_completion(&ls->ls_members_done);
482         ls->ls_members_result = -1;
483
484         ls->ls_recoverd_task = NULL;
485         mutex_init(&ls->ls_recoverd_active);
486         spin_lock_init(&ls->ls_recover_lock);
487         spin_lock_init(&ls->ls_rcom_spin);
488         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
489         ls->ls_recover_status = 0;
490         ls->ls_recover_seq = 0;
491         ls->ls_recover_args = NULL;
492         init_rwsem(&ls->ls_in_recovery);
493         init_rwsem(&ls->ls_recv_active);
494         INIT_LIST_HEAD(&ls->ls_requestqueue);
495         mutex_init(&ls->ls_requestqueue_mutex);
496         mutex_init(&ls->ls_clear_proc_locks);
497
498         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
499         if (!ls->ls_recover_buf)
500                 goto out_dirfree;
501
502         INIT_LIST_HEAD(&ls->ls_recover_list);
503         spin_lock_init(&ls->ls_recover_list_lock);
504         ls->ls_recover_list_count = 0;
505         ls->ls_local_handle = ls;
506         init_waitqueue_head(&ls->ls_wait_general);
507         INIT_LIST_HEAD(&ls->ls_root_list);
508         init_rwsem(&ls->ls_root_sem);
509
510         down_write(&ls->ls_in_recovery);
511
512         spin_lock(&lslist_lock);
513         list_add(&ls->ls_list, &lslist);
514         spin_unlock(&lslist_lock);
515
516         /* needs to find ls in lslist */
517         error = dlm_recoverd_start(ls);
518         if (error) {
519                 log_error(ls, "can't start dlm_recoverd %d", error);
520                 goto out_delist;
521         }
522
523         ls->ls_kobj.kset = dlm_kset;
524         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
525                                      "%s", ls->ls_name);
526         if (error)
527                 goto out_stop;
528         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
529
530         /* let kobject handle freeing of ls if there's an error */
531         do_unreg = 1;
532
533         /* This uevent triggers dlm_controld in userspace to add us to the
534            group of nodes that are members of this lockspace (managed by the
535            cluster infrastructure.)  Once it's done that, it tells us who the
536            current lockspace members are (via configfs) and then tells the
537            lockspace to start running (via sysfs) in dlm_ls_start(). */
538
539         error = do_uevent(ls, 1);
540         if (error)
541                 goto out_stop;
542
543         wait_for_completion(&ls->ls_members_done);
544         error = ls->ls_members_result;
545         if (error)
546                 goto out_members;
547
548         dlm_create_debug_file(ls);
549
550         log_debug(ls, "join complete");
551
552         *lockspace = ls;
553         return 0;
554
555  out_members:
556         do_uevent(ls, 0);
557         dlm_clear_members(ls);
558         kfree(ls->ls_node_array);
559  out_stop:
560         dlm_recoverd_stop(ls);
561  out_delist:
562         spin_lock(&lslist_lock);
563         list_del(&ls->ls_list);
564         spin_unlock(&lslist_lock);
565         kfree(ls->ls_recover_buf);
566  out_dirfree:
567         kfree(ls->ls_dirtbl);
568  out_lkbfree:
569         kfree(ls->ls_lkbtbl);
570  out_rsbfree:
571         kfree(ls->ls_rsbtbl);
572  out_lsfree:
573         if (do_unreg)
574                 kobject_put(&ls->ls_kobj);
575         else
576                 kfree(ls);
577  out:
578         module_put(THIS_MODULE);
579         return error;
580 }
581
582 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
583                       uint32_t flags, int lvblen)
584 {
585         int error = 0;
586
587         mutex_lock(&ls_lock);
588         if (!ls_count)
589                 error = threads_start();
590         if (error)
591                 goto out;
592
593         error = new_lockspace(name, namelen, lockspace, flags, lvblen);
594         if (!error)
595                 ls_count++;
596         else if (!ls_count)
597                 threads_stop();
598  out:
599         mutex_unlock(&ls_lock);
600         return error;
601 }
602
603 /* Return 1 if the lockspace still has active remote locks,
604  *        2 if the lockspace still has active local locks.
605  */
606 static int lockspace_busy(struct dlm_ls *ls)
607 {
608         int i, lkb_found = 0;
609         struct dlm_lkb *lkb;
610
611         /* NOTE: We check the lockidtbl here rather than the resource table.
612            This is because there may be LKBs queued as ASTs that have been
613            unlinked from their RSBs and are pending deletion once the AST has
614            been delivered */
615
616         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
617                 read_lock(&ls->ls_lkbtbl[i].lock);
618                 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
619                         lkb_found = 1;
620                         list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
621                                             lkb_idtbl_list) {
622                                 if (!lkb->lkb_nodeid) {
623                                         read_unlock(&ls->ls_lkbtbl[i].lock);
624                                         return 2;
625                                 }
626                         }
627                 }
628                 read_unlock(&ls->ls_lkbtbl[i].lock);
629         }
630         return lkb_found;
631 }
632
633 static int release_lockspace(struct dlm_ls *ls, int force)
634 {
635         struct dlm_lkb *lkb;
636         struct dlm_rsb *rsb;
637         struct list_head *head;
638         int i;
639         int busy = lockspace_busy(ls);
640
641         if (busy > force)
642                 return -EBUSY;
643
644         if (force < 3)
645                 do_uevent(ls, 0);
646
647         dlm_recoverd_stop(ls);
648
649         remove_lockspace(ls);
650
651         dlm_delete_debug_file(ls);
652
653         dlm_astd_suspend();
654
655         kfree(ls->ls_recover_buf);
656
657         /*
658          * Free direntry structs.
659          */
660
661         dlm_dir_clear(ls);
662         kfree(ls->ls_dirtbl);
663
664         /*
665          * Free all lkb's on lkbtbl[] lists.
666          */
667
668         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
669                 head = &ls->ls_lkbtbl[i].list;
670                 while (!list_empty(head)) {
671                         lkb = list_entry(head->next, struct dlm_lkb,
672                                          lkb_idtbl_list);
673
674                         list_del(&lkb->lkb_idtbl_list);
675
676                         dlm_del_ast(lkb);
677
678                         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
679                                 dlm_free_lvb(lkb->lkb_lvbptr);
680
681                         dlm_free_lkb(lkb);
682                 }
683         }
684         dlm_astd_resume();
685
686         kfree(ls->ls_lkbtbl);
687
688         /*
689          * Free all rsb's on rsbtbl[] lists
690          */
691
692         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
693                 head = &ls->ls_rsbtbl[i].list;
694                 while (!list_empty(head)) {
695                         rsb = list_entry(head->next, struct dlm_rsb,
696                                          res_hashchain);
697
698                         list_del(&rsb->res_hashchain);
699                         dlm_free_rsb(rsb);
700                 }
701
702                 head = &ls->ls_rsbtbl[i].toss;
703                 while (!list_empty(head)) {
704                         rsb = list_entry(head->next, struct dlm_rsb,
705                                          res_hashchain);
706                         list_del(&rsb->res_hashchain);
707                         dlm_free_rsb(rsb);
708                 }
709         }
710
711         kfree(ls->ls_rsbtbl);
712
713         /*
714          * Free structures on any other lists
715          */
716
717         dlm_purge_requestqueue(ls);
718         kfree(ls->ls_recover_args);
719         dlm_clear_free_entries(ls);
720         dlm_clear_members(ls);
721         dlm_clear_members_gone(ls);
722         kfree(ls->ls_node_array);
723         kobject_put(&ls->ls_kobj);
724         /* The ls structure will be freed when the kobject is done with */
725
726         mutex_lock(&ls_lock);
727         ls_count--;
728         if (!ls_count)
729                 threads_stop();
730         mutex_unlock(&ls_lock);
731
732         module_put(THIS_MODULE);
733         return 0;
734 }
735
736 /*
737  * Called when a system has released all its locks and is not going to use the
738  * lockspace any longer.  We free everything we're managing for this lockspace.
739  * Remaining nodes will go through the recovery process as if we'd died.  The
740  * lockspace must continue to function as usual, participating in recoveries,
741  * until this returns.
742  *
743  * Force has 4 possible values:
744  * 0 - don't destroy locksapce if it has any LKBs
745  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
746  * 2 - destroy lockspace regardless of LKBs
747  * 3 - destroy lockspace as part of a forced shutdown
748  */
749
750 int dlm_release_lockspace(void *lockspace, int force)
751 {
752         struct dlm_ls *ls;
753
754         ls = dlm_find_lockspace_local(lockspace);
755         if (!ls)
756                 return -EINVAL;
757         dlm_put_lockspace(ls);
758         return release_lockspace(ls, force);
759 }
760