]> www.pilppa.org Git - linux-2.6-omap-h63xx.git/blob - fs/ocfs2/dlm/dlmdomain.c
[PATCH] ocfs2: Alloc at least a page for the DLM hash
[linux-2.6-omap-h63xx.git] / fs / ocfs2 / dlm / dlmdomain.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmdomain.c
5  *
6  * defines domain join / leave apis
7  *
8  * Copyright (C) 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  *
25  */
26
27 #include <linux/module.h>
28 #include <linux/types.h>
29 #include <linux/slab.h>
30 #include <linux/highmem.h>
31 #include <linux/utsname.h>
32 #include <linux/init.h>
33 #include <linux/spinlock.h>
34 #include <linux/delay.h>
35 #include <linux/err.h>
36
37 #include "cluster/heartbeat.h"
38 #include "cluster/nodemanager.h"
39 #include "cluster/tcp.h"
40
41 #include "dlmapi.h"
42 #include "dlmcommon.h"
43
44 #include "dlmdebug.h"
45 #include "dlmdomain.h"
46
47 #include "dlmver.h"
48
49 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
50 #include "cluster/masklog.h"
51
/*
 * Release a page vector built by dlm_alloc_pagevec().
 *
 * @vec:   array of page addresses, itself allocated with kmalloc()
 * @pages: number of leading entries in @vec that hold a page
 *
 * Pages are released from the highest index down to zero, then the
 * array itself is freed.
 */
static void dlm_free_pagevec(void **vec, int pages)
{
        int remaining = pages;

        while (remaining != 0) {
                remaining--;
                free_page((unsigned long)vec[remaining]);
        }

        kfree(vec);
}
58
59 static void **dlm_alloc_pagevec(int pages)
60 {
61         void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL);
62         int i;
63
64         if (!vec)
65                 return NULL;
66
67         for (i = 0; i < pages; i++)
68                 if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL)))
69                         goto out_free;
70
71         mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %Zd buckets per page\n",
72              pages, DLM_HASH_PAGES, DLM_BUCKETS_PER_PAGE);
73         return vec;
74 out_free:
75         dlm_free_pagevec(vec, i);
76         return NULL;
77 }
78
79 /*
80  *
81  * spinlock lock ordering: if multiple locks are needed, obey this ordering:
82  *    dlm_domain_lock
83  *    struct dlm_ctxt->spinlock
84  *    struct dlm_lock_resource->spinlock
85  *    struct dlm_ctxt->master_lock
86  *    struct dlm_ctxt->ast_lock
87  *    dlm_master_list_entry->spinlock
88  *    dlm_lock->spinlock
89  *
90  */
91
/* Held while walking dlm_domains and while reading or changing a
 * context's dlm_state.  Defined with DEFINE_SPINLOCK() rather than the
 * deprecated open-coded SPIN_LOCK_UNLOCKED initializer. */
DEFINE_SPINLOCK(dlm_domain_lock);
/* All dlm contexts currently known on this node. */
LIST_HEAD(dlm_domains);
/* Woken whenever a domain is taken off the list or its memory freed. */
static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
95
96 #define DLM_DOMAIN_BACKOFF_MS 200
97
98 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data);
99 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data);
100 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data);
101 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data);
102
103 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
104
105 void __dlm_unhash_lockres(struct dlm_lock_resource *lockres)
106 {
107         hlist_del_init(&lockres->hash_node);
108         dlm_lockres_put(lockres);
109 }
110
111 void __dlm_insert_lockres(struct dlm_ctxt *dlm,
112                        struct dlm_lock_resource *res)
113 {
114         struct hlist_head *bucket;
115         struct qstr *q;
116
117         assert_spin_locked(&dlm->spinlock);
118
119         q = &res->lockname;
120         bucket = dlm_lockres_hash(dlm, q->hash);
121
122         /* get a reference for our hashtable */
123         dlm_lockres_get(res);
124
125         hlist_add_head(&res->hash_node, bucket);
126 }
127
128 struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
129                                                 const char *name,
130                                                 unsigned int len,
131                                                 unsigned int hash)
132 {
133         struct hlist_head *bucket;
134         struct hlist_node *list;
135
136         mlog_entry("%.*s\n", len, name);
137
138         assert_spin_locked(&dlm->spinlock);
139
140         bucket = dlm_lockres_hash(dlm, hash);
141
142         hlist_for_each(list, bucket) {
143                 struct dlm_lock_resource *res = hlist_entry(list,
144                         struct dlm_lock_resource, hash_node);
145                 if (res->lockname.name[0] != name[0])
146                         continue;
147                 if (unlikely(res->lockname.len != len))
148                         continue;
149                 if (memcmp(res->lockname.name + 1, name + 1, len - 1))
150                         continue;
151                 dlm_lockres_get(res);
152                 return res;
153         }
154         return NULL;
155 }
156
157 struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
158                                     const char *name,
159                                     unsigned int len)
160 {
161         struct dlm_lock_resource *res;
162         unsigned int hash = dlm_lockid_hash(name, len);
163
164         spin_lock(&dlm->spinlock);
165         res = __dlm_lookup_lockres(dlm, name, len, hash);
166         spin_unlock(&dlm->spinlock);
167         return res;
168 }
169
170 static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len)
171 {
172         struct dlm_ctxt *tmp = NULL;
173         struct list_head *iter;
174
175         assert_spin_locked(&dlm_domain_lock);
176
177         /* tmp->name here is always NULL terminated,
178          * but domain may not be! */
179         list_for_each(iter, &dlm_domains) {
180                 tmp = list_entry (iter, struct dlm_ctxt, list);
181                 if (strlen(tmp->name) == len &&
182                     memcmp(tmp->name, domain, len)==0)
183                         break;
184                 tmp = NULL;
185         }
186
187         return tmp;
188 }
189
190 /* For null terminated domain strings ONLY */
191 static struct dlm_ctxt * __dlm_lookup_domain(const char *domain)
192 {
193         assert_spin_locked(&dlm_domain_lock);
194
195         return __dlm_lookup_domain_full(domain, strlen(domain));
196 }
197
198
199 /* returns true on one of two conditions:
200  * 1) the domain does not exist
201  * 2) the domain exists and it's state is "joined" */
202 static int dlm_wait_on_domain_helper(const char *domain)
203 {
204         int ret = 0;
205         struct dlm_ctxt *tmp = NULL;
206
207         spin_lock(&dlm_domain_lock);
208
209         tmp = __dlm_lookup_domain(domain);
210         if (!tmp)
211                 ret = 1;
212         else if (tmp->dlm_state == DLM_CTXT_JOINED)
213                 ret = 1;
214
215         spin_unlock(&dlm_domain_lock);
216         return ret;
217 }
218
219 static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
220 {
221         if (dlm->lockres_hash)
222                 dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
223
224         if (dlm->name)
225                 kfree(dlm->name);
226
227         kfree(dlm);
228 }
229
230 /* A little strange - this function will be called while holding
231  * dlm_domain_lock and is expected to be holding it on the way out. We
232  * will however drop and reacquire it multiple times */
233 static void dlm_ctxt_release(struct kref *kref)
234 {
235         struct dlm_ctxt *dlm;
236
237         dlm = container_of(kref, struct dlm_ctxt, dlm_refs);
238
239         BUG_ON(dlm->num_joins);
240         BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED);
241
242         /* we may still be in the list if we hit an error during join. */
243         list_del_init(&dlm->list);
244
245         spin_unlock(&dlm_domain_lock);
246
247         mlog(0, "freeing memory from domain %s\n", dlm->name);
248
249         wake_up(&dlm_domain_events);
250
251         dlm_free_ctxt_mem(dlm);
252
253         spin_lock(&dlm_domain_lock);
254 }
255
256 void dlm_put(struct dlm_ctxt *dlm)
257 {
258         spin_lock(&dlm_domain_lock);
259         kref_put(&dlm->dlm_refs, dlm_ctxt_release);
260         spin_unlock(&dlm_domain_lock);
261 }
262
263 static void __dlm_get(struct dlm_ctxt *dlm)
264 {
265         kref_get(&dlm->dlm_refs);
266 }
267
268 /* given a questionable reference to a dlm object, gets a reference if
269  * it can find it in the list, otherwise returns NULL in which case
270  * you shouldn't trust your pointer. */
271 struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm)
272 {
273         struct list_head *iter;
274         struct dlm_ctxt *target = NULL;
275
276         spin_lock(&dlm_domain_lock);
277
278         list_for_each(iter, &dlm_domains) {
279                 target = list_entry (iter, struct dlm_ctxt, list);
280
281                 if (target == dlm) {
282                         __dlm_get(target);
283                         break;
284                 }
285
286                 target = NULL;
287         }
288
289         spin_unlock(&dlm_domain_lock);
290
291         return target;
292 }
293
294 int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
295 {
296         int ret;
297
298         spin_lock(&dlm_domain_lock);
299         ret = (dlm->dlm_state == DLM_CTXT_JOINED) ||
300                 (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN);
301         spin_unlock(&dlm_domain_lock);
302
303         return ret;
304 }
305
306 static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
307 {
308         dlm_unregister_domain_handlers(dlm);
309         dlm_complete_thread(dlm);
310         dlm_complete_recovery_thread(dlm);
311
312         /* We've left the domain. Now we can take ourselves out of the
313          * list and allow the kref stuff to help us free the
314          * memory. */
315         spin_lock(&dlm_domain_lock);
316         list_del_init(&dlm->list);
317         spin_unlock(&dlm_domain_lock);
318
319         /* Wake up anyone waiting for us to remove this domain */
320         wake_up(&dlm_domain_events);
321 }
322
323 static void dlm_migrate_all_locks(struct dlm_ctxt *dlm)
324 {
325         int i;
326         struct dlm_lock_resource *res;
327
328         mlog(0, "Migrating locks from domain %s\n", dlm->name);
329 restart:
330         spin_lock(&dlm->spinlock);
331         for (i = 0; i < DLM_HASH_BUCKETS; i++) {
332                 while (!hlist_empty(dlm_lockres_hash(dlm, i))) {
333                         res = hlist_entry(dlm_lockres_hash(dlm, i)->first,
334                                           struct dlm_lock_resource, hash_node);
335                         /* need reference when manually grabbing lockres */
336                         dlm_lockres_get(res);
337                         /* this should unhash the lockres
338                          * and exit with dlm->spinlock */
339                         mlog(0, "purging res=%p\n", res);
340                         if (dlm_lockres_is_dirty(dlm, res)) {
341                                 /* HACK!  this should absolutely go.
342                                  * need to figure out why some empty
343                                  * lockreses are still marked dirty */
344                                 mlog(ML_ERROR, "lockres %.*s dirty!\n",
345                                      res->lockname.len, res->lockname.name);
346
347                                 spin_unlock(&dlm->spinlock);
348                                 dlm_kick_thread(dlm, res);
349                                 wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
350                                 dlm_lockres_put(res);
351                                 goto restart;
352                         }
353                         dlm_purge_lockres(dlm, res);
354                         dlm_lockres_put(res);
355                 }
356         }
357         spin_unlock(&dlm->spinlock);
358
359         mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
360 }
361
362 static int dlm_no_joining_node(struct dlm_ctxt *dlm)
363 {
364         int ret;
365
366         spin_lock(&dlm->spinlock);
367         ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN;
368         spin_unlock(&dlm->spinlock);
369
370         return ret;
371 }
372
373 static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm)
374 {
375         /* Yikes, a double spinlock! I need domain_lock for the dlm
376          * state and the dlm spinlock for join state... Sorry! */
377 again:
378         spin_lock(&dlm_domain_lock);
379         spin_lock(&dlm->spinlock);
380
381         if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
382                 mlog(0, "Node %d is joining, we wait on it.\n",
383                           dlm->joining_node);
384                 spin_unlock(&dlm->spinlock);
385                 spin_unlock(&dlm_domain_lock);
386
387                 wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm));
388                 goto again;
389         }
390
391         dlm->dlm_state = DLM_CTXT_LEAVING;
392         spin_unlock(&dlm->spinlock);
393         spin_unlock(&dlm_domain_lock);
394 }
395
396 static void __dlm_print_nodes(struct dlm_ctxt *dlm)
397 {
398         int node = -1;
399
400         assert_spin_locked(&dlm->spinlock);
401
402         mlog(ML_NOTICE, "Nodes in my domain (\"%s\"):\n", dlm->name);
403
404         while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
405                                      node + 1)) < O2NM_MAX_NODES) {
406                 mlog(ML_NOTICE, " node %d\n", node);
407         }
408 }
409
410 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data)
411 {
412         struct dlm_ctxt *dlm = data;
413         unsigned int node;
414         struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
415
416         mlog_entry("%p %u %p", msg, len, data);
417
418         if (!dlm_grab(dlm))
419                 return 0;
420
421         node = exit_msg->node_idx;
422
423         mlog(0, "Node %u leaves domain %s\n", node, dlm->name);
424
425         spin_lock(&dlm->spinlock);
426         clear_bit(node, dlm->domain_map);
427         __dlm_print_nodes(dlm);
428
429         /* notify anything attached to the heartbeat events */
430         dlm_hb_event_notify_attached(dlm, node, 0);
431
432         spin_unlock(&dlm->spinlock);
433
434         dlm_put(dlm);
435
436         return 0;
437 }
438
439 static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm,
440                                     unsigned int node)
441 {
442         int status;
443         struct dlm_exit_domain leave_msg;
444
445         mlog(0, "Asking node %u if we can leave the domain %s me = %u\n",
446                   node, dlm->name, dlm->node_num);
447
448         memset(&leave_msg, 0, sizeof(leave_msg));
449         leave_msg.node_idx = dlm->node_num;
450
451         status = o2net_send_message(DLM_EXIT_DOMAIN_MSG, dlm->key,
452                                     &leave_msg, sizeof(leave_msg), node,
453                                     NULL);
454
455         mlog(0, "status return %d from o2net_send_message\n", status);
456
457         return status;
458 }
459
460
461 static void dlm_leave_domain(struct dlm_ctxt *dlm)
462 {
463         int node, clear_node, status;
464
465         /* At this point we've migrated away all our locks and won't
466          * accept mastership of new ones. The dlm is responsible for
467          * almost nothing now. We make sure not to confuse any joining
468          * nodes and then commence shutdown procedure. */
469
470         spin_lock(&dlm->spinlock);
471         /* Clear ourselves from the domain map */
472         clear_bit(dlm->node_num, dlm->domain_map);
473         while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
474                                      0)) < O2NM_MAX_NODES) {
475                 /* Drop the dlm spinlock. This is safe wrt the domain_map.
476                  * -nodes cannot be added now as the
477                  *   query_join_handlers knows to respond with OK_NO_MAP
478                  * -we catch the right network errors if a node is
479                  *   removed from the map while we're sending him the
480                  *   exit message. */
481                 spin_unlock(&dlm->spinlock);
482
483                 clear_node = 1;
484
485                 status = dlm_send_one_domain_exit(dlm, node);
486                 if (status < 0 &&
487                     status != -ENOPROTOOPT &&
488                     status != -ENOTCONN) {
489                         mlog(ML_NOTICE, "Error %d sending domain exit message "
490                              "to node %d\n", status, node);
491
492                         /* Not sure what to do here but lets sleep for
493                          * a bit in case this was a transient
494                          * error... */
495                         msleep(DLM_DOMAIN_BACKOFF_MS);
496                         clear_node = 0;
497                 }
498
499                 spin_lock(&dlm->spinlock);
500                 /* If we're not clearing the node bit then we intend
501                  * to loop back around to try again. */
502                 if (clear_node)
503                         clear_bit(node, dlm->domain_map);
504         }
505         spin_unlock(&dlm->spinlock);
506 }
507
508 int dlm_joined(struct dlm_ctxt *dlm)
509 {
510         int ret = 0;
511
512         spin_lock(&dlm_domain_lock);
513
514         if (dlm->dlm_state == DLM_CTXT_JOINED)
515                 ret = 1;
516
517         spin_unlock(&dlm_domain_lock);
518
519         return ret;
520 }
521
522 int dlm_shutting_down(struct dlm_ctxt *dlm)
523 {
524         int ret = 0;
525
526         spin_lock(&dlm_domain_lock);
527
528         if (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN)
529                 ret = 1;
530
531         spin_unlock(&dlm_domain_lock);
532
533         return ret;
534 }
535
536 void dlm_unregister_domain(struct dlm_ctxt *dlm)
537 {
538         int leave = 0;
539
540         spin_lock(&dlm_domain_lock);
541         BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED);
542         BUG_ON(!dlm->num_joins);
543
544         dlm->num_joins--;
545         if (!dlm->num_joins) {
546                 /* We mark it "in shutdown" now so new register
547                  * requests wait until we've completely left the
548                  * domain. Don't use DLM_CTXT_LEAVING yet as we still
549                  * want new domain joins to communicate with us at
550                  * least until we've completed migration of our
551                  * resources. */
552                 dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN;
553                 leave = 1;
554         }
555         spin_unlock(&dlm_domain_lock);
556
557         if (leave) {
558                 mlog(0, "shutting down domain %s\n", dlm->name);
559
560                 /* We changed dlm state, notify the thread */
561                 dlm_kick_thread(dlm, NULL);
562
563                 dlm_migrate_all_locks(dlm);
564                 dlm_mark_domain_leaving(dlm);
565                 dlm_leave_domain(dlm);
566                 dlm_complete_dlm_shutdown(dlm);
567         }
568         dlm_put(dlm);
569 }
570 EXPORT_SYMBOL_GPL(dlm_unregister_domain);
571
572 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
573 {
574         struct dlm_query_join_request *query;
575         enum dlm_query_join_response response;
576         struct dlm_ctxt *dlm = NULL;
577
578         query = (struct dlm_query_join_request *) msg->buf;
579
580         mlog(0, "node %u wants to join domain %s\n", query->node_idx,
581                   query->domain);
582
583         /*
584          * If heartbeat doesn't consider the node live, tell it
585          * to back off and try again.  This gives heartbeat a chance
586          * to catch up.
587          */
588         if (!o2hb_check_node_heartbeating(query->node_idx)) {
589                 mlog(0, "node %u is not in our live map yet\n",
590                      query->node_idx);
591
592                 response = JOIN_DISALLOW;
593                 goto respond;
594         }
595
596         response = JOIN_OK_NO_MAP;
597
598         spin_lock(&dlm_domain_lock);
599         dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
600         /* Once the dlm ctxt is marked as leaving then we don't want
601          * to be put in someone's domain map. 
602          * Also, explicitly disallow joining at certain troublesome
603          * times (ie. during recovery). */
604         if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) {
605                 int bit = query->node_idx;
606                 spin_lock(&dlm->spinlock);
607
608                 if (dlm->dlm_state == DLM_CTXT_NEW &&
609                     dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) {
610                         /*If this is a brand new context and we
611                          * haven't started our join process yet, then
612                          * the other node won the race. */
613                         response = JOIN_OK_NO_MAP;
614                 } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
615                         /* Disallow parallel joins. */
616                         response = JOIN_DISALLOW;
617                 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
618                         mlog(ML_NOTICE, "node %u trying to join, but recovery "
619                              "is ongoing.\n", bit);
620                         response = JOIN_DISALLOW;
621                 } else if (test_bit(bit, dlm->recovery_map)) {
622                         mlog(ML_NOTICE, "node %u trying to join, but it "
623                              "still needs recovery.\n", bit);
624                         response = JOIN_DISALLOW;
625                 } else if (test_bit(bit, dlm->domain_map)) {
626                         mlog(ML_NOTICE, "node %u trying to join, but it "
627                              "is still in the domain! needs recovery?\n",
628                              bit);
629                         response = JOIN_DISALLOW;
630                 } else {
631                         /* Alright we're fully a part of this domain
632                          * so we keep some state as to who's joining
633                          * and indicate to him that needs to be fixed
634                          * up. */
635                         response = JOIN_OK;
636                         __dlm_set_joining_node(dlm, query->node_idx);
637                 }
638
639                 spin_unlock(&dlm->spinlock);
640         }
641         spin_unlock(&dlm_domain_lock);
642
643 respond:
644         mlog(0, "We respond with %u\n", response);
645
646         return response;
647 }
648
649 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data)
650 {
651         struct dlm_assert_joined *assert;
652         struct dlm_ctxt *dlm = NULL;
653
654         assert = (struct dlm_assert_joined *) msg->buf;
655
656         mlog(0, "node %u asserts join on domain %s\n", assert->node_idx,
657                   assert->domain);
658
659         spin_lock(&dlm_domain_lock);
660         dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len);
661         /* XXX should we consider no dlm ctxt an error? */
662         if (dlm) {
663                 spin_lock(&dlm->spinlock);
664
665                 /* Alright, this node has officially joined our
666                  * domain. Set him in the map and clean up our
667                  * leftover join state. */
668                 BUG_ON(dlm->joining_node != assert->node_idx);
669                 set_bit(assert->node_idx, dlm->domain_map);
670                 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
671
672                 __dlm_print_nodes(dlm);
673
674                 /* notify anything attached to the heartbeat events */
675                 dlm_hb_event_notify_attached(dlm, assert->node_idx, 1);
676
677                 spin_unlock(&dlm->spinlock);
678         }
679         spin_unlock(&dlm_domain_lock);
680
681         return 0;
682 }
683
684 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data)
685 {
686         struct dlm_cancel_join *cancel;
687         struct dlm_ctxt *dlm = NULL;
688
689         cancel = (struct dlm_cancel_join *) msg->buf;
690
691         mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx,
692                   cancel->domain);
693
694         spin_lock(&dlm_domain_lock);
695         dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len);
696
697         if (dlm) {
698                 spin_lock(&dlm->spinlock);
699
700                 /* Yikes, this guy wants to cancel his join. No
701                  * problem, we simply cleanup our join state. */
702                 BUG_ON(dlm->joining_node != cancel->node_idx);
703                 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
704
705                 spin_unlock(&dlm->spinlock);
706         }
707         spin_unlock(&dlm_domain_lock);
708
709         return 0;
710 }
711
712 static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm,
713                                     unsigned int node)
714 {
715         int status;
716         struct dlm_cancel_join cancel_msg;
717
718         memset(&cancel_msg, 0, sizeof(cancel_msg));
719         cancel_msg.node_idx = dlm->node_num;
720         cancel_msg.name_len = strlen(dlm->name);
721         memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len);
722
723         status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
724                                     &cancel_msg, sizeof(cancel_msg), node,
725                                     NULL);
726         if (status < 0) {
727                 mlog_errno(status);
728                 goto bail;
729         }
730
731 bail:
732         return status;
733 }
734
735 /* map_size should be in bytes. */
736 static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
737                                  unsigned long *node_map,
738                                  unsigned int map_size)
739 {
740         int status, tmpstat;
741         unsigned int node;
742
743         if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) *
744                          sizeof(unsigned long))) {
745                 mlog(ML_ERROR,
746                      "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n",
747                      map_size, BITS_TO_LONGS(O2NM_MAX_NODES));
748                 return -EINVAL;
749         }
750
751         status = 0;
752         node = -1;
753         while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
754                                      node + 1)) < O2NM_MAX_NODES) {
755                 if (node == dlm->node_num)
756                         continue;
757
758                 tmpstat = dlm_send_one_join_cancel(dlm, node);
759                 if (tmpstat) {
760                         mlog(ML_ERROR, "Error return %d cancelling join on "
761                              "node %d\n", tmpstat, node);
762                         if (!status)
763                                 status = tmpstat;
764                 }
765         }
766
767         if (status)
768                 mlog_errno(status);
769         return status;
770 }
771
772 static int dlm_request_join(struct dlm_ctxt *dlm,
773                             int node,
774                             enum dlm_query_join_response *response)
775 {
776         int status, retval;
777         struct dlm_query_join_request join_msg;
778
779         mlog(0, "querying node %d\n", node);
780
781         memset(&join_msg, 0, sizeof(join_msg));
782         join_msg.node_idx = dlm->node_num;
783         join_msg.name_len = strlen(dlm->name);
784         memcpy(join_msg.domain, dlm->name, join_msg.name_len);
785
786         status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
787                                     sizeof(join_msg), node, &retval);
788         if (status < 0 && status != -ENOPROTOOPT) {
789                 mlog_errno(status);
790                 goto bail;
791         }
792
793         /* -ENOPROTOOPT from the net code means the other side isn't
794             listening for our message type -- that's fine, it means
795             his dlm isn't up, so we can consider him a 'yes' but not
796             joined into the domain.  */
797         if (status == -ENOPROTOOPT) {
798                 status = 0;
799                 *response = JOIN_OK_NO_MAP;
800         } else if (retval == JOIN_DISALLOW ||
801                    retval == JOIN_OK ||
802                    retval == JOIN_OK_NO_MAP) {
803                 *response = retval;
804         } else {
805                 status = -EINVAL;
806                 mlog(ML_ERROR, "invalid response %d from node %u\n", retval,
807                      node);
808         }
809
810         mlog(0, "status %d, node %d response is %d\n", status, node,
811                   *response);
812
813 bail:
814         return status;
815 }
816
817 static int dlm_send_one_join_assert(struct dlm_ctxt *dlm,
818                                     unsigned int node)
819 {
820         int status;
821         struct dlm_assert_joined assert_msg;
822
823         mlog(0, "Sending join assert to node %u\n", node);
824
825         memset(&assert_msg, 0, sizeof(assert_msg));
826         assert_msg.node_idx = dlm->node_num;
827         assert_msg.name_len = strlen(dlm->name);
828         memcpy(assert_msg.domain, dlm->name, assert_msg.name_len);
829
830         status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
831                                     &assert_msg, sizeof(assert_msg), node,
832                                     NULL);
833         if (status < 0)
834                 mlog_errno(status);
835
836         return status;
837 }
838
839 static void dlm_send_join_asserts(struct dlm_ctxt *dlm,
840                                   unsigned long *node_map)
841 {
842         int status, node, live;
843
844         status = 0;
845         node = -1;
846         while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
847                                      node + 1)) < O2NM_MAX_NODES) {
848                 if (node == dlm->node_num)
849                         continue;
850
851                 do {
852                         /* It is very important that this message be
853                          * received so we spin until either the node
854                          * has died or it gets the message. */
855                         status = dlm_send_one_join_assert(dlm, node);
856
857                         spin_lock(&dlm->spinlock);
858                         live = test_bit(node, dlm->live_nodes_map);
859                         spin_unlock(&dlm->spinlock);
860
861                         if (status) {
862                                 mlog(ML_ERROR, "Error return %d asserting "
863                                      "join on node %d\n", status, node);
864
865                                 /* give us some time between errors... */
866                                 if (live)
867                                         msleep(DLM_DOMAIN_BACKOFF_MS);
868                         }
869                 } while (status && live);
870         }
871 }
872
873 struct domain_join_ctxt {
874         unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
875         unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
876 };
877
878 static int dlm_should_restart_join(struct dlm_ctxt *dlm,
879                                    struct domain_join_ctxt *ctxt,
880                                    enum dlm_query_join_response response)
881 {
882         int ret;
883
884         if (response == JOIN_DISALLOW) {
885                 mlog(0, "Latest response of disallow -- should restart\n");
886                 return 1;
887         }
888
889         spin_lock(&dlm->spinlock);
890         /* For now, we restart the process if the node maps have
891          * changed at all */
892         ret = memcmp(ctxt->live_map, dlm->live_nodes_map,
893                      sizeof(dlm->live_nodes_map));
894         spin_unlock(&dlm->spinlock);
895
896         if (ret)
897                 mlog(0, "Node maps changed -- should restart\n");
898
899         return ret;
900 }
901
902 static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
903 {
904         int status = 0, tmpstat, node;
905         struct domain_join_ctxt *ctxt;
906         enum dlm_query_join_response response;
907
908         mlog_entry("%p", dlm);
909
910         ctxt = kcalloc(1, sizeof(*ctxt), GFP_KERNEL);
911         if (!ctxt) {
912                 status = -ENOMEM;
913                 mlog_errno(status);
914                 goto bail;
915         }
916
917         /* group sem locking should work for us here -- we're already
918          * registered for heartbeat events so filling this should be
919          * atomic wrt getting those handlers called. */
920         o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map));
921
922         spin_lock(&dlm->spinlock);
923         memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map));
924
925         __dlm_set_joining_node(dlm, dlm->node_num);
926
927         spin_unlock(&dlm->spinlock);
928
929         node = -1;
930         while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES,
931                                      node + 1)) < O2NM_MAX_NODES) {
932                 if (node == dlm->node_num)
933                         continue;
934
935                 status = dlm_request_join(dlm, node, &response);
936                 if (status < 0) {
937                         mlog_errno(status);
938                         goto bail;
939                 }
940
941                 /* Ok, either we got a response or the node doesn't have a
942                  * dlm up. */
943                 if (response == JOIN_OK)
944                         set_bit(node, ctxt->yes_resp_map);
945
946                 if (dlm_should_restart_join(dlm, ctxt, response)) {
947                         status = -EAGAIN;
948                         goto bail;
949                 }
950         }
951
952         mlog(0, "Yay, done querying nodes!\n");
953
954         /* Yay, everyone agree's we can join the domain. My domain is
955          * comprised of all nodes who were put in the
956          * yes_resp_map. Copy that into our domain map and send a join
957          * assert message to clean up everyone elses state. */
958         spin_lock(&dlm->spinlock);
959         memcpy(dlm->domain_map, ctxt->yes_resp_map,
960                sizeof(ctxt->yes_resp_map));
961         set_bit(dlm->node_num, dlm->domain_map);
962         spin_unlock(&dlm->spinlock);
963
964         dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
965
966         /* Joined state *must* be set before the joining node
967          * information, otherwise the query_join handler may read no
968          * current joiner but a state of NEW and tell joining nodes
969          * we're not in the domain. */
970         spin_lock(&dlm_domain_lock);
971         dlm->dlm_state = DLM_CTXT_JOINED;
972         dlm->num_joins++;
973         spin_unlock(&dlm_domain_lock);
974
975 bail:
976         spin_lock(&dlm->spinlock);
977         __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
978         if (!status)
979                 __dlm_print_nodes(dlm);
980         spin_unlock(&dlm->spinlock);
981
982         if (ctxt) {
983                 /* Do we need to send a cancel message to any nodes? */
984                 if (status < 0) {
985                         tmpstat = dlm_send_join_cancels(dlm,
986                                                         ctxt->yes_resp_map,
987                                                         sizeof(ctxt->yes_resp_map));
988                         if (tmpstat < 0)
989                                 mlog_errno(tmpstat);
990                 }
991                 kfree(ctxt);
992         }
993
994         mlog(0, "returning %d\n", status);
995         return status;
996 }
997
998 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm)
999 {
1000         o2hb_unregister_callback(&dlm->dlm_hb_up);
1001         o2hb_unregister_callback(&dlm->dlm_hb_down);
1002         o2net_unregister_handler_list(&dlm->dlm_domain_handlers);
1003 }
1004
1005 static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
1006 {
1007         int status;
1008
1009         mlog(0, "registering handlers.\n");
1010
1011         o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB,
1012                             dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI);
1013         status = o2hb_register_callback(&dlm->dlm_hb_down);
1014         if (status)
1015                 goto bail;
1016
1017         o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB,
1018                             dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI);
1019         status = o2hb_register_callback(&dlm->dlm_hb_up);
1020         if (status)
1021                 goto bail;
1022
1023         status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
1024                                         sizeof(struct dlm_master_request),
1025                                         dlm_master_request_handler,
1026                                         dlm, &dlm->dlm_domain_handlers);
1027         if (status)
1028                 goto bail;
1029
1030         status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
1031                                         sizeof(struct dlm_assert_master),
1032                                         dlm_assert_master_handler,
1033                                         dlm, &dlm->dlm_domain_handlers);
1034         if (status)
1035                 goto bail;
1036
1037         status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
1038                                         sizeof(struct dlm_create_lock),
1039                                         dlm_create_lock_handler,
1040                                         dlm, &dlm->dlm_domain_handlers);
1041         if (status)
1042                 goto bail;
1043
1044         status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
1045                                         DLM_CONVERT_LOCK_MAX_LEN,
1046                                         dlm_convert_lock_handler,
1047                                         dlm, &dlm->dlm_domain_handlers);
1048         if (status)
1049                 goto bail;
1050
1051         status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
1052                                         DLM_UNLOCK_LOCK_MAX_LEN,
1053                                         dlm_unlock_lock_handler,
1054                                         dlm, &dlm->dlm_domain_handlers);
1055         if (status)
1056                 goto bail;
1057
1058         status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
1059                                         DLM_PROXY_AST_MAX_LEN,
1060                                         dlm_proxy_ast_handler,
1061                                         dlm, &dlm->dlm_domain_handlers);
1062         if (status)
1063                 goto bail;
1064
1065         status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
1066                                         sizeof(struct dlm_exit_domain),
1067                                         dlm_exit_domain_handler,
1068                                         dlm, &dlm->dlm_domain_handlers);
1069         if (status)
1070                 goto bail;
1071
1072         status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
1073                                         sizeof(struct dlm_migrate_request),
1074                                         dlm_migrate_request_handler,
1075                                         dlm, &dlm->dlm_domain_handlers);
1076         if (status)
1077                 goto bail;
1078
1079         status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
1080                                         DLM_MIG_LOCKRES_MAX_LEN,
1081                                         dlm_mig_lockres_handler,
1082                                         dlm, &dlm->dlm_domain_handlers);
1083         if (status)
1084                 goto bail;
1085
1086         status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
1087                                         sizeof(struct dlm_master_requery),
1088                                         dlm_master_requery_handler,
1089                                         dlm, &dlm->dlm_domain_handlers);
1090         if (status)
1091                 goto bail;
1092
1093         status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
1094                                         sizeof(struct dlm_lock_request),
1095                                         dlm_request_all_locks_handler,
1096                                         dlm, &dlm->dlm_domain_handlers);
1097         if (status)
1098                 goto bail;
1099
1100         status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
1101                                         sizeof(struct dlm_reco_data_done),
1102                                         dlm_reco_data_done_handler,
1103                                         dlm, &dlm->dlm_domain_handlers);
1104         if (status)
1105                 goto bail;
1106
1107         status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
1108                                         sizeof(struct dlm_begin_reco),
1109                                         dlm_begin_reco_handler,
1110                                         dlm, &dlm->dlm_domain_handlers);
1111         if (status)
1112                 goto bail;
1113
1114         status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
1115                                         sizeof(struct dlm_finalize_reco),
1116                                         dlm_finalize_reco_handler,
1117                                         dlm, &dlm->dlm_domain_handlers);
1118         if (status)
1119                 goto bail;
1120
1121 bail:
1122         if (status)
1123                 dlm_unregister_domain_handlers(dlm);
1124
1125         return status;
1126 }
1127
1128 static int dlm_join_domain(struct dlm_ctxt *dlm)
1129 {
1130         int status;
1131
1132         BUG_ON(!dlm);
1133
1134         mlog(0, "Join domain %s\n", dlm->name);
1135
1136         status = dlm_register_domain_handlers(dlm);
1137         if (status) {
1138                 mlog_errno(status);
1139                 goto bail;
1140         }
1141
1142         status = dlm_launch_thread(dlm);
1143         if (status < 0) {
1144                 mlog_errno(status);
1145                 goto bail;
1146         }
1147
1148         status = dlm_launch_recovery_thread(dlm);
1149         if (status < 0) {
1150                 mlog_errno(status);
1151                 goto bail;
1152         }
1153
1154         do {
1155                 unsigned int backoff;
1156                 status = dlm_try_to_join_domain(dlm);
1157
1158                 /* If we're racing another node to the join, then we
1159                  * need to back off temporarily and let them
1160                  * complete. */
1161                 if (status == -EAGAIN) {
1162                         if (signal_pending(current)) {
1163                                 status = -ERESTARTSYS;
1164                                 goto bail;
1165                         }
1166
1167                         /*
1168                          * <chip> After you!
1169                          * <dale> No, after you!
1170                          * <chip> I insist!
1171                          * <dale> But you first!
1172                          * ...
1173                          */
1174                         backoff = (unsigned int)(jiffies & 0x3);
1175                         backoff *= DLM_DOMAIN_BACKOFF_MS;
1176                         mlog(0, "backoff %d\n", backoff);
1177                         msleep(backoff);
1178                 }
1179         } while (status == -EAGAIN);
1180
1181         if (status < 0) {
1182                 mlog_errno(status);
1183                 goto bail;
1184         }
1185
1186         status = 0;
1187 bail:
1188         wake_up(&dlm_domain_events);
1189
1190         if (status) {
1191                 dlm_unregister_domain_handlers(dlm);
1192                 dlm_complete_thread(dlm);
1193                 dlm_complete_recovery_thread(dlm);
1194         }
1195
1196         return status;
1197 }
1198
1199 static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
1200                                 u32 key)
1201 {
1202         int i;
1203         struct dlm_ctxt *dlm = NULL;
1204
1205         dlm = kcalloc(1, sizeof(*dlm), GFP_KERNEL);
1206         if (!dlm) {
1207                 mlog_errno(-ENOMEM);
1208                 goto leave;
1209         }
1210
1211         dlm->name = kmalloc(strlen(domain) + 1, GFP_KERNEL);
1212         if (dlm->name == NULL) {
1213                 mlog_errno(-ENOMEM);
1214                 kfree(dlm);
1215                 dlm = NULL;
1216                 goto leave;
1217         }
1218
1219         dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES);
1220         if (!dlm->lockres_hash) {
1221                 mlog_errno(-ENOMEM);
1222                 kfree(dlm->name);
1223                 kfree(dlm);
1224                 dlm = NULL;
1225                 goto leave;
1226         }
1227
1228         for (i = 0; i < DLM_HASH_BUCKETS; i++)
1229                 INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i));
1230
1231         strcpy(dlm->name, domain);
1232         dlm->key = key;
1233         dlm->node_num = o2nm_this_node();
1234
1235         spin_lock_init(&dlm->spinlock);
1236         spin_lock_init(&dlm->master_lock);
1237         spin_lock_init(&dlm->ast_lock);
1238         INIT_LIST_HEAD(&dlm->list);
1239         INIT_LIST_HEAD(&dlm->dirty_list);
1240         INIT_LIST_HEAD(&dlm->reco.resources);
1241         INIT_LIST_HEAD(&dlm->reco.received);
1242         INIT_LIST_HEAD(&dlm->reco.node_data);
1243         INIT_LIST_HEAD(&dlm->purge_list);
1244         INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
1245         dlm->reco.state = 0;
1246
1247         INIT_LIST_HEAD(&dlm->pending_asts);
1248         INIT_LIST_HEAD(&dlm->pending_basts);
1249
1250         mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n",
1251                   dlm->recovery_map, &(dlm->recovery_map[0]));
1252
1253         memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map));
1254         memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map));
1255         memset(dlm->domain_map, 0, sizeof(dlm->domain_map));
1256
1257         dlm->dlm_thread_task = NULL;
1258         dlm->dlm_reco_thread_task = NULL;
1259         init_waitqueue_head(&dlm->dlm_thread_wq);
1260         init_waitqueue_head(&dlm->dlm_reco_thread_wq);
1261         init_waitqueue_head(&dlm->reco.event);
1262         init_waitqueue_head(&dlm->ast_wq);
1263         init_waitqueue_head(&dlm->migration_wq);
1264         INIT_LIST_HEAD(&dlm->master_list);
1265         INIT_LIST_HEAD(&dlm->mle_hb_events);
1266
1267         dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
1268         init_waitqueue_head(&dlm->dlm_join_events);
1269
1270         dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
1271         dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
1272         atomic_set(&dlm->local_resources, 0);
1273         atomic_set(&dlm->remote_resources, 0);
1274         atomic_set(&dlm->unknown_resources, 0);
1275
1276         spin_lock_init(&dlm->work_lock);
1277         INIT_LIST_HEAD(&dlm->work_list);
1278         INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work, dlm);
1279
1280         kref_init(&dlm->dlm_refs);
1281         dlm->dlm_state = DLM_CTXT_NEW;
1282
1283         INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks);
1284
1285         mlog(0, "context init: refcount %u\n",
1286                   atomic_read(&dlm->dlm_refs.refcount));
1287
1288 leave:
1289         return dlm;
1290 }
1291
1292 /*
1293  * dlm_register_domain: one-time setup per "domain"
1294  */
1295 struct dlm_ctxt * dlm_register_domain(const char *domain,
1296                                u32 key)
1297 {
1298         int ret;
1299         struct dlm_ctxt *dlm = NULL;
1300         struct dlm_ctxt *new_ctxt = NULL;
1301
1302         if (strlen(domain) > O2NM_MAX_NAME_LEN) {
1303                 ret = -ENAMETOOLONG;
1304                 mlog(ML_ERROR, "domain name length too long\n");
1305                 goto leave;
1306         }
1307
1308         if (!o2hb_check_local_node_heartbeating()) {
1309                 mlog(ML_ERROR, "the local node has not been configured, or is "
1310                      "not heartbeating\n");
1311                 ret = -EPROTO;
1312                 goto leave;
1313         }
1314
1315         mlog(0, "register called for domain \"%s\"\n", domain);
1316
1317 retry:
1318         dlm = NULL;
1319         if (signal_pending(current)) {
1320                 ret = -ERESTARTSYS;
1321                 mlog_errno(ret);
1322                 goto leave;
1323         }
1324
1325         spin_lock(&dlm_domain_lock);
1326
1327         dlm = __dlm_lookup_domain(domain);
1328         if (dlm) {
1329                 if (dlm->dlm_state != DLM_CTXT_JOINED) {
1330                         spin_unlock(&dlm_domain_lock);
1331
1332                         mlog(0, "This ctxt is not joined yet!\n");
1333                         wait_event_interruptible(dlm_domain_events,
1334                                                  dlm_wait_on_domain_helper(
1335                                                          domain));
1336                         goto retry;
1337                 }
1338
1339                 __dlm_get(dlm);
1340                 dlm->num_joins++;
1341
1342                 spin_unlock(&dlm_domain_lock);
1343
1344                 ret = 0;
1345                 goto leave;
1346         }
1347
1348         /* doesn't exist */
1349         if (!new_ctxt) {
1350                 spin_unlock(&dlm_domain_lock);
1351
1352                 new_ctxt = dlm_alloc_ctxt(domain, key);
1353                 if (new_ctxt)
1354                         goto retry;
1355
1356                 ret = -ENOMEM;
1357                 mlog_errno(ret);
1358                 goto leave;
1359         }
1360
1361         /* a little variable switch-a-roo here... */
1362         dlm = new_ctxt;
1363         new_ctxt = NULL;
1364
1365         /* add the new domain */
1366         list_add_tail(&dlm->list, &dlm_domains);
1367         spin_unlock(&dlm_domain_lock);
1368
1369         ret = dlm_join_domain(dlm);
1370         if (ret) {
1371                 mlog_errno(ret);
1372                 dlm_put(dlm);
1373                 goto leave;
1374         }
1375
1376         ret = 0;
1377 leave:
1378         if (new_ctxt)
1379                 dlm_free_ctxt_mem(new_ctxt);
1380
1381         if (ret < 0)
1382                 dlm = ERR_PTR(ret);
1383
1384         return dlm;
1385 }
1386 EXPORT_SYMBOL_GPL(dlm_register_domain);
1387
1388 static LIST_HEAD(dlm_join_handlers);
1389
1390 static void dlm_unregister_net_handlers(void)
1391 {
1392         o2net_unregister_handler_list(&dlm_join_handlers);
1393 }
1394
1395 static int dlm_register_net_handlers(void)
1396 {
1397         int status = 0;
1398
1399         status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1400                                         sizeof(struct dlm_query_join_request),
1401                                         dlm_query_join_handler,
1402                                         NULL, &dlm_join_handlers);
1403         if (status)
1404                 goto bail;
1405
1406         status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1407                                         sizeof(struct dlm_assert_joined),
1408                                         dlm_assert_joined_handler,
1409                                         NULL, &dlm_join_handlers);
1410         if (status)
1411                 goto bail;
1412
1413         status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1414                                         sizeof(struct dlm_cancel_join),
1415                                         dlm_cancel_join_handler,
1416                                         NULL, &dlm_join_handlers);
1417
1418 bail:
1419         if (status < 0)
1420                 dlm_unregister_net_handlers();
1421
1422         return status;
1423 }
1424
/* Domain eviction callback handling.
 *
 * The file system requires notification of node death *before* the
 * dlm completes its recovery work, otherwise it may be able to
 * acquire locks on resources requiring recovery. Since the dlm can
 * evict a node from its domain *before* heartbeat fires, a similar
 * mechanism is required. */
1432
1433 /* Eviction is not expected to happen often, so a per-domain lock is
1434  * not necessary. Eviction callbacks are allowed to sleep for short
1435  * periods of time. */
1436 static DECLARE_RWSEM(dlm_callback_sem);
1437
1438 void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm,
1439                                         int node_num)
1440 {
1441         struct list_head *iter;
1442         struct dlm_eviction_cb *cb;
1443
1444         down_read(&dlm_callback_sem);
1445         list_for_each(iter, &dlm->dlm_eviction_callbacks) {
1446                 cb = list_entry(iter, struct dlm_eviction_cb, ec_item);
1447
1448                 cb->ec_func(node_num, cb->ec_data);
1449         }
1450         up_read(&dlm_callback_sem);
1451 }
1452
1453 void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb,
1454                            dlm_eviction_func *f,
1455                            void *data)
1456 {
1457         INIT_LIST_HEAD(&cb->ec_item);
1458         cb->ec_func = f;
1459         cb->ec_data = data;
1460 }
1461 EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb);
1462
1463 void dlm_register_eviction_cb(struct dlm_ctxt *dlm,
1464                               struct dlm_eviction_cb *cb)
1465 {
1466         down_write(&dlm_callback_sem);
1467         list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks);
1468         up_write(&dlm_callback_sem);
1469 }
1470 EXPORT_SYMBOL_GPL(dlm_register_eviction_cb);
1471
1472 void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb)
1473 {
1474         down_write(&dlm_callback_sem);
1475         list_del_init(&cb->ec_item);
1476         up_write(&dlm_callback_sem);
1477 }
1478 EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb);
1479
1480 static int __init dlm_init(void)
1481 {
1482         int status;
1483
1484         dlm_print_version();
1485
1486         status = dlm_init_mle_cache();
1487         if (status)
1488                 return -1;
1489
1490         status = dlm_register_net_handlers();
1491         if (status) {
1492                 dlm_destroy_mle_cache();
1493                 return -1;
1494         }
1495
1496         return 0;
1497 }
1498
1499 static void __exit dlm_exit (void)
1500 {
1501         dlm_unregister_net_handlers();
1502         dlm_destroy_mle_cache();
1503 }
1504
1505 MODULE_AUTHOR("Oracle");
1506 MODULE_LICENSE("GPL");
1507
1508 module_init(dlm_init);
1509 module_exit(dlm_exit);