/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "ast.h"
#include "dir.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"

static int                      ls_count;
static struct mutex             ls_lock;
static struct list_head         lslist;
static spinlock_t               lslist_lock;
static struct task_struct *     scand_task;


static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ssize_t ret = len;
        int n = simple_strtol(buf, NULL, 0);

        ls = dlm_find_lockspace_local(ls->ls_local_handle);
        if (!ls)
                return -EINVAL;

        switch (n) {
        case 0:
                dlm_ls_stop(ls);
                break;
        case 1:
                dlm_ls_start(ls);
                break;
        default:
                ret = -EINVAL;
        }
        dlm_put_lockspace(ls);
        return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
        wake_up(&ls->ls_uevent_wait);
        return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_global_id = simple_strtoul(buf, NULL, 0);
        return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
        uint32_t status = dlm_recover_status(ls);
        return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
        struct attribute attr;
        ssize_t (*show)(struct dlm_ls *, char *);
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
        .show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
};
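
/*
 * These attributes appear under /sys/kernel/dlm/<lockspace>/ once the
 * lockspace kobject is added to dlm_kset below.  An illustrative
 * session from userspace (the lockspace name "example" is hypothetical):
 *
 *      cat /sys/kernel/dlm/example/id             # global lockspace id
 *      echo 0 > /sys/kernel/dlm/example/control   # dlm_ls_stop()
 *      echo 1 > /sys/kernel/dlm/example/control   # dlm_ls_start()
 */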

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
                             char *buf)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
                              const char *buf, size_t len)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
        kfree(ls);
}

static struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
        .default_attrs = dlm_attrs,
        .sysfs_ops     = &dlm_attr_ops,
        .release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

static int do_uevent(struct dlm_ls *ls, int in)
{
        int error;

        if (in)
                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
        else
                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

        log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");

        /* dlm_controld will see the uevent, do the necessary group management
           and then write to sysfs to wake us */

        error = wait_event_interruptible(ls->ls_uevent_wait,
                        test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

        log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);

        if (error)
                goto out;

        error = ls->ls_uevent_result;
 out:
        if (error)
                log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
                          error, ls->ls_uevent_result);
        return error;
}
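
/*
 * A sketch of the handshake above, assuming dlm_controld behaves as the
 * comment describes: the ONLINE/OFFLINE uevent fires, dlm_controld does
 * the group join/leave, then reports its result (0 on success) through
 * the event_done attribute, which runs dlm_event_store() and wakes the
 * wait_event_interruptible() above:
 *
 *      echo 0 > /sys/kernel/dlm/<lockspace>/event_done
 */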


int __init dlm_lockspace_init(void)
{
        ls_count = 0;
        mutex_init(&ls_lock);
        INIT_LIST_HEAD(&lslist);
        spin_lock_init(&lslist_lock);

        dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
        if (!dlm_kset) {
                printk(KERN_WARNING "%s: can not create kset\n", __func__);
                return -ENOMEM;
        }
        return 0;
}

void dlm_lockspace_exit(void)
{
        kset_unregister(dlm_kset);
}

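/*
 * Background scanner: wake up every ci_scan_secs seconds and, for each
 * lockspace that is not busy with recovery, scan its rsbs and lock
 * timeouts, then release the recovery lock again.
 */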
static int dlm_scand(void *data)
{
        struct dlm_ls *ls;

        while (!kthread_should_stop()) {
                list_for_each_entry(ls, &lslist, ls_list) {
                        if (dlm_lock_recovery_try(ls)) {
                                dlm_scan_rsbs(ls);
                                dlm_scan_timeout(ls);
                                dlm_unlock_recovery(ls);
                        }
                }
                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
        }
        return 0;
}

static int dlm_scand_start(void)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_scand, NULL, "dlm_scand");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                scand_task = p;
        return error;
}

static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}

struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_global_id == id) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_local_handle == lockspace) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_device.minor == minor) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
        spin_lock(&lslist_lock);
        ls->ls_count--;
        spin_unlock(&lslist_lock);
}
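
/*
 * Each dlm_find_lockspace_*() above takes a reference (ls_count) that
 * the caller must drop with dlm_put_lockspace().  A minimal sketch of
 * the pairing, as used by dlm_control_store() above:
 *
 *      struct dlm_ls *ls = dlm_find_lockspace_local(handle);
 *      if (!ls)
 *              return -EINVAL;
 *      ...
 *      dlm_put_lockspace(ls);
 */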

static void remove_lockspace(struct dlm_ls *ls)
{
        for (;;) {
                spin_lock(&lslist_lock);
                if (ls->ls_count == 0) {
                        WARN_ON(ls->ls_create_count != 0);
                        list_del(&ls->ls_list);
                        spin_unlock(&lslist_lock);
                        return;
                }
                spin_unlock(&lslist_lock);
                ssleep(1);
        }
}

static int threads_start(void)
{
        int error;

        /* Thread which processes lock requests for all lockspaces */
        error = dlm_astd_start();
        if (error) {
                log_print("cannot start dlm_astd thread %d", error);
                goto fail;
        }

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                goto astd_fail;
        }

        /* Thread for sending/receiving messages for all lockspaces */
        error = dlm_lowcomms_start();
        if (error) {
                log_print("cannot start dlm lowcomms %d", error);
                goto scand_fail;
        }

        return 0;

 scand_fail:
        dlm_scand_stop();
 astd_fail:
        dlm_astd_stop();
 fail:
        return error;
}

static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
        dlm_astd_stop();
}

static int new_lockspace(char *name, int namelen, void **lockspace,
                         uint32_t flags, int lvblen)
{
        struct dlm_ls *ls;
        int i, size, error;
        int do_unreg = 0;

        if (namelen > DLM_LOCKSPACE_LEN)
                return -EINVAL;

        if (!lvblen || (lvblen % 8))
                return -EINVAL;

        if (!try_module_get(THIS_MODULE))
                return -EINVAL;

        error = 0;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                WARN_ON(ls->ls_create_count <= 0);
                if (ls->ls_namelen != namelen)
                        continue;
                if (memcmp(ls->ls_name, name, namelen))
                        continue;
                if (flags & DLM_LSFL_NEWEXCL) {
                        error = -EEXIST;
                        break;
                }
                ls->ls_create_count++;
                module_put(THIS_MODULE);
                error = 1; /* not an error, return 0 */
                break;
        }
        spin_unlock(&lslist_lock);

        if (error < 0)
                goto out;
        if (error)
                goto ret_zero;

        error = -ENOMEM;

        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
        if (!ls)
                goto out;
        memcpy(ls->ls_name, name, namelen);
        ls->ls_namelen = namelen;
        ls->ls_lvblen = lvblen;
        ls->ls_count = 0;
        ls->ls_flags = 0;

        if (flags & DLM_LSFL_TIMEWARN)
                set_bit(LSFL_TIMEWARN, &ls->ls_flags);

        if (flags & DLM_LSFL_FS)
                ls->ls_allocation = GFP_NOFS;
        else
                ls->ls_allocation = GFP_KERNEL;

        /* ls_exflags are forced to match among nodes, and we don't
           need to require all nodes to have some flags set */
        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
                                    DLM_LSFL_NEWEXCL));

        size = dlm_config.ci_rsbtbl_size;
        ls->ls_rsbtbl_size = size;

        ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
                rwlock_init(&ls->ls_rsbtbl[i].lock);
        }

        size = dlm_config.ci_lkbtbl_size;
        ls->ls_lkbtbl_size = size;

        ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
        if (!ls->ls_lkbtbl)
                goto out_rsbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
                rwlock_init(&ls->ls_lkbtbl[i].lock);
                ls->ls_lkbtbl[i].counter = 1;
        }

        size = dlm_config.ci_dirtbl_size;
        ls->ls_dirtbl_size = size;

        ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
        if (!ls->ls_dirtbl)
                goto out_lkbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
                rwlock_init(&ls->ls_dirtbl[i].lock);
        }

        INIT_LIST_HEAD(&ls->ls_waiters);
        mutex_init(&ls->ls_waiters_mutex);
        INIT_LIST_HEAD(&ls->ls_orphans);
        mutex_init(&ls->ls_orphans_mutex);
        INIT_LIST_HEAD(&ls->ls_timeout);
        mutex_init(&ls->ls_timeout_mutex);

        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
        ls->ls_low_nodeid = 0;
        ls->ls_total_weight = 0;
        ls->ls_node_array = NULL;

        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
        ls->ls_stub_rsb.res_ls = ls;

        ls->ls_debug_rsb_dentry = NULL;
        ls->ls_debug_waiters_dentry = NULL;

        init_waitqueue_head(&ls->ls_uevent_wait);
        ls->ls_uevent_result = 0;
        init_completion(&ls->ls_members_done);
        ls->ls_members_result = -1;

        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
        spin_lock_init(&ls->ls_rcom_spin);
        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
        ls->ls_recover_status = 0;
        ls->ls_recover_seq = 0;
        ls->ls_recover_args = NULL;
        init_rwsem(&ls->ls_in_recovery);
        init_rwsem(&ls->ls_recv_active);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        mutex_init(&ls->ls_requestqueue_mutex);
        mutex_init(&ls->ls_clear_proc_locks);

        ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
        if (!ls->ls_recover_buf)
                goto out_dirfree;

        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        ls->ls_recover_list_count = 0;
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);

        down_write(&ls->ls_in_recovery);

        spin_lock(&lslist_lock);
        ls->ls_create_count = 1;
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);

        /* needs to find ls in lslist */
        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
                goto out_delist;
        }

        ls->ls_kobj.kset = dlm_kset;
        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
                                     "%s", ls->ls_name);
        if (error)
                goto out_stop;
        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

        /* let kobject handle freeing of ls if there's an error */
        do_unreg = 1;

        /* This uevent triggers dlm_controld in userspace to add us to the
           group of nodes that are members of this lockspace (managed by the
           cluster infrastructure.)  Once it's done that, it tells us who the
           current lockspace members are (via configfs) and then tells the
           lockspace to start running (via sysfs) in dlm_ls_start(). */

        error = do_uevent(ls, 1);
        if (error)
                goto out_stop;

        wait_for_completion(&ls->ls_members_done);
        error = ls->ls_members_result;
        if (error)
                goto out_members;

        dlm_create_debug_file(ls);

        log_debug(ls, "join complete");
 ret_zero:
        *lockspace = ls;
        return 0;

 out_members:
        do_uevent(ls, 0);
        dlm_clear_members(ls);
        kfree(ls->ls_node_array);
 out_stop:
        dlm_recoverd_stop(ls);
 out_delist:
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        kfree(ls->ls_recover_buf);
 out_dirfree:
        kfree(ls->ls_dirtbl);
 out_lkbfree:
        kfree(ls->ls_lkbtbl);
 out_rsbfree:
        kfree(ls->ls_rsbtbl);
 out_lsfree:
        if (do_unreg)
                kobject_put(&ls->ls_kobj);
        else
                kfree(ls);
 out:
        module_put(THIS_MODULE);
        return error;
}

int dlm_new_lockspace(char *name, int namelen, void **lockspace,
                      uint32_t flags, int lvblen)
{
        int error = 0;

        mutex_lock(&ls_lock);
        if (!ls_count)
                error = threads_start();
        if (error)
                goto out;

        error = new_lockspace(name, namelen, lockspace, flags, lvblen);
        if (!error)
                ls_count++;
        else if (!ls_count)
                threads_stop();
 out:
        mutex_unlock(&ls_lock);
        return error;
}
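
/*
 * A minimal caller sketch (illustrative only; the lockspace name
 * "example" and the 32-byte LVB length are hypothetical, and lvblen
 * must be a nonzero multiple of 8):
 *
 *      dlm_lockspace_t *lockspace;
 *      int error;
 *
 *      error = dlm_new_lockspace("example", strlen("example"),
 *                                (void **)&lockspace, 0, 32);
 *      if (error)
 *              return error;
 *      ...
 *      dlm_release_lockspace(lockspace, 0);
 */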

/* Return 1 if the lockspace still has active remote locks,
 *        2 if the lockspace still has active local locks.
 */
static int lockspace_busy(struct dlm_ls *ls)
{
        int i, lkb_found = 0;
        struct dlm_lkb *lkb;

        /* NOTE: We check the lockidtbl here rather than the resource table.
           This is because there may be LKBs queued as ASTs that have been
           unlinked from their RSBs and are pending deletion once the AST has
           been delivered */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                read_lock(&ls->ls_lkbtbl[i].lock);
                if (!list_empty(&ls->ls_lkbtbl[i].list)) {
                        lkb_found = 1;
                        list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
                                            lkb_idtbl_list) {
                                if (!lkb->lkb_nodeid) {
                                        read_unlock(&ls->ls_lkbtbl[i].lock);
                                        return 2;
                                }
                        }
                }
                read_unlock(&ls->ls_lkbtbl[i].lock);
        }
        return lkb_found;
}

static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_lkb *lkb;
        struct dlm_rsb *rsb;
        struct list_head *head;
        int i, busy, rv;

        busy = lockspace_busy(ls);

        spin_lock(&lslist_lock);
        if (ls->ls_create_count == 1) {
                if (busy > force)
                        rv = -EBUSY;
                else {
                        /* remove_lockspace takes ls off lslist */
                        ls->ls_create_count = 0;
                        rv = 0;
                }
        } else if (ls->ls_create_count > 1) {
                rv = --ls->ls_create_count;
        } else {
                rv = -EINVAL;
        }
        spin_unlock(&lslist_lock);

        if (rv) {
                log_debug(ls, "release_lockspace no remove %d", rv);
                return rv;
        }

        dlm_device_deregister(ls);

        if (force < 3)
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        dlm_astd_suspend();

        kfree(ls->ls_recover_buf);

        /*
         * Free direntry structs.
         */

        dlm_dir_clear(ls);
        kfree(ls->ls_dirtbl);

        /*
         * Free all lkb's on lkbtbl[] lists.
         */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                head = &ls->ls_lkbtbl[i].list;
                while (!list_empty(head)) {
                        lkb = list_entry(head->next, struct dlm_lkb,
                                         lkb_idtbl_list);

                        list_del(&lkb->lkb_idtbl_list);

                        dlm_del_ast(lkb);

                        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                                dlm_free_lvb(lkb->lkb_lvbptr);

                        dlm_free_lkb(lkb);
                }
        }
        dlm_astd_resume();

        kfree(ls->ls_lkbtbl);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                head = &ls->ls_rsbtbl[i].list;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);

                        list_del(&rsb->res_hashchain);
                        dlm_free_rsb(rsb);
                }

                head = &ls->ls_rsbtbl[i].toss;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);
                        list_del(&rsb->res_hashchain);
                        dlm_free_rsb(rsb);
                }
        }

        kfree(ls->ls_rsbtbl);

        /*
         * Free structures on any other lists
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_free_entries(ls);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        log_debug(ls, "release_lockspace final free");
        kobject_put(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject is done with it */

        module_put(THIS_MODULE);
        return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
        struct dlm_ls *ls;
        int error;

        ls = dlm_find_lockspace_local(lockspace);
        if (!ls)
                return -EINVAL;
        dlm_put_lockspace(ls);

        mutex_lock(&ls_lock);
        error = release_lockspace(ls, force);
        if (!error)
                ls_count--;
        else if (!ls_count)
                threads_stop();
        mutex_unlock(&ls_lock);

        return error;
}