]> www.pilppa.org Git - linux-2.6-omap-h63xx.git/blob - fs/dlm/lock.c
7807958846c58691bc1160a374bcd6d2c92836f2
[linux-2.6-omap-h63xx.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
86                                     struct dlm_message *ms);
87 static int receive_extralen(struct dlm_message *ms);
88
89 /*
90  * Lock compatibilty matrix - thanks Steve
91  * UN = Unlocked state. Not really a state, used as a flag
92  * PD = Padding. Used to make the matrix a nice power of two in size
93  * Other states are the same as the VMS DLM.
94  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
95  */
96
97 static const int __dlm_compat_matrix[8][8] = {
98       /* UN NL CR CW PR PW EX PD */
99         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
100         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
101         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
102         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
103         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
104         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
105         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
106         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
107 };
108
109 /*
110  * This defines the direction of transfer of LVB data.
111  * Granted mode is the row; requested mode is the column.
112  * Usage: matrix[grmode+1][rqmode+1]
113  * 1 = LVB is returned to the caller
114  * 0 = LVB is written to the resource
115  * -1 = nothing happens to the LVB
116  */
117
118 const int dlm_lvb_operations[8][8] = {
119         /* UN   NL  CR  CW  PR  PW  EX  PD*/
120         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
121         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
122         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
123         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
124         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
125         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
126         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
127         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
128 };
129
130 #define modes_compat(gr, rq) \
131         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
132
133 int dlm_modes_compat(int mode1, int mode2)
134 {
135         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
136 }
137
138 /*
139  * Compatibility matrix for conversions with QUECVT set.
140  * Granted mode is the row; requested mode is the column.
141  * Usage: matrix[grmode+1][rqmode+1]
142  */
143
144 static const int __quecvt_compat_matrix[8][8] = {
145       /* UN NL CR CW PR PW EX PD */
146         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
147         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
148         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
149         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
150         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
151         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
152         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
153         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
154 };
155
156 void dlm_print_lkb(struct dlm_lkb *lkb)
157 {
158         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
159                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
160                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
161                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
162                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
163 }
164
165 void dlm_print_rsb(struct dlm_rsb *r)
166 {
167         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
168                r->res_nodeid, r->res_flags, r->res_first_lkid,
169                r->res_recover_locks_count, r->res_name);
170 }
171
172 void dlm_dump_rsb(struct dlm_rsb *r)
173 {
174         struct dlm_lkb *lkb;
175
176         dlm_print_rsb(r);
177
178         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
179                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
180         printk(KERN_ERR "rsb lookup list\n");
181         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
182                 dlm_print_lkb(lkb);
183         printk(KERN_ERR "rsb grant queue:\n");
184         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
185                 dlm_print_lkb(lkb);
186         printk(KERN_ERR "rsb convert queue:\n");
187         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
188                 dlm_print_lkb(lkb);
189         printk(KERN_ERR "rsb wait queue:\n");
190         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
191                 dlm_print_lkb(lkb);
192 }
193
194 /* Threads cannot use the lockspace while it's being recovered */
195
196 static inline void lock_recovery(struct dlm_ls *ls)
197 {
198         down_read(&ls->ls_in_recovery);
199 }
200
201 static inline void unlock_recovery(struct dlm_ls *ls)
202 {
203         up_read(&ls->ls_in_recovery);
204 }
205
206 static inline int lock_recovery_try(struct dlm_ls *ls)
207 {
208         return down_read_trylock(&ls->ls_in_recovery);
209 }
210
211 static inline int can_be_queued(struct dlm_lkb *lkb)
212 {
213         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
214 }
215
216 static inline int force_blocking_asts(struct dlm_lkb *lkb)
217 {
218         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
219 }
220
221 static inline int is_demoted(struct dlm_lkb *lkb)
222 {
223         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
224 }
225
226 static inline int is_remote(struct dlm_rsb *r)
227 {
228         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
229         return !!r->res_nodeid;
230 }
231
232 static inline int is_process_copy(struct dlm_lkb *lkb)
233 {
234         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
235 }
236
237 static inline int is_master_copy(struct dlm_lkb *lkb)
238 {
239         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
240                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
241         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
242 }
243
244 static inline int middle_conversion(struct dlm_lkb *lkb)
245 {
246         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
247             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
248                 return 1;
249         return 0;
250 }
251
252 static inline int down_conversion(struct dlm_lkb *lkb)
253 {
254         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
255 }
256
257 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
258 {
259         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
260 }
261
262 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
263 {
264         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
265 }
266
267 static inline int is_overlap(struct dlm_lkb *lkb)
268 {
269         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
270                                   DLM_IFL_OVERLAP_CANCEL));
271 }
272
273 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
274 {
275         if (is_master_copy(lkb))
276                 return;
277
278         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
279
280         lkb->lkb_lksb->sb_status = rv;
281         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
282
283         dlm_add_ast(lkb, AST_COMP);
284 }
285
286 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
287 {
288         queue_cast(r, lkb,
289                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
290 }
291
292 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
293 {
294         if (is_master_copy(lkb))
295                 send_bast(r, lkb, rqmode);
296         else {
297                 lkb->lkb_bastmode = rqmode;
298                 dlm_add_ast(lkb, AST_BAST);
299         }
300 }
301
302 /*
303  * Basic operations on rsb's and lkb's
304  */
305
306 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
307 {
308         struct dlm_rsb *r;
309
310         r = allocate_rsb(ls, len);
311         if (!r)
312                 return NULL;
313
314         r->res_ls = ls;
315         r->res_length = len;
316         memcpy(r->res_name, name, len);
317         mutex_init(&r->res_mutex);
318
319         INIT_LIST_HEAD(&r->res_lookup);
320         INIT_LIST_HEAD(&r->res_grantqueue);
321         INIT_LIST_HEAD(&r->res_convertqueue);
322         INIT_LIST_HEAD(&r->res_waitqueue);
323         INIT_LIST_HEAD(&r->res_root_list);
324         INIT_LIST_HEAD(&r->res_recover_list);
325
326         return r;
327 }
328
329 static int search_rsb_list(struct list_head *head, char *name, int len,
330                            unsigned int flags, struct dlm_rsb **r_ret)
331 {
332         struct dlm_rsb *r;
333         int error = 0;
334
335         list_for_each_entry(r, head, res_hashchain) {
336                 if (len == r->res_length && !memcmp(name, r->res_name, len))
337                         goto found;
338         }
339         return -EBADR;
340
341  found:
342         if (r->res_nodeid && (flags & R_MASTER))
343                 error = -ENOTBLK;
344         *r_ret = r;
345         return error;
346 }
347
348 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
349                        unsigned int flags, struct dlm_rsb **r_ret)
350 {
351         struct dlm_rsb *r;
352         int error;
353
354         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
355         if (!error) {
356                 kref_get(&r->res_ref);
357                 goto out;
358         }
359         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
360         if (error)
361                 goto out;
362
363         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
364
365         if (dlm_no_directory(ls))
366                 goto out;
367
368         if (r->res_nodeid == -1) {
369                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
370                 r->res_first_lkid = 0;
371         } else if (r->res_nodeid > 0) {
372                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
373                 r->res_first_lkid = 0;
374         } else {
375                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
376                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
377         }
378  out:
379         *r_ret = r;
380         return error;
381 }
382
383 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
384                       unsigned int flags, struct dlm_rsb **r_ret)
385 {
386         int error;
387         write_lock(&ls->ls_rsbtbl[b].lock);
388         error = _search_rsb(ls, name, len, b, flags, r_ret);
389         write_unlock(&ls->ls_rsbtbl[b].lock);
390         return error;
391 }
392
393 /*
394  * Find rsb in rsbtbl and potentially create/add one
395  *
396  * Delaying the release of rsb's has a similar benefit to applications keeping
397  * NL locks on an rsb, but without the guarantee that the cached master value
398  * will still be valid when the rsb is reused.  Apps aren't always smart enough
399  * to keep NL locks on an rsb that they may lock again shortly; this can lead
400  * to excessive master lookups and removals if we don't delay the release.
401  *
402  * Searching for an rsb means looking through both the normal list and toss
403  * list.  When found on the toss list the rsb is moved to the normal list with
404  * ref count of 1; when found on normal list the ref count is incremented.
405  */
406
407 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
408                     unsigned int flags, struct dlm_rsb **r_ret)
409 {
410         struct dlm_rsb *r, *tmp;
411         uint32_t hash, bucket;
412         int error = 0;
413
414         if (dlm_no_directory(ls))
415                 flags |= R_CREATE;
416
417         hash = jhash(name, namelen, 0);
418         bucket = hash & (ls->ls_rsbtbl_size - 1);
419
420         error = search_rsb(ls, name, namelen, bucket, flags, &r);
421         if (!error)
422                 goto out;
423
424         if (error == -EBADR && !(flags & R_CREATE))
425                 goto out;
426
427         /* the rsb was found but wasn't a master copy */
428         if (error == -ENOTBLK)
429                 goto out;
430
431         error = -ENOMEM;
432         r = create_rsb(ls, name, namelen);
433         if (!r)
434                 goto out;
435
436         r->res_hash = hash;
437         r->res_bucket = bucket;
438         r->res_nodeid = -1;
439         kref_init(&r->res_ref);
440
441         /* With no directory, the master can be set immediately */
442         if (dlm_no_directory(ls)) {
443                 int nodeid = dlm_dir_nodeid(r);
444                 if (nodeid == dlm_our_nodeid())
445                         nodeid = 0;
446                 r->res_nodeid = nodeid;
447         }
448
449         write_lock(&ls->ls_rsbtbl[bucket].lock);
450         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
451         if (!error) {
452                 write_unlock(&ls->ls_rsbtbl[bucket].lock);
453                 free_rsb(r);
454                 r = tmp;
455                 goto out;
456         }
457         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
458         write_unlock(&ls->ls_rsbtbl[bucket].lock);
459         error = 0;
460  out:
461         *r_ret = r;
462         return error;
463 }
464
465 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
466                  unsigned int flags, struct dlm_rsb **r_ret)
467 {
468         return find_rsb(ls, name, namelen, flags, r_ret);
469 }
470
471 /* This is only called to add a reference when the code already holds
472    a valid reference to the rsb, so there's no need for locking. */
473
474 static inline void hold_rsb(struct dlm_rsb *r)
475 {
476         kref_get(&r->res_ref);
477 }
478
479 void dlm_hold_rsb(struct dlm_rsb *r)
480 {
481         hold_rsb(r);
482 }
483
484 static void toss_rsb(struct kref *kref)
485 {
486         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
487         struct dlm_ls *ls = r->res_ls;
488
489         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
490         kref_init(&r->res_ref);
491         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
492         r->res_toss_time = jiffies;
493         if (r->res_lvbptr) {
494                 free_lvb(r->res_lvbptr);
495                 r->res_lvbptr = NULL;
496         }
497 }
498
499 /* When all references to the rsb are gone it's transfered to
500    the tossed list for later disposal. */
501
502 static void put_rsb(struct dlm_rsb *r)
503 {
504         struct dlm_ls *ls = r->res_ls;
505         uint32_t bucket = r->res_bucket;
506
507         write_lock(&ls->ls_rsbtbl[bucket].lock);
508         kref_put(&r->res_ref, toss_rsb);
509         write_unlock(&ls->ls_rsbtbl[bucket].lock);
510 }
511
512 void dlm_put_rsb(struct dlm_rsb *r)
513 {
514         put_rsb(r);
515 }
516
517 /* See comment for unhold_lkb */
518
519 static void unhold_rsb(struct dlm_rsb *r)
520 {
521         int rv;
522         rv = kref_put(&r->res_ref, toss_rsb);
523         DLM_ASSERT(!rv, dlm_dump_rsb(r););
524 }
525
526 static void kill_rsb(struct kref *kref)
527 {
528         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
529
530         /* All work is done after the return from kref_put() so we
531            can release the write_lock before the remove and free. */
532
533         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
534         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
535         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
536         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
537         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
538         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
539 }
540
541 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
542    The rsb must exist as long as any lkb's for it do. */
543
544 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
545 {
546         hold_rsb(r);
547         lkb->lkb_resource = r;
548 }
549
550 static void detach_lkb(struct dlm_lkb *lkb)
551 {
552         if (lkb->lkb_resource) {
553                 put_rsb(lkb->lkb_resource);
554                 lkb->lkb_resource = NULL;
555         }
556 }
557
558 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
559 {
560         struct dlm_lkb *lkb, *tmp;
561         uint32_t lkid = 0;
562         uint16_t bucket;
563
564         lkb = allocate_lkb(ls);
565         if (!lkb)
566                 return -ENOMEM;
567
568         lkb->lkb_nodeid = -1;
569         lkb->lkb_grmode = DLM_LOCK_IV;
570         kref_init(&lkb->lkb_ref);
571         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
572         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
573
574         get_random_bytes(&bucket, sizeof(bucket));
575         bucket &= (ls->ls_lkbtbl_size - 1);
576
577         write_lock(&ls->ls_lkbtbl[bucket].lock);
578
579         /* counter can roll over so we must verify lkid is not in use */
580
581         while (lkid == 0) {
582                 lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
583
584                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
585                                     lkb_idtbl_list) {
586                         if (tmp->lkb_id != lkid)
587                                 continue;
588                         lkid = 0;
589                         break;
590                 }
591         }
592
593         lkb->lkb_id = lkid;
594         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
595         write_unlock(&ls->ls_lkbtbl[bucket].lock);
596
597         *lkb_ret = lkb;
598         return 0;
599 }
600
601 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
602 {
603         uint16_t bucket = lkid & 0xFFFF;
604         struct dlm_lkb *lkb;
605
606         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
607                 if (lkb->lkb_id == lkid)
608                         return lkb;
609         }
610         return NULL;
611 }
612
613 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
614 {
615         struct dlm_lkb *lkb;
616         uint16_t bucket = lkid & 0xFFFF;
617
618         if (bucket >= ls->ls_lkbtbl_size)
619                 return -EBADSLT;
620
621         read_lock(&ls->ls_lkbtbl[bucket].lock);
622         lkb = __find_lkb(ls, lkid);
623         if (lkb)
624                 kref_get(&lkb->lkb_ref);
625         read_unlock(&ls->ls_lkbtbl[bucket].lock);
626
627         *lkb_ret = lkb;
628         return lkb ? 0 : -ENOENT;
629 }
630
631 static void kill_lkb(struct kref *kref)
632 {
633         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
634
635         /* All work is done after the return from kref_put() so we
636            can release the write_lock before the detach_lkb */
637
638         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
639 }
640
641 /* __put_lkb() is used when an lkb may not have an rsb attached to
642    it so we need to provide the lockspace explicitly */
643
644 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
645 {
646         uint16_t bucket = lkb->lkb_id & 0xFFFF;
647
648         write_lock(&ls->ls_lkbtbl[bucket].lock);
649         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
650                 list_del(&lkb->lkb_idtbl_list);
651                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
652
653                 detach_lkb(lkb);
654
655                 /* for local/process lkbs, lvbptr points to caller's lksb */
656                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
657                         free_lvb(lkb->lkb_lvbptr);
658                 free_lkb(lkb);
659                 return 1;
660         } else {
661                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
662                 return 0;
663         }
664 }
665
666 int dlm_put_lkb(struct dlm_lkb *lkb)
667 {
668         struct dlm_ls *ls;
669
670         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
671         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
672
673         ls = lkb->lkb_resource->res_ls;
674         return __put_lkb(ls, lkb);
675 }
676
677 /* This is only called to add a reference when the code already holds
678    a valid reference to the lkb, so there's no need for locking. */
679
680 static inline void hold_lkb(struct dlm_lkb *lkb)
681 {
682         kref_get(&lkb->lkb_ref);
683 }
684
685 /* This is called when we need to remove a reference and are certain
686    it's not the last ref.  e.g. del_lkb is always called between a
687    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
688    put_lkb would work fine, but would involve unnecessary locking */
689
690 static inline void unhold_lkb(struct dlm_lkb *lkb)
691 {
692         int rv;
693         rv = kref_put(&lkb->lkb_ref, kill_lkb);
694         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
695 }
696
697 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
698                             int mode)
699 {
700         struct dlm_lkb *lkb = NULL;
701
702         list_for_each_entry(lkb, head, lkb_statequeue)
703                 if (lkb->lkb_rqmode < mode)
704                         break;
705
706         if (!lkb)
707                 list_add_tail(new, head);
708         else
709                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
710 }
711
712 /* add/remove lkb to rsb's grant/convert/wait queue */
713
714 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
715 {
716         kref_get(&lkb->lkb_ref);
717
718         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
719
720         lkb->lkb_status = status;
721
722         switch (status) {
723         case DLM_LKSTS_WAITING:
724                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
725                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
726                 else
727                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
728                 break;
729         case DLM_LKSTS_GRANTED:
730                 /* convention says granted locks kept in order of grmode */
731                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
732                                 lkb->lkb_grmode);
733                 break;
734         case DLM_LKSTS_CONVERT:
735                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
736                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
737                 else
738                         list_add_tail(&lkb->lkb_statequeue,
739                                       &r->res_convertqueue);
740                 break;
741         default:
742                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
743         }
744 }
745
746 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
747 {
748         lkb->lkb_status = 0;
749         list_del(&lkb->lkb_statequeue);
750         unhold_lkb(lkb);
751 }
752
753 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
754 {
755         hold_lkb(lkb);
756         del_lkb(r, lkb);
757         add_lkb(r, lkb, sts);
758         unhold_lkb(lkb);
759 }
760
761 static int msg_reply_type(int mstype)
762 {
763         switch (mstype) {
764         case DLM_MSG_REQUEST:
765                 return DLM_MSG_REQUEST_REPLY;
766         case DLM_MSG_CONVERT:
767                 return DLM_MSG_CONVERT_REPLY;
768         case DLM_MSG_UNLOCK:
769                 return DLM_MSG_UNLOCK_REPLY;
770         case DLM_MSG_CANCEL:
771                 return DLM_MSG_CANCEL_REPLY;
772         case DLM_MSG_LOOKUP:
773                 return DLM_MSG_LOOKUP_REPLY;
774         }
775         return -1;
776 }
777
778 /* add/remove lkb from global waiters list of lkb's waiting for
779    a reply from a remote node */
780
781 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
782 {
783         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
784         int error = 0;
785
786         mutex_lock(&ls->ls_waiters_mutex);
787
788         if (is_overlap_unlock(lkb) ||
789             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
790                 error = -EINVAL;
791                 goto out;
792         }
793
794         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
795                 switch (mstype) {
796                 case DLM_MSG_UNLOCK:
797                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
798                         break;
799                 case DLM_MSG_CANCEL:
800                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
801                         break;
802                 default:
803                         error = -EBUSY;
804                         goto out;
805                 }
806                 lkb->lkb_wait_count++;
807                 hold_lkb(lkb);
808
809                 log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
810                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
811                           lkb->lkb_wait_count, lkb->lkb_flags);
812                 goto out;
813         }
814
815         DLM_ASSERT(!lkb->lkb_wait_count,
816                    dlm_print_lkb(lkb);
817                    printk("wait_count %d\n", lkb->lkb_wait_count););
818
819         lkb->lkb_wait_count++;
820         lkb->lkb_wait_type = mstype;
821         hold_lkb(lkb);
822         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
823  out:
824         if (error)
825                 log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
826                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
827                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
828         mutex_unlock(&ls->ls_waiters_mutex);
829         return error;
830 }
831
832 /* We clear the RESEND flag because we might be taking an lkb off the waiters
833    list as part of process_requestqueue (e.g. a lookup that has an optimized
834    request reply on the requestqueue) between dlm_recover_waiters_pre() which
835    set RESEND and dlm_recover_waiters_post() */
836
837 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
838 {
839         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
840         int overlap_done = 0;
841
842         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
843                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
844                 overlap_done = 1;
845                 goto out_del;
846         }
847
848         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
849                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
850                 overlap_done = 1;
851                 goto out_del;
852         }
853
854         /* N.B. type of reply may not always correspond to type of original
855            msg due to lookup->request optimization, verify others? */
856
857         if (lkb->lkb_wait_type) {
858                 lkb->lkb_wait_type = 0;
859                 goto out_del;
860         }
861
862         log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
863                   lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
864         return -1;
865
866  out_del:
867         /* the force-unlock/cancel has completed and we haven't recvd a reply
868            to the op that was in progress prior to the unlock/cancel; we
869            give up on any reply to the earlier op.  FIXME: not sure when/how
870            this would happen */
871
872         if (overlap_done && lkb->lkb_wait_type) {
873                 log_error(ls, "remove_from_waiters %x reply %d give up on %d",
874                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
875                 lkb->lkb_wait_count--;
876                 lkb->lkb_wait_type = 0;
877         }
878
879         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
880
881         lkb->lkb_flags &= ~DLM_IFL_RESEND;
882         lkb->lkb_wait_count--;
883         if (!lkb->lkb_wait_count)
884                 list_del_init(&lkb->lkb_wait_reply);
885         unhold_lkb(lkb);
886         return 0;
887 }
888
889 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
890 {
891         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
892         int error;
893
894         mutex_lock(&ls->ls_waiters_mutex);
895         error = _remove_from_waiters(lkb, mstype);
896         mutex_unlock(&ls->ls_waiters_mutex);
897         return error;
898 }
899
900 /* Handles situations where we might be processing a "fake" or "stub" reply in
901    which we can't try to take waiters_mutex again. */
902
903 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
904 {
905         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
906         int error;
907
908         if (ms != &ls->ls_stub_ms)
909                 mutex_lock(&ls->ls_waiters_mutex);
910         error = _remove_from_waiters(lkb, ms->m_type);
911         if (ms != &ls->ls_stub_ms)
912                 mutex_unlock(&ls->ls_waiters_mutex);
913         return error;
914 }
915
916 static void dir_remove(struct dlm_rsb *r)
917 {
918         int to_nodeid;
919
920         if (dlm_no_directory(r->res_ls))
921                 return;
922
923         to_nodeid = dlm_dir_nodeid(r);
924         if (to_nodeid != dlm_our_nodeid())
925                 send_remove(r);
926         else
927                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
928                                      r->res_name, r->res_length);
929 }
930
931 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
932    found since they are in order of newest to oldest? */
933
934 static int shrink_bucket(struct dlm_ls *ls, int b)
935 {
936         struct dlm_rsb *r;
937         int count = 0, found;
938
939         for (;;) {
940                 found = 0;
941                 write_lock(&ls->ls_rsbtbl[b].lock);
942                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
943                                             res_hashchain) {
944                         if (!time_after_eq(jiffies, r->res_toss_time +
945                                            dlm_config.ci_toss_secs * HZ))
946                                 continue;
947                         found = 1;
948                         break;
949                 }
950
951                 if (!found) {
952                         write_unlock(&ls->ls_rsbtbl[b].lock);
953                         break;
954                 }
955
956                 if (kref_put(&r->res_ref, kill_rsb)) {
957                         list_del(&r->res_hashchain);
958                         write_unlock(&ls->ls_rsbtbl[b].lock);
959
960                         if (is_master(r))
961                                 dir_remove(r);
962                         free_rsb(r);
963                         count++;
964                 } else {
965                         write_unlock(&ls->ls_rsbtbl[b].lock);
966                         log_error(ls, "tossed rsb in use %s", r->res_name);
967                 }
968         }
969
970         return count;
971 }
972
973 void dlm_scan_rsbs(struct dlm_ls *ls)
974 {
975         int i;
976
977         if (dlm_locking_stopped(ls))
978                 return;
979
980         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
981                 shrink_bucket(ls, i);
982                 cond_resched();
983         }
984 }
985
986 /* lkb is master or local copy */
987
988 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
989 {
990         int b, len = r->res_ls->ls_lvblen;
991
992         /* b=1 lvb returned to caller
993            b=0 lvb written to rsb or invalidated
994            b=-1 do nothing */
995
996         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
997
998         if (b == 1) {
999                 if (!lkb->lkb_lvbptr)
1000                         return;
1001
1002                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1003                         return;
1004
1005                 if (!r->res_lvbptr)
1006                         return;
1007
1008                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1009                 lkb->lkb_lvbseq = r->res_lvbseq;
1010
1011         } else if (b == 0) {
1012                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1013                         rsb_set_flag(r, RSB_VALNOTVALID);
1014                         return;
1015                 }
1016
1017                 if (!lkb->lkb_lvbptr)
1018                         return;
1019
1020                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1021                         return;
1022
1023                 if (!r->res_lvbptr)
1024                         r->res_lvbptr = allocate_lvb(r->res_ls);
1025
1026                 if (!r->res_lvbptr)
1027                         return;
1028
1029                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1030                 r->res_lvbseq++;
1031                 lkb->lkb_lvbseq = r->res_lvbseq;
1032                 rsb_clear_flag(r, RSB_VALNOTVALID);
1033         }
1034
1035         if (rsb_flag(r, RSB_VALNOTVALID))
1036                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1037 }
1038
1039 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1040 {
1041         if (lkb->lkb_grmode < DLM_LOCK_PW)
1042                 return;
1043
1044         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1045                 rsb_set_flag(r, RSB_VALNOTVALID);
1046                 return;
1047         }
1048
1049         if (!lkb->lkb_lvbptr)
1050                 return;
1051
1052         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1053                 return;
1054
1055         if (!r->res_lvbptr)
1056                 r->res_lvbptr = allocate_lvb(r->res_ls);
1057
1058         if (!r->res_lvbptr)
1059                 return;
1060
1061         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1062         r->res_lvbseq++;
1063         rsb_clear_flag(r, RSB_VALNOTVALID);
1064 }
1065
1066 /* lkb is process copy (pc) */
1067
1068 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1069                             struct dlm_message *ms)
1070 {
1071         int b;
1072
1073         if (!lkb->lkb_lvbptr)
1074                 return;
1075
1076         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1077                 return;
1078
1079         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1080         if (b == 1) {
1081                 int len = receive_extralen(ms);
1082                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1083                 lkb->lkb_lvbseq = ms->m_lvbseq;
1084         }
1085 }
1086
1087 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1088    remove_lock -- used for unlock, removes lkb from granted
1089    revert_lock -- used for cancel, moves lkb from convert to granted
1090    grant_lock  -- used for request and convert, adds lkb to granted or
1091                   moves lkb from convert or waiting to granted
1092
1093    Each of these is used for master or local copy lkb's.  There is
1094    also a _pc() variation used to make the corresponding change on
1095    a process copy (pc) lkb. */
1096
1097 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1098 {
1099         del_lkb(r, lkb);
1100         lkb->lkb_grmode = DLM_LOCK_IV;
1101         /* this unhold undoes the original ref from create_lkb()
1102            so this leads to the lkb being freed */
1103         unhold_lkb(lkb);
1104 }
1105
1106 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1107 {
1108         set_lvb_unlock(r, lkb);
1109         _remove_lock(r, lkb);
1110 }
1111
1112 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1113 {
1114         _remove_lock(r, lkb);
1115 }
1116
1117 /* returns: 0 did nothing
1118             1 moved lock to granted
1119            -1 removed lock */
1120
1121 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1122 {
1123         int rv = 0;
1124
1125         lkb->lkb_rqmode = DLM_LOCK_IV;
1126
1127         switch (lkb->lkb_status) {
1128         case DLM_LKSTS_GRANTED:
1129                 break;
1130         case DLM_LKSTS_CONVERT:
1131                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1132                 rv = 1;
1133                 break;
1134         case DLM_LKSTS_WAITING:
1135                 del_lkb(r, lkb);
1136                 lkb->lkb_grmode = DLM_LOCK_IV;
1137                 /* this unhold undoes the original ref from create_lkb()
1138                    so this leads to the lkb being freed */
1139                 unhold_lkb(lkb);
1140                 rv = -1;
1141                 break;
1142         default:
1143                 log_print("invalid status for revert %d", lkb->lkb_status);
1144         }
1145         return rv;
1146 }
1147
1148 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1149 {
1150         return revert_lock(r, lkb);
1151 }
1152
1153 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1154 {
1155         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1156                 lkb->lkb_grmode = lkb->lkb_rqmode;
1157                 if (lkb->lkb_status)
1158                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1159                 else
1160                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1161         }
1162
1163         lkb->lkb_rqmode = DLM_LOCK_IV;
1164 }
1165
1166 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1167 {
1168         set_lvb_lock(r, lkb);
1169         _grant_lock(r, lkb);
1170         lkb->lkb_highbast = 0;
1171 }
1172
1173 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1174                           struct dlm_message *ms)
1175 {
1176         set_lvb_lock_pc(r, lkb, ms);
1177         _grant_lock(r, lkb);
1178 }
1179
1180 /* called by grant_pending_locks() which means an async grant message must
1181    be sent to the requesting node in addition to granting the lock if the
1182    lkb belongs to a remote node. */
1183
1184 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1185 {
1186         grant_lock(r, lkb);
1187         if (is_master_copy(lkb))
1188                 send_grant(r, lkb);
1189         else
1190                 queue_cast(r, lkb, 0);
1191 }
1192
1193 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1194 {
1195         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1196                                            lkb_statequeue);
1197         if (lkb->lkb_id == first->lkb_id)
1198                 return 1;
1199
1200         return 0;
1201 }
1202
1203 /* Check if the given lkb conflicts with another lkb on the queue. */
1204
1205 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1206 {
1207         struct dlm_lkb *this;
1208
1209         list_for_each_entry(this, head, lkb_statequeue) {
1210                 if (this == lkb)
1211                         continue;
1212                 if (!modes_compat(this, lkb))
1213                         return 1;
1214         }
1215         return 0;
1216 }
1217
1218 /*
1219  * "A conversion deadlock arises with a pair of lock requests in the converting
1220  * queue for one resource.  The granted mode of each lock blocks the requested
1221  * mode of the other lock."
1222  *
1223  * Part 2: if the granted mode of lkb is preventing the first lkb in the
1224  * convert queue from being granted, then demote lkb (set grmode to NL).
1225  * This second form requires that we check for conv-deadlk even when
1226  * now == 0 in _can_be_granted().
1227  *
1228  * Example:
1229  * Granted Queue: empty
1230  * Convert Queue: NL->EX (first lock)
1231  *                PR->EX (second lock)
1232  *
1233  * The first lock can't be granted because of the granted mode of the second
1234  * lock and the second lock can't be granted because it's not first in the
1235  * list.  We demote the granted mode of the second lock (the lkb passed to this
1236  * function).
1237  *
1238  * After the resolution, the "grant pending" function needs to go back and try
1239  * to grant locks on the convert queue again since the first lock can now be
1240  * granted.
1241  */
1242
1243 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1244 {
1245         struct dlm_lkb *this, *first = NULL, *self = NULL;
1246
1247         list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1248                 if (!first)
1249                         first = this;
1250                 if (this == lkb) {
1251                         self = lkb;
1252                         continue;
1253                 }
1254
1255                 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1256                         return 1;
1257         }
1258
1259         /* if lkb is on the convert queue and is preventing the first
1260            from being granted, then there's deadlock and we demote lkb.
1261            multiple converting locks may need to do this before the first
1262            converting lock can be granted. */
1263
1264         if (self && self != first) {
1265                 if (!modes_compat(lkb, first) &&
1266                     !queue_conflict(&rsb->res_grantqueue, first))
1267                         return 1;
1268         }
1269
1270         return 0;
1271 }
1272
1273 /*
1274  * Return 1 if the lock can be granted, 0 otherwise.
1275  * Also detect and resolve conversion deadlocks.
1276  *
1277  * lkb is the lock to be granted
1278  *
1279  * now is 1 if the function is being called in the context of the
1280  * immediate request, it is 0 if called later, after the lock has been
1281  * queued.
1282  *
1283  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1284  */
1285
1286 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1287 {
1288         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1289
1290         /*
1291          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1292          * a new request for a NL mode lock being blocked.
1293          *
1294          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1295          * request, then it would be granted.  In essence, the use of this flag
1296          * tells the Lock Manager to expedite theis request by not considering
1297          * what may be in the CONVERTING or WAITING queues...  As of this
1298          * writing, the EXPEDITE flag can be used only with new requests for NL
1299          * mode locks.  This flag is not valid for conversion requests.
1300          *
1301          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1302          * conversion or used with a non-NL requested mode.  We also know an
1303          * EXPEDITE request is always granted immediately, so now must always
1304          * be 1.  The full condition to grant an expedite request: (now &&
1305          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1306          * therefore be shortened to just checking the flag.
1307          */
1308
1309         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1310                 return 1;
1311
1312         /*
1313          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1314          * added to the remaining conditions.
1315          */
1316
1317         if (queue_conflict(&r->res_grantqueue, lkb))
1318                 goto out;
1319
1320         /*
1321          * 6-3: By default, a conversion request is immediately granted if the
1322          * requested mode is compatible with the modes of all other granted
1323          * locks
1324          */
1325
1326         if (queue_conflict(&r->res_convertqueue, lkb))
1327                 goto out;
1328
1329         /*
1330          * 6-5: But the default algorithm for deciding whether to grant or
1331          * queue conversion requests does not by itself guarantee that such
1332          * requests are serviced on a "first come first serve" basis.  This, in
1333          * turn, can lead to a phenomenon known as "indefinate postponement".
1334          *
1335          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1336          * the system service employed to request a lock conversion.  This flag
1337          * forces certain conversion requests to be queued, even if they are
1338          * compatible with the granted modes of other locks on the same
1339          * resource.  Thus, the use of this flag results in conversion requests
1340          * being ordered on a "first come first servce" basis.
1341          *
1342          * DCT: This condition is all about new conversions being able to occur
1343          * "in place" while the lock remains on the granted queue (assuming
1344          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1345          * doesn't _have_ to go onto the convert queue where it's processed in
1346          * order.  The "now" variable is necessary to distinguish converts
1347          * being received and processed for the first time now, because once a
1348          * convert is moved to the conversion queue the condition below applies
1349          * requiring fifo granting.
1350          */
1351
1352         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1353                 return 1;
1354
1355         /*
1356          * The NOORDER flag is set to avoid the standard vms rules on grant
1357          * order.
1358          */
1359
1360         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1361                 return 1;
1362
1363         /*
1364          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1365          * granted until all other conversion requests ahead of it are granted
1366          * and/or canceled.
1367          */
1368
1369         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1370                 return 1;
1371
1372         /*
1373          * 6-4: By default, a new request is immediately granted only if all
1374          * three of the following conditions are satisfied when the request is
1375          * issued:
1376          * - The queue of ungranted conversion requests for the resource is
1377          *   empty.
1378          * - The queue of ungranted new requests for the resource is empty.
1379          * - The mode of the new request is compatible with the most
1380          *   restrictive mode of all granted locks on the resource.
1381          */
1382
1383         if (now && !conv && list_empty(&r->res_convertqueue) &&
1384             list_empty(&r->res_waitqueue))
1385                 return 1;
1386
1387         /*
1388          * 6-4: Once a lock request is in the queue of ungranted new requests,
1389          * it cannot be granted until the queue of ungranted conversion
1390          * requests is empty, all ungranted new requests ahead of it are
1391          * granted and/or canceled, and it is compatible with the granted mode
1392          * of the most restrictive lock granted on the resource.
1393          */
1394
1395         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1396             first_in_list(lkb, &r->res_waitqueue))
1397                 return 1;
1398
1399  out:
1400         /*
1401          * The following, enabled by CONVDEADLK, departs from VMS.
1402          */
1403
1404         if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1405             conversion_deadlock_detect(r, lkb)) {
1406                 lkb->lkb_grmode = DLM_LOCK_NL;
1407                 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1408         }
1409
1410         return 0;
1411 }
1412
1413 /*
1414  * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1415  * simple way to provide a big optimization to applications that can use them.
1416  */
1417
1418 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1419 {
1420         uint32_t flags = lkb->lkb_exflags;
1421         int rv;
1422         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1423
1424         rv = _can_be_granted(r, lkb, now);
1425         if (rv)
1426                 goto out;
1427
1428         if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1429                 goto out;
1430
1431         if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1432                 alt = DLM_LOCK_PR;
1433         else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1434                 alt = DLM_LOCK_CW;
1435
1436         if (alt) {
1437                 lkb->lkb_rqmode = alt;
1438                 rv = _can_be_granted(r, lkb, now);
1439                 if (rv)
1440                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1441                 else
1442                         lkb->lkb_rqmode = rqmode;
1443         }
1444  out:
1445         return rv;
1446 }
1447
1448 static int grant_pending_convert(struct dlm_rsb *r, int high)
1449 {
1450         struct dlm_lkb *lkb, *s;
1451         int hi, demoted, quit, grant_restart, demote_restart;
1452
1453         quit = 0;
1454  restart:
1455         grant_restart = 0;
1456         demote_restart = 0;
1457         hi = DLM_LOCK_IV;
1458
1459         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1460                 demoted = is_demoted(lkb);
1461                 if (can_be_granted(r, lkb, 0)) {
1462                         grant_lock_pending(r, lkb);
1463                         grant_restart = 1;
1464                 } else {
1465                         hi = max_t(int, lkb->lkb_rqmode, hi);
1466                         if (!demoted && is_demoted(lkb))
1467                                 demote_restart = 1;
1468                 }
1469         }
1470
1471         if (grant_restart)
1472                 goto restart;
1473         if (demote_restart && !quit) {
1474                 quit = 1;
1475                 goto restart;
1476         }
1477
1478         return max_t(int, high, hi);
1479 }
1480
1481 static int grant_pending_wait(struct dlm_rsb *r, int high)
1482 {
1483         struct dlm_lkb *lkb, *s;
1484
1485         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1486                 if (can_be_granted(r, lkb, 0))
1487                         grant_lock_pending(r, lkb);
1488                 else
1489                         high = max_t(int, lkb->lkb_rqmode, high);
1490         }
1491
1492         return high;
1493 }
1494
1495 static void grant_pending_locks(struct dlm_rsb *r)
1496 {
1497         struct dlm_lkb *lkb, *s;
1498         int high = DLM_LOCK_IV;
1499
1500         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1501
1502         high = grant_pending_convert(r, high);
1503         high = grant_pending_wait(r, high);
1504
1505         if (high == DLM_LOCK_IV)
1506                 return;
1507
1508         /*
1509          * If there are locks left on the wait/convert queue then send blocking
1510          * ASTs to granted locks based on the largest requested mode (high)
1511          * found above. FIXME: highbast < high comparison not valid for PR/CW.
1512          */
1513
1514         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1515                 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1516                     !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1517                         queue_bast(r, lkb, high);
1518                         lkb->lkb_highbast = high;
1519                 }
1520         }
1521 }
1522
1523 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1524                             struct dlm_lkb *lkb)
1525 {
1526         struct dlm_lkb *gr;
1527
1528         list_for_each_entry(gr, head, lkb_statequeue) {
1529                 if (gr->lkb_bastaddr &&
1530                     gr->lkb_highbast < lkb->lkb_rqmode &&
1531                     !modes_compat(gr, lkb)) {
1532                         queue_bast(r, gr, lkb->lkb_rqmode);
1533                         gr->lkb_highbast = lkb->lkb_rqmode;
1534                 }
1535         }
1536 }
1537
1538 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1539 {
1540         send_bast_queue(r, &r->res_grantqueue, lkb);
1541 }
1542
1543 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1544 {
1545         send_bast_queue(r, &r->res_grantqueue, lkb);
1546         send_bast_queue(r, &r->res_convertqueue, lkb);
1547 }
1548
1549 /* set_master(r, lkb) -- set the master nodeid of a resource
1550
1551    The purpose of this function is to set the nodeid field in the given
1552    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1553    known, it can just be copied to the lkb and the function will return
1554    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1555    before it can be copied to the lkb.
1556
1557    When the rsb nodeid is being looked up remotely, the initial lkb
1558    causing the lookup is kept on the ls_waiters list waiting for the
1559    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1560    on the rsb's res_lookup list until the master is verified.
1561
1562    Return values:
1563    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1564    1: the rsb master is not available and the lkb has been placed on
1565       a wait queue
1566 */
1567
1568 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1569 {
1570         struct dlm_ls *ls = r->res_ls;
1571         int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1572
1573         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1574                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1575                 r->res_first_lkid = lkb->lkb_id;
1576                 lkb->lkb_nodeid = r->res_nodeid;
1577                 return 0;
1578         }
1579
1580         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1581                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1582                 return 1;
1583         }
1584
1585         if (r->res_nodeid == 0) {
1586                 lkb->lkb_nodeid = 0;
1587                 return 0;
1588         }
1589
1590         if (r->res_nodeid > 0) {
1591                 lkb->lkb_nodeid = r->res_nodeid;
1592                 return 0;
1593         }
1594
1595         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1596
1597         dir_nodeid = dlm_dir_nodeid(r);
1598
1599         if (dir_nodeid != our_nodeid) {
1600                 r->res_first_lkid = lkb->lkb_id;
1601                 send_lookup(r, lkb);
1602                 return 1;
1603         }
1604
1605         for (;;) {
1606                 /* It's possible for dlm_scand to remove an old rsb for
1607                    this same resource from the toss list, us to create
1608                    a new one, look up the master locally, and find it
1609                    already exists just before dlm_scand does the
1610                    dir_remove() on the previous rsb. */
1611
1612                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1613                                        r->res_length, &ret_nodeid);
1614                 if (!error)
1615                         break;
1616                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1617                 schedule();
1618         }
1619
1620         if (ret_nodeid == our_nodeid) {
1621                 r->res_first_lkid = 0;
1622                 r->res_nodeid = 0;
1623                 lkb->lkb_nodeid = 0;
1624         } else {
1625                 r->res_first_lkid = lkb->lkb_id;
1626                 r->res_nodeid = ret_nodeid;
1627                 lkb->lkb_nodeid = ret_nodeid;
1628         }
1629         return 0;
1630 }
1631
1632 static void process_lookup_list(struct dlm_rsb *r)
1633 {
1634         struct dlm_lkb *lkb, *safe;
1635
1636         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1637                 list_del_init(&lkb->lkb_rsb_lookup);
1638                 _request_lock(r, lkb);
1639                 schedule();
1640         }
1641 }
1642
1643 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1644
1645 static void confirm_master(struct dlm_rsb *r, int error)
1646 {
1647         struct dlm_lkb *lkb;
1648
1649         if (!r->res_first_lkid)
1650                 return;
1651
1652         switch (error) {
1653         case 0:
1654         case -EINPROGRESS:
1655                 r->res_first_lkid = 0;
1656                 process_lookup_list(r);
1657                 break;
1658
1659         case -EAGAIN:
1660                 /* the remote master didn't queue our NOQUEUE request;
1661                    make a waiting lkb the first_lkid */
1662
1663                 r->res_first_lkid = 0;
1664
1665                 if (!list_empty(&r->res_lookup)) {
1666                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1667                                          lkb_rsb_lookup);
1668                         list_del_init(&lkb->lkb_rsb_lookup);
1669                         r->res_first_lkid = lkb->lkb_id;
1670                         _request_lock(r, lkb);
1671                 } else
1672                         r->res_nodeid = -1;
1673                 break;
1674
1675         default:
1676                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1677         }
1678 }
1679
1680 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1681                          int namelen, uint32_t parent_lkid, void *ast,
1682                          void *astarg, void *bast, struct dlm_args *args)
1683 {
1684         int rv = -EINVAL;
1685
1686         /* check for invalid arg usage */
1687
1688         if (mode < 0 || mode > DLM_LOCK_EX)
1689                 goto out;
1690
1691         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1692                 goto out;
1693
1694         if (flags & DLM_LKF_CANCEL)
1695                 goto out;
1696
1697         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1698                 goto out;
1699
1700         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1701                 goto out;
1702
1703         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1704                 goto out;
1705
1706         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1707                 goto out;
1708
1709         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1710                 goto out;
1711
1712         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1713                 goto out;
1714
1715         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1716                 goto out;
1717
1718         if (!ast || !lksb)
1719                 goto out;
1720
1721         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1722                 goto out;
1723
1724         /* parent/child locks not yet supported */
1725         if (parent_lkid)
1726                 goto out;
1727
1728         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1729                 goto out;
1730
1731         /* these args will be copied to the lkb in validate_lock_args,
1732            it cannot be done now because when converting locks, fields in
1733            an active lkb cannot be modified before locking the rsb */
1734
1735         args->flags = flags;
1736         args->astaddr = ast;
1737         args->astparam = (long) astarg;
1738         args->bastaddr = bast;
1739         args->mode = mode;
1740         args->lksb = lksb;
1741         rv = 0;
1742  out:
1743         return rv;
1744 }
1745
1746 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1747 {
1748         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1749                       DLM_LKF_FORCEUNLOCK))
1750                 return -EINVAL;
1751
1752         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
1753                 return -EINVAL;
1754
1755         args->flags = flags;
1756         args->astparam = (long) astarg;
1757         return 0;
1758 }
1759
1760 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1761                               struct dlm_args *args)
1762 {
1763         int rv = -EINVAL;
1764
1765         if (args->flags & DLM_LKF_CONVERT) {
1766                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1767                         goto out;
1768
1769                 if (args->flags & DLM_LKF_QUECVT &&
1770                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1771                         goto out;
1772
1773                 rv = -EBUSY;
1774                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1775                         goto out;
1776
1777                 if (lkb->lkb_wait_type)
1778                         goto out;
1779
1780                 if (is_overlap(lkb))
1781                         goto out;
1782         }
1783
1784         lkb->lkb_exflags = args->flags;
1785         lkb->lkb_sbflags = 0;
1786         lkb->lkb_astaddr = args->astaddr;
1787         lkb->lkb_astparam = args->astparam;
1788         lkb->lkb_bastaddr = args->bastaddr;
1789         lkb->lkb_rqmode = args->mode;
1790         lkb->lkb_lksb = args->lksb;
1791         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1792         lkb->lkb_ownpid = (int) current->pid;
1793         rv = 0;
1794  out:
1795         return rv;
1796 }
1797
1798 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
1799    for success */
1800
1801 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
1802    because there may be a lookup in progress and it's valid to do
1803    cancel/unlockf on it */
1804
1805 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1806 {
1807         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1808         int rv = -EINVAL;
1809
1810         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
1811                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
1812                 dlm_print_lkb(lkb);
1813                 goto out;
1814         }
1815
1816         /* an lkb may still exist even though the lock is EOL'ed due to a
1817            cancel, unlock or failed noqueue request; an app can't use these
1818            locks; return same error as if the lkid had not been found at all */
1819
1820         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
1821                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
1822                 rv = -ENOENT;
1823                 goto out;
1824         }
1825
1826         /* an lkb may be waiting for an rsb lookup to complete where the
1827            lookup was initiated by another lock */
1828
1829         if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
1830                 if (!list_empty(&lkb->lkb_rsb_lookup)) {
1831                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
1832                         list_del_init(&lkb->lkb_rsb_lookup);
1833                         queue_cast(lkb->lkb_resource, lkb,
1834                                    args->flags & DLM_LKF_CANCEL ?
1835                                    -DLM_ECANCEL : -DLM_EUNLOCK);
1836                         unhold_lkb(lkb); /* undoes create_lkb() */
1837                         rv = -EBUSY;
1838                         goto out;
1839                 }
1840         }
1841
1842         /* cancel not allowed with another cancel/unlock in progress */
1843
1844         if (args->flags & DLM_LKF_CANCEL) {
1845                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
1846                         goto out;
1847
1848                 if (is_overlap(lkb))
1849                         goto out;
1850
1851                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1852                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1853                         rv = -EBUSY;
1854                         goto out;
1855                 }
1856
1857                 switch (lkb->lkb_wait_type) {
1858                 case DLM_MSG_LOOKUP:
1859                 case DLM_MSG_REQUEST:
1860                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1861                         rv = -EBUSY;
1862                         goto out;
1863                 case DLM_MSG_UNLOCK:
1864                 case DLM_MSG_CANCEL:
1865                         goto out;
1866                 }
1867                 /* add_to_waiters() will set OVERLAP_CANCEL */
1868                 goto out_ok;
1869         }
1870
1871         /* do we need to allow a force-unlock if there's a normal unlock
1872            already in progress?  in what conditions could the normal unlock
1873            fail such that we'd want to send a force-unlock to be sure? */
1874
1875         if (args->flags & DLM_LKF_FORCEUNLOCK) {
1876                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
1877                         goto out;
1878
1879                 if (is_overlap_unlock(lkb))
1880                         goto out;
1881
1882                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1883                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1884                         rv = -EBUSY;
1885                         goto out;
1886                 }
1887
1888                 switch (lkb->lkb_wait_type) {
1889                 case DLM_MSG_LOOKUP:
1890                 case DLM_MSG_REQUEST:
1891                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1892                         rv = -EBUSY;
1893                         goto out;
1894                 case DLM_MSG_UNLOCK:
1895                         goto out;
1896                 }
1897                 /* add_to_waiters() will set OVERLAP_UNLOCK */
1898                 goto out_ok;
1899         }
1900
1901         /* normal unlock not allowed if there's any op in progress */
1902         rv = -EBUSY;
1903         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
1904                 goto out;
1905
1906  out_ok:
1907         /* an overlapping op shouldn't blow away exflags from other op */
1908         lkb->lkb_exflags |= args->flags;
1909         lkb->lkb_sbflags = 0;
1910         lkb->lkb_astparam = args->astparam;
1911         rv = 0;
1912  out:
1913         if (rv)
1914                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
1915                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
1916                           args->flags, lkb->lkb_wait_type,
1917                           lkb->lkb_resource->res_name);
1918         return rv;
1919 }
1920
1921 /*
1922  * Four stage 4 varieties:
1923  * do_request(), do_convert(), do_unlock(), do_cancel()
1924  * These are called on the master node for the given lock and
1925  * from the central locking logic.
1926  */
1927
1928 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1929 {
1930         int error = 0;
1931
1932         if (can_be_granted(r, lkb, 1)) {
1933                 grant_lock(r, lkb);
1934                 queue_cast(r, lkb, 0);
1935                 goto out;
1936         }
1937
1938         if (can_be_queued(lkb)) {
1939                 error = -EINPROGRESS;
1940                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1941                 send_blocking_asts(r, lkb);
1942                 goto out;
1943         }
1944
1945         error = -EAGAIN;
1946         if (force_blocking_asts(lkb))
1947                 send_blocking_asts_all(r, lkb);
1948         queue_cast(r, lkb, -EAGAIN);
1949
1950  out:
1951         return error;
1952 }
1953
1954 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1955 {
1956         int error = 0;
1957
1958         /* changing an existing lock may allow others to be granted */
1959
1960         if (can_be_granted(r, lkb, 1)) {
1961                 grant_lock(r, lkb);
1962                 queue_cast(r, lkb, 0);
1963                 grant_pending_locks(r);
1964                 goto out;
1965         }
1966
1967         if (can_be_queued(lkb)) {
1968                 if (is_demoted(lkb))
1969                         grant_pending_locks(r);
1970                 error = -EINPROGRESS;
1971                 del_lkb(r, lkb);
1972                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1973                 send_blocking_asts(r, lkb);
1974                 goto out;
1975         }
1976
1977         error = -EAGAIN;
1978         if (force_blocking_asts(lkb))
1979                 send_blocking_asts_all(r, lkb);
1980         queue_cast(r, lkb, -EAGAIN);
1981
1982  out:
1983         return error;
1984 }
1985
1986 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1987 {
1988         remove_lock(r, lkb);
1989         queue_cast(r, lkb, -DLM_EUNLOCK);
1990         grant_pending_locks(r);
1991         return -DLM_EUNLOCK;
1992 }
1993
1994 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
1995  
1996 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1997 {
1998         int error;
1999
2000         error = revert_lock(r, lkb);
2001         if (error) {
2002                 queue_cast(r, lkb, -DLM_ECANCEL);
2003                 grant_pending_locks(r);
2004                 return -DLM_ECANCEL;
2005         }
2006         return 0;
2007 }
2008
2009 /*
2010  * Four stage 3 varieties:
2011  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2012  */
2013
2014 /* add a new lkb to a possibly new rsb, called by requesting process */
2015
2016 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2017 {
2018         int error;
2019
2020         /* set_master: sets lkb nodeid from r */
2021
2022         error = set_master(r, lkb);
2023         if (error < 0)
2024                 goto out;
2025         if (error) {
2026                 error = 0;
2027                 goto out;
2028         }
2029
2030         if (is_remote(r))
2031                 /* receive_request() calls do_request() on remote node */
2032                 error = send_request(r, lkb);
2033         else
2034                 error = do_request(r, lkb);
2035  out:
2036         return error;
2037 }
2038
2039 /* change some property of an existing lkb, e.g. mode */
2040
2041 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2042 {
2043         int error;
2044
2045         if (is_remote(r))
2046                 /* receive_convert() calls do_convert() on remote node */
2047                 error = send_convert(r, lkb);
2048         else
2049                 error = do_convert(r, lkb);
2050
2051         return error;
2052 }
2053
2054 /* remove an existing lkb from the granted queue */
2055
2056 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2057 {
2058         int error;
2059
2060         if (is_remote(r))
2061                 /* receive_unlock() calls do_unlock() on remote node */
2062                 error = send_unlock(r, lkb);
2063         else
2064                 error = do_unlock(r, lkb);
2065
2066         return error;
2067 }
2068
2069 /* remove an existing lkb from the convert or wait queue */
2070
2071 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2072 {
2073         int error;
2074
2075         if (is_remote(r))
2076                 /* receive_cancel() calls do_cancel() on remote node */
2077                 error = send_cancel(r, lkb);
2078         else
2079                 error = do_cancel(r, lkb);
2080
2081         return error;
2082 }
2083
2084 /*
2085  * Four stage 2 varieties:
2086  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2087  */
2088
2089 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2090                         int len, struct dlm_args *args)
2091 {
2092         struct dlm_rsb *r;
2093         int error;
2094
2095         error = validate_lock_args(ls, lkb, args);
2096         if (error)
2097                 goto out;
2098
2099         error = find_rsb(ls, name, len, R_CREATE, &r);
2100         if (error)
2101                 goto out;
2102
2103         lock_rsb(r);
2104
2105         attach_lkb(r, lkb);
2106         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2107
2108         error = _request_lock(r, lkb);
2109
2110         unlock_rsb(r);
2111         put_rsb(r);
2112
2113  out:
2114         return error;
2115 }
2116
2117 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2118                         struct dlm_args *args)
2119 {
2120         struct dlm_rsb *r;
2121         int error;
2122
2123         r = lkb->lkb_resource;
2124
2125         hold_rsb(r);
2126         lock_rsb(r);
2127
2128         error = validate_lock_args(ls, lkb, args);
2129         if (error)
2130                 goto out;
2131
2132         error = _convert_lock(r, lkb);
2133  out:
2134         unlock_rsb(r);
2135         put_rsb(r);
2136         return error;
2137 }
2138
2139 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2140                        struct dlm_args *args)
2141 {
2142         struct dlm_rsb *r;
2143         int error;
2144
2145         r = lkb->lkb_resource;
2146
2147         hold_rsb(r);
2148         lock_rsb(r);
2149
2150         error = validate_unlock_args(lkb, args);
2151         if (error)
2152                 goto out;
2153
2154         error = _unlock_lock(r, lkb);
2155  out:
2156         unlock_rsb(r);
2157         put_rsb(r);
2158         return error;
2159 }
2160
2161 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2162                        struct dlm_args *args)
2163 {
2164         struct dlm_rsb *r;
2165         int error;
2166
2167         r = lkb->lkb_resource;
2168
2169         hold_rsb(r);
2170         lock_rsb(r);
2171
2172         error = validate_unlock_args(lkb, args);
2173         if (error)
2174                 goto out;
2175
2176         error = _cancel_lock(r, lkb);
2177  out:
2178         unlock_rsb(r);
2179         put_rsb(r);
2180         return error;
2181 }
2182
2183 /*
2184  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2185  */
2186
2187 int dlm_lock(dlm_lockspace_t *lockspace,
2188              int mode,
2189              struct dlm_lksb *lksb,
2190              uint32_t flags,
2191              void *name,
2192              unsigned int namelen,
2193              uint32_t parent_lkid,
2194              void (*ast) (void *astarg),
2195              void *astarg,
2196              void (*bast) (void *astarg, int mode))
2197 {
2198         struct dlm_ls *ls;
2199         struct dlm_lkb *lkb;
2200         struct dlm_args args;
2201         int error, convert = flags & DLM_LKF_CONVERT;
2202
2203         ls = dlm_find_lockspace_local(lockspace);
2204         if (!ls)
2205                 return -EINVAL;
2206
2207         lock_recovery(ls);
2208
2209         if (convert)
2210                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2211         else
2212                 error = create_lkb(ls, &lkb);
2213
2214         if (error)
2215                 goto out;
2216
2217         error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
2218                               astarg, bast, &args);
2219         if (error)
2220                 goto out_put;
2221
2222         if (convert)
2223                 error = convert_lock(ls, lkb, &args);
2224         else
2225                 error = request_lock(ls, lkb, name, namelen, &args);
2226
2227         if (error == -EINPROGRESS)
2228                 error = 0;
2229  out_put:
2230         if (convert || error)
2231                 __put_lkb(ls, lkb);
2232         if (error == -EAGAIN)
2233                 error = 0;
2234  out:
2235         unlock_recovery(ls);
2236         dlm_put_lockspace(ls);
2237         return error;
2238 }
2239
2240 int dlm_unlock(dlm_lockspace_t *lockspace,
2241                uint32_t lkid,
2242                uint32_t flags,
2243                struct dlm_lksb *lksb,
2244                void *astarg)
2245 {
2246         struct dlm_ls *ls;
2247         struct dlm_lkb *lkb;
2248         struct dlm_args args;
2249         int error;
2250
2251         ls = dlm_find_lockspace_local(lockspace);
2252         if (!ls)
2253                 return -EINVAL;
2254
2255         lock_recovery(ls);
2256
2257         error = find_lkb(ls, lkid, &lkb);
2258         if (error)
2259                 goto out;
2260
2261         error = set_unlock_args(flags, astarg, &args);
2262         if (error)
2263                 goto out_put;
2264
2265         if (flags & DLM_LKF_CANCEL)
2266                 error = cancel_lock(ls, lkb, &args);
2267         else
2268                 error = unlock_lock(ls, lkb, &args);
2269
2270         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2271                 error = 0;
2272         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2273                 error = 0;
2274  out_put:
2275         dlm_put_lkb(lkb);
2276  out:
2277         unlock_recovery(ls);
2278         dlm_put_lockspace(ls);
2279         return error;
2280 }
2281
2282 /*
2283  * send/receive routines for remote operations and replies
2284  *
2285  * send_args
2286  * send_common
2287  * send_request                 receive_request
2288  * send_convert                 receive_convert
2289  * send_unlock                  receive_unlock
2290  * send_cancel                  receive_cancel
2291  * send_grant                   receive_grant
2292  * send_bast                    receive_bast
2293  * send_lookup                  receive_lookup
2294  * send_remove                  receive_remove
2295  *
2296  *                              send_common_reply
2297  * receive_request_reply        send_request_reply
2298  * receive_convert_reply        send_convert_reply
2299  * receive_unlock_reply         send_unlock_reply
2300  * receive_cancel_reply         send_cancel_reply
2301  * receive_lookup_reply         send_lookup_reply
2302  */
2303
2304 static int _create_message(struct dlm_ls *ls, int mb_len,
2305                            int to_nodeid, int mstype,
2306                            struct dlm_message **ms_ret,
2307                            struct dlm_mhandle **mh_ret)
2308 {
2309         struct dlm_message *ms;
2310         struct dlm_mhandle *mh;
2311         char *mb;
2312
2313         /* get_buffer gives us a message handle (mh) that we need to
2314            pass into lowcomms_commit and a message buffer (mb) that we
2315            write our data into */
2316
2317         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2318         if (!mh)
2319                 return -ENOBUFS;
2320
2321         memset(mb, 0, mb_len);
2322
2323         ms = (struct dlm_message *) mb;
2324
2325         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2326         ms->m_header.h_lockspace = ls->ls_global_id;
2327         ms->m_header.h_nodeid = dlm_our_nodeid();
2328         ms->m_header.h_length = mb_len;
2329         ms->m_header.h_cmd = DLM_MSG;
2330
2331         ms->m_type = mstype;
2332
2333         *mh_ret = mh;
2334         *ms_ret = ms;
2335         return 0;
2336 }
2337
2338 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2339                           int to_nodeid, int mstype,
2340                           struct dlm_message **ms_ret,
2341                           struct dlm_mhandle **mh_ret)
2342 {
2343         int mb_len = sizeof(struct dlm_message);
2344
2345         switch (mstype) {
2346         case DLM_MSG_REQUEST:
2347         case DLM_MSG_LOOKUP:
2348         case DLM_MSG_REMOVE:
2349                 mb_len += r->res_length;
2350                 break;
2351         case DLM_MSG_CONVERT:
2352         case DLM_MSG_UNLOCK:
2353         case DLM_MSG_REQUEST_REPLY:
2354         case DLM_MSG_CONVERT_REPLY:
2355         case DLM_MSG_GRANT:
2356                 if (lkb && lkb->lkb_lvbptr)
2357                         mb_len += r->res_ls->ls_lvblen;
2358                 break;
2359         }
2360
2361         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2362                                ms_ret, mh_ret);
2363 }
2364
2365 /* further lowcomms enhancements or alternate implementations may make
2366    the return value from this function useful at some point */
2367
2368 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2369 {
2370         dlm_message_out(ms);
2371         dlm_lowcomms_commit_buffer(mh);
2372         return 0;
2373 }
2374
2375 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2376                       struct dlm_message *ms)
2377 {
2378         ms->m_nodeid   = lkb->lkb_nodeid;
2379         ms->m_pid      = lkb->lkb_ownpid;
2380         ms->m_lkid     = lkb->lkb_id;
2381         ms->m_remid    = lkb->lkb_remid;
2382         ms->m_exflags  = lkb->lkb_exflags;
2383         ms->m_sbflags  = lkb->lkb_sbflags;
2384         ms->m_flags    = lkb->lkb_flags;
2385         ms->m_lvbseq   = lkb->lkb_lvbseq;
2386         ms->m_status   = lkb->lkb_status;
2387         ms->m_grmode   = lkb->lkb_grmode;
2388         ms->m_rqmode   = lkb->lkb_rqmode;
2389         ms->m_hash     = r->res_hash;
2390
2391         /* m_result and m_bastmode are set from function args,
2392            not from lkb fields */
2393
2394         if (lkb->lkb_bastaddr)
2395                 ms->m_asts |= AST_BAST;
2396         if (lkb->lkb_astaddr)
2397                 ms->m_asts |= AST_COMP;
2398
2399         /* compare with switch in create_message; send_remove() doesn't
2400            use send_args() */
2401
2402         switch (ms->m_type) {
2403         case DLM_MSG_REQUEST:
2404         case DLM_MSG_LOOKUP:
2405                 memcpy(ms->m_extra, r->res_name, r->res_length);
2406                 break;
2407         case DLM_MSG_CONVERT:
2408         case DLM_MSG_UNLOCK:
2409         case DLM_MSG_REQUEST_REPLY:
2410         case DLM_MSG_CONVERT_REPLY:
2411         case DLM_MSG_GRANT:
2412                 if (!lkb->lkb_lvbptr)
2413                         break;
2414                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2415                 break;
2416         }
2417 }
2418
2419 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2420 {
2421         struct dlm_message *ms;
2422         struct dlm_mhandle *mh;
2423         int to_nodeid, error;
2424
2425         error = add_to_waiters(lkb, mstype);
2426         if (error)
2427                 return error;
2428
2429         to_nodeid = r->res_nodeid;
2430
2431         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2432         if (error)
2433                 goto fail;
2434
2435         send_args(r, lkb, ms);
2436
2437         error = send_message(mh, ms);
2438         if (error)
2439                 goto fail;
2440         return 0;
2441
2442  fail:
2443         remove_from_waiters(lkb, msg_reply_type(mstype));
2444         return error;
2445 }
2446
2447 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2448 {
2449         return send_common(r, lkb, DLM_MSG_REQUEST);
2450 }
2451
2452 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2453 {
2454         int error;
2455
2456         error = send_common(r, lkb, DLM_MSG_CONVERT);
2457
2458         /* down conversions go without a reply from the master */
2459         if (!error && down_conversion(lkb)) {
2460                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2461                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2462                 r->res_ls->ls_stub_ms.m_result = 0;
2463                 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2464                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2465         }
2466
2467         return error;
2468 }
2469
2470 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2471    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2472    that the master is still correct. */
2473
2474 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2475 {
2476         return send_common(r, lkb, DLM_MSG_UNLOCK);
2477 }
2478
2479 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2480 {
2481         return send_common(r, lkb, DLM_MSG_CANCEL);
2482 }
2483
2484 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2485 {
2486         struct dlm_message *ms;
2487         struct dlm_mhandle *mh;
2488         int to_nodeid, error;
2489
2490         to_nodeid = lkb->lkb_nodeid;
2491
2492         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2493         if (error)
2494                 goto out;
2495
2496         send_args(r, lkb, ms);
2497
2498         ms->m_result = 0;
2499
2500         error = send_message(mh, ms);
2501  out:
2502         return error;
2503 }
2504
2505 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2506 {
2507         struct dlm_message *ms;
2508         struct dlm_mhandle *mh;
2509         int to_nodeid, error;
2510
2511         to_nodeid = lkb->lkb_nodeid;
2512
2513         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2514         if (error)
2515                 goto out;
2516
2517         send_args(r, lkb, ms);
2518
2519         ms->m_bastmode = mode;
2520
2521         error = send_message(mh, ms);
2522  out:
2523         return error;
2524 }
2525
2526 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2527 {
2528         struct dlm_message *ms;
2529         struct dlm_mhandle *mh;
2530         int to_nodeid, error;
2531
2532         error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2533         if (error)
2534                 return error;
2535
2536         to_nodeid = dlm_dir_nodeid(r);
2537
2538         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2539         if (error)
2540                 goto fail;
2541
2542         send_args(r, lkb, ms);
2543
2544         error = send_message(mh, ms);
2545         if (error)
2546                 goto fail;
2547         return 0;
2548
2549  fail:
2550         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2551         return error;
2552 }
2553
2554 static int send_remove(struct dlm_rsb *r)
2555 {
2556         struct dlm_message *ms;
2557         struct dlm_mhandle *mh;
2558         int to_nodeid, error;
2559
2560         to_nodeid = dlm_dir_nodeid(r);
2561
2562         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2563         if (error)
2564                 goto out;
2565
2566         memcpy(ms->m_extra, r->res_name, r->res_length);
2567         ms->m_hash = r->res_hash;
2568
2569         error = send_message(mh, ms);
2570  out:
2571         return error;
2572 }
2573
2574 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2575                              int mstype, int rv)
2576 {
2577         struct dlm_message *ms;
2578         struct dlm_mhandle *mh;
2579         int to_nodeid, error;
2580
2581         to_nodeid = lkb->lkb_nodeid;
2582
2583         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2584         if (error)
2585                 goto out;
2586
2587         send_args(r, lkb, ms);
2588
2589         ms->m_result = rv;
2590
2591         error = send_message(mh, ms);
2592  out:
2593         return error;
2594 }
2595
2596 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2597 {
2598         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2599 }
2600
2601 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2602 {
2603         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2604 }
2605
2606 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2607 {
2608         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2609 }
2610
2611 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2612 {
2613         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2614 }
2615
2616 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2617                              int ret_nodeid, int rv)
2618 {
2619         struct dlm_rsb *r = &ls->ls_stub_rsb;
2620         struct dlm_message *ms;
2621         struct dlm_mhandle *mh;
2622         int error, nodeid = ms_in->m_header.h_nodeid;
2623
2624         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2625         if (error)
2626                 goto out;
2627
2628         ms->m_lkid = ms_in->m_lkid;
2629         ms->m_result = rv;
2630         ms->m_nodeid = ret_nodeid;
2631
2632         error = send_message(mh, ms);
2633  out:
2634         return error;
2635 }
2636
2637 /* which args we save from a received message depends heavily on the type
2638    of message, unlike the send side where we can safely send everything about
2639    the lkb for any type of message */
2640
2641 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2642 {
2643         lkb->lkb_exflags = ms->m_exflags;
2644         lkb->lkb_sbflags = ms->m_sbflags;
2645         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2646                          (ms->m_flags & 0x0000FFFF);
2647 }
2648
2649 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2650 {
2651         lkb->lkb_sbflags = ms->m_sbflags;
2652         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2653                          (ms->m_flags & 0x0000FFFF);
2654 }
2655
2656 static int receive_extralen(struct dlm_message *ms)
2657 {
2658         return (ms->m_header.h_length - sizeof(struct dlm_message));
2659 }
2660
2661 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2662                        struct dlm_message *ms)
2663 {
2664         int len;
2665
2666         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2667                 if (!lkb->lkb_lvbptr)
2668                         lkb->lkb_lvbptr = allocate_lvb(ls);
2669                 if (!lkb->lkb_lvbptr)
2670                         return -ENOMEM;
2671                 len = receive_extralen(ms);
2672                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2673         }
2674         return 0;
2675 }
2676
2677 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2678                                 struct dlm_message *ms)
2679 {
2680         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2681         lkb->lkb_ownpid = ms->m_pid;
2682         lkb->lkb_remid = ms->m_lkid;
2683         lkb->lkb_grmode = DLM_LOCK_IV;
2684         lkb->lkb_rqmode = ms->m_rqmode;
2685         lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2686         lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2687
2688         DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2689
2690         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2691                 /* lkb was just created so there won't be an lvb yet */
2692                 lkb->lkb_lvbptr = allocate_lvb(ls);
2693                 if (!lkb->lkb_lvbptr)
2694                         return -ENOMEM;
2695         }
2696
2697         return 0;
2698 }
2699
2700 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2701                                 struct dlm_message *ms)
2702 {
2703         if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2704                 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2705                           lkb->lkb_nodeid, ms->m_header.h_nodeid,
2706                           lkb->lkb_id, lkb->lkb_remid);
2707                 return -EINVAL;
2708         }
2709
2710         if (!is_master_copy(lkb))
2711                 return -EINVAL;
2712
2713         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2714                 return -EBUSY;
2715
2716         if (receive_lvb(ls, lkb, ms))
2717                 return -ENOMEM;
2718
2719         lkb->lkb_rqmode = ms->m_rqmode;
2720         lkb->lkb_lvbseq = ms->m_lvbseq;
2721
2722         return 0;
2723 }
2724
2725 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2726                                struct dlm_message *ms)
2727 {
2728         if (!is_master_copy(lkb))
2729                 return -EINVAL;
2730         if (receive_lvb(ls, lkb, ms))
2731                 return -ENOMEM;
2732         return 0;
2733 }
2734
2735 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2736    uses to send a reply and that the remote end uses to process the reply. */
2737
2738 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2739 {
2740         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2741         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2742         lkb->lkb_remid = ms->m_lkid;
2743 }
2744
2745 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2746 {
2747         struct dlm_lkb *lkb;
2748         struct dlm_rsb *r;
2749         int error, namelen;
2750
2751         error = create_lkb(ls, &lkb);
2752         if (error)
2753                 goto fail;
2754
2755         receive_flags(lkb, ms);
2756         lkb->lkb_flags |= DLM_IFL_MSTCPY;
2757         error = receive_request_args(ls, lkb, ms);
2758         if (error) {
2759                 __put_lkb(ls, lkb);
2760                 goto fail;
2761         }
2762
2763         namelen = receive_extralen(ms);
2764
2765         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2766         if (error) {
2767                 __put_lkb(ls, lkb);
2768                 goto fail;
2769         }
2770
2771         lock_rsb(r);
2772
2773         attach_lkb(r, lkb);
2774         error = do_request(r, lkb);
2775         send_request_reply(r, lkb, error);
2776
2777         unlock_rsb(r);
2778         put_rsb(r);
2779
2780         if (error == -EINPROGRESS)
2781                 error = 0;
2782         if (error)
2783                 dlm_put_lkb(lkb);
2784         return;
2785
2786  fail:
2787         setup_stub_lkb(ls, ms);
2788         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2789 }
2790
2791 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2792 {
2793         struct dlm_lkb *lkb;
2794         struct dlm_rsb *r;
2795         int error, reply = 1;
2796
2797         error = find_lkb(ls, ms->m_remid, &lkb);
2798         if (error)
2799                 goto fail;
2800
2801         r = lkb->lkb_resource;
2802
2803         hold_rsb(r);
2804         lock_rsb(r);
2805
2806         receive_flags(lkb, ms);
2807         error = receive_convert_args(ls, lkb, ms);
2808         if (error)
2809                 goto out;
2810         reply = !down_conversion(lkb);
2811
2812         error = do_convert(r, lkb);
2813  out:
2814         if (reply)
2815                 send_convert_reply(r, lkb, error);
2816
2817         unlock_rsb(r);
2818         put_rsb(r);
2819         dlm_put_lkb(lkb);
2820         return;
2821
2822  fail:
2823         setup_stub_lkb(ls, ms);
2824         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2825 }
2826
2827 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2828 {
2829         struct dlm_lkb *lkb;
2830         struct dlm_rsb *r;
2831         int error;
2832
2833         error = find_lkb(ls, ms->m_remid, &lkb);
2834         if (error)
2835                 goto fail;
2836
2837         r = lkb->lkb_resource;
2838
2839         hold_rsb(r);
2840         lock_rsb(r);
2841
2842         receive_flags(lkb, ms);
2843         error = receive_unlock_args(ls, lkb, ms);
2844         if (error)
2845                 goto out;
2846
2847         error = do_unlock(r, lkb);
2848  out:
2849         send_unlock_reply(r, lkb, error);
2850
2851         unlock_rsb(r);
2852         put_rsb(r);
2853         dlm_put_lkb(lkb);
2854         return;
2855
2856  fail:
2857         setup_stub_lkb(ls, ms);
2858         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2859 }
2860
2861 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2862 {
2863         struct dlm_lkb *lkb;
2864         struct dlm_rsb *r;
2865         int error;
2866
2867         error = find_lkb(ls, ms->m_remid, &lkb);
2868         if (error)
2869                 goto fail;
2870
2871         receive_flags(lkb, ms);
2872
2873         r = lkb->lkb_resource;
2874
2875         hold_rsb(r);
2876         lock_rsb(r);
2877
2878         error = do_cancel(r, lkb);
2879         send_cancel_reply(r, lkb, error);
2880
2881         unlock_rsb(r);
2882         put_rsb(r);
2883         dlm_put_lkb(lkb);
2884         return;
2885
2886  fail:
2887         setup_stub_lkb(ls, ms);
2888         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2889 }
2890
2891 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2892 {
2893         struct dlm_lkb *lkb;
2894         struct dlm_rsb *r;
2895         int error;
2896
2897         error = find_lkb(ls, ms->m_remid, &lkb);
2898         if (error) {
2899                 log_error(ls, "receive_grant no lkb");
2900                 return;
2901         }
2902         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2903
2904         r = lkb->lkb_resource;
2905
2906         hold_rsb(r);
2907         lock_rsb(r);
2908
2909         receive_flags_reply(lkb, ms);
2910         grant_lock_pc(r, lkb, ms);
2911         queue_cast(r, lkb, 0);
2912
2913         unlock_rsb(r);
2914         put_rsb(r);
2915         dlm_put_lkb(lkb);
2916 }
2917
2918 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2919 {
2920         struct dlm_lkb *lkb;
2921         struct dlm_rsb *r;
2922         int error;
2923
2924         error = find_lkb(ls, ms->m_remid, &lkb);
2925         if (error) {
2926                 log_error(ls, "receive_bast no lkb");
2927                 return;
2928         }
2929         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2930
2931         r = lkb->lkb_resource;
2932
2933         hold_rsb(r);
2934         lock_rsb(r);
2935
2936         queue_bast(r, lkb, ms->m_bastmode);
2937
2938         unlock_rsb(r);
2939         put_rsb(r);
2940         dlm_put_lkb(lkb);
2941 }
2942
2943 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2944 {
2945         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2946
2947         from_nodeid = ms->m_header.h_nodeid;
2948         our_nodeid = dlm_our_nodeid();
2949
2950         len = receive_extralen(ms);
2951
2952         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2953         if (dir_nodeid != our_nodeid) {
2954                 log_error(ls, "lookup dir_nodeid %d from %d",
2955                           dir_nodeid, from_nodeid);
2956                 error = -EINVAL;
2957                 ret_nodeid = -1;
2958                 goto out;
2959         }
2960
2961         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2962
2963         /* Optimization: we're master so treat lookup as a request */
2964         if (!error && ret_nodeid == our_nodeid) {
2965                 receive_request(ls, ms);
2966                 return;
2967         }
2968  out:
2969         send_lookup_reply(ls, ms, ret_nodeid, error);
2970 }
2971
2972 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2973 {
2974         int len, dir_nodeid, from_nodeid;
2975
2976         from_nodeid = ms->m_header.h_nodeid;
2977
2978         len = receive_extralen(ms);
2979
2980         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2981         if (dir_nodeid != dlm_our_nodeid()) {
2982                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
2983                           dir_nodeid, from_nodeid);
2984                 return;
2985         }
2986
2987         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2988 }
2989
2990 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2991 {
2992         struct dlm_lkb *lkb;
2993         struct dlm_rsb *r;
2994         int error, mstype, result;
2995
2996         error = find_lkb(ls, ms->m_remid, &lkb);
2997         if (error) {
2998                 log_error(ls, "receive_request_reply no lkb");
2999                 return;
3000         }
3001         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3002
3003         r = lkb->lkb_resource;
3004         hold_rsb(r);
3005         lock_rsb(r);
3006
3007         mstype = lkb->lkb_wait_type;
3008         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3009         if (error)
3010                 goto out;
3011
3012         /* Optimization: the dir node was also the master, so it took our
3013            lookup as a request and sent request reply instead of lookup reply */
3014         if (mstype == DLM_MSG_LOOKUP) {
3015                 r->res_nodeid = ms->m_header.h_nodeid;
3016                 lkb->lkb_nodeid = r->res_nodeid;
3017         }
3018
3019         /* this is the value returned from do_request() on the master */
3020         result = ms->m_result;
3021
3022         switch (result) {
3023         case -EAGAIN:
3024                 /* request would block (be queued) on remote master */
3025                 queue_cast(r, lkb, -EAGAIN);
3026                 confirm_master(r, -EAGAIN);
3027                 unhold_lkb(lkb); /* undoes create_lkb() */
3028                 break;
3029
3030         case -EINPROGRESS:
3031         case 0:
3032                 /* request was queued or granted on remote master */
3033                 receive_flags_reply(lkb, ms);
3034                 lkb->lkb_remid = ms->m_lkid;
3035                 if (result)
3036                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3037                 else {
3038                         grant_lock_pc(r, lkb, ms);
3039                         queue_cast(r, lkb, 0);
3040                 }
3041                 confirm_master(r, result);
3042                 break;
3043
3044         case -EBADR:
3045         case -ENOTBLK:
3046                 /* find_rsb failed to find rsb or rsb wasn't master */
3047                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3048                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3049                 r->res_nodeid = -1;
3050                 lkb->lkb_nodeid = -1;
3051
3052                 if (is_overlap(lkb)) {
3053                         /* we'll ignore error in cancel/unlock reply */
3054                         queue_cast_overlap(r, lkb);
3055                         unhold_lkb(lkb); /* undoes create_lkb() */
3056                 } else
3057                         _request_lock(r, lkb);
3058                 break;
3059
3060         default:
3061                 log_error(ls, "receive_request_reply %x error %d",
3062                           lkb->lkb_id, result);
3063         }
3064
3065         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3066                 log_debug(ls, "receive_request_reply %x result %d unlock",
3067                           lkb->lkb_id, result);
3068                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3069                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3070                 send_unlock(r, lkb);
3071         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3072                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3073                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3074                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3075                 send_cancel(r, lkb);
3076         } else {
3077                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3078                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3079         }
3080  out:
3081         unlock_rsb(r);
3082         put_rsb(r);
3083         dlm_put_lkb(lkb);
3084 }
3085
3086 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3087                                     struct dlm_message *ms)
3088 {
3089         /* this is the value returned from do_convert() on the master */
3090         switch (ms->m_result) {
3091         case -EAGAIN:
3092                 /* convert would block (be queued) on remote master */
3093                 queue_cast(r, lkb, -EAGAIN);
3094                 break;
3095
3096         case -EINPROGRESS:
3097                 /* convert was queued on remote master */
3098                 del_lkb(r, lkb);
3099                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3100                 break;
3101
3102         case 0:
3103                 /* convert was granted on remote master */
3104                 receive_flags_reply(lkb, ms);
3105                 grant_lock_pc(r, lkb, ms);
3106                 queue_cast(r, lkb, 0);
3107                 break;
3108
3109         default:
3110                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3111                           lkb->lkb_id, ms->m_result);
3112         }
3113 }
3114
3115 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3116 {
3117         struct dlm_rsb *r = lkb->lkb_resource;
3118         int error;
3119
3120         hold_rsb(r);
3121         lock_rsb(r);
3122
3123         /* stub reply can happen with waiters_mutex held */
3124         error = remove_from_waiters_ms(lkb, ms);
3125         if (error)
3126                 goto out;
3127
3128         __receive_convert_reply(r, lkb, ms);
3129  out:
3130         unlock_rsb(r);
3131         put_rsb(r);
3132 }
3133
3134 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3135 {
3136         struct dlm_lkb *lkb;
3137         int error;
3138
3139         error = find_lkb(ls, ms->m_remid, &lkb);
3140         if (error) {
3141                 log_error(ls, "receive_convert_reply no lkb");
3142                 return;
3143         }
3144         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3145
3146         _receive_convert_reply(lkb, ms);
3147         dlm_put_lkb(lkb);
3148 }
3149
3150 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3151 {
3152         struct dlm_rsb *r = lkb->lkb_resource;
3153         int error;
3154
3155         hold_rsb(r);
3156         lock_rsb(r);
3157
3158         /* stub reply can happen with waiters_mutex held */
3159         error = remove_from_waiters_ms(lkb, ms);
3160         if (error)
3161                 goto out;
3162
3163         /* this is the value returned from do_unlock() on the master */
3164
3165         switch (ms->m_result) {
3166         case -DLM_EUNLOCK:
3167                 receive_flags_reply(lkb, ms);
3168                 remove_lock_pc(r, lkb);
3169                 queue_cast(r, lkb, -DLM_EUNLOCK);
3170                 break;
3171         case -ENOENT:
3172                 break;
3173         default:
3174                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3175                           lkb->lkb_id, ms->m_result);
3176         }
3177  out:
3178         unlock_rsb(r);
3179         put_rsb(r);
3180 }
3181
3182 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3183 {
3184         struct dlm_lkb *lkb;
3185         int error;
3186
3187         error = find_lkb(ls, ms->m_remid, &lkb);
3188         if (error) {
3189                 log_error(ls, "receive_unlock_reply no lkb");
3190                 return;
3191         }
3192         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3193
3194         _receive_unlock_reply(lkb, ms);
3195         dlm_put_lkb(lkb);
3196 }
3197
3198 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3199 {
3200         struct dlm_rsb *r = lkb->lkb_resource;
3201         int error;
3202
3203         hold_rsb(r);
3204         lock_rsb(r);
3205
3206         /* stub reply can happen with waiters_mutex held */
3207         error = remove_from_waiters_ms(lkb, ms);
3208         if (error)
3209                 goto out;
3210
3211         /* this is the value returned from do_cancel() on the master */
3212
3213         switch (ms->m_result) {
3214         case -DLM_ECANCEL:
3215                 receive_flags_reply(lkb, ms);
3216                 revert_lock_pc(r, lkb);
3217                 if (ms->m_result)
3218                         queue_cast(r, lkb, -DLM_ECANCEL);
3219                 break;
3220         case 0:
3221                 break;
3222         default:
3223                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3224                           lkb->lkb_id, ms->m_result);
3225         }
3226  out:
3227         unlock_rsb(r);
3228         put_rsb(r);
3229 }
3230
3231 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3232 {
3233         struct dlm_lkb *lkb;
3234         int error;
3235
3236         error = find_lkb(ls, ms->m_remid, &lkb);
3237         if (error) {
3238                 log_error(ls, "receive_cancel_reply no lkb");
3239                 return;
3240         }
3241         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3242
3243         _receive_cancel_reply(lkb, ms);
3244         dlm_put_lkb(lkb);
3245 }
3246
3247 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3248 {
3249         struct dlm_lkb *lkb;
3250         struct dlm_rsb *r;
3251         int error, ret_nodeid;
3252
3253         error = find_lkb(ls, ms->m_lkid, &lkb);
3254         if (error) {
3255                 log_error(ls, "receive_lookup_reply no lkb");
3256                 return;
3257         }
3258
3259         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3260            FIXME: will a non-zero error ever be returned? */
3261
3262         r = lkb->lkb_resource;
3263         hold_rsb(r);
3264         lock_rsb(r);
3265
3266         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3267         if (error)
3268                 goto out;
3269
3270         ret_nodeid = ms->m_nodeid;
3271         if (ret_nodeid == dlm_our_nodeid()) {
3272                 r->res_nodeid = 0;
3273                 ret_nodeid = 0;
3274                 r->res_first_lkid = 0;
3275         } else {
3276                 /* set_master() will copy res_nodeid to lkb_nodeid */
3277                 r->res_nodeid = ret_nodeid;
3278         }
3279
3280         if (is_overlap(lkb)) {
3281                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3282                           lkb->lkb_id, lkb->lkb_flags);
3283                 queue_cast_overlap(r, lkb);
3284                 unhold_lkb(lkb); /* undoes create_lkb() */
3285                 goto out_list;
3286         }
3287
3288         _request_lock(r, lkb);
3289
3290  out_list:
3291         if (!ret_nodeid)
3292                 process_lookup_list(r);
3293  out:
3294         unlock_rsb(r);
3295         put_rsb(r);
3296         dlm_put_lkb(lkb);
3297 }
3298
3299 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3300 {
3301         struct dlm_message *ms = (struct dlm_message *) hd;
3302         struct dlm_ls *ls;
3303         int error = 0;
3304
3305         if (!recovery)
3306                 dlm_message_in(ms);
3307
3308         ls = dlm_find_lockspace_global(hd->h_lockspace);
3309         if (!ls) {
3310                 log_print("drop message %d from %d for unknown lockspace %d",
3311                           ms->m_type, nodeid, hd->h_lockspace);
3312                 return -EINVAL;
3313         }
3314
3315         /* recovery may have just ended leaving a bunch of backed-up requests
3316            in the requestqueue; wait while dlm_recoverd clears them */
3317
3318         if (!recovery)
3319                 dlm_wait_requestqueue(ls);
3320
3321         /* recovery may have just started while there were a bunch of
3322            in-flight requests -- save them in requestqueue to be processed
3323            after recovery.  we can't let dlm_recvd block on the recovery
3324            lock.  if dlm_recoverd is calling this function to clear the
3325            requestqueue, it needs to be interrupted (-EINTR) if another
3326            recovery operation is starting. */
3327
3328         while (1) {
3329                 if (dlm_locking_stopped(ls)) {
3330                         if (recovery) {
3331                                 error = -EINTR;
3332                                 goto out;
3333                         }
3334                         error = dlm_add_requestqueue(ls, nodeid, hd);
3335                         if (error == -EAGAIN)
3336                                 continue;
3337                         else {
3338                                 error = -EINTR;
3339                                 goto out;
3340                         }
3341                 }
3342
3343                 if (lock_recovery_try(ls))
3344                         break;
3345                 schedule();
3346         }
3347
3348         switch (ms->m_type) {
3349
3350         /* messages sent to a master node */
3351
3352         case DLM_MSG_REQUEST:
3353                 receive_request(ls, ms);
3354                 break;
3355
3356         case DLM_MSG_CONVERT:
3357                 receive_convert(ls, ms);
3358                 break;
3359
3360         case DLM_MSG_UNLOCK:
3361                 receive_unlock(ls, ms);
3362                 break;
3363
3364         case DLM_MSG_CANCEL:
3365                 receive_cancel(ls, ms);
3366                 break;
3367
3368         /* messages sent from a master node (replies to above) */
3369
3370         case DLM_MSG_REQUEST_REPLY:
3371                 receive_request_reply(ls, ms);
3372                 break;
3373
3374         case DLM_MSG_CONVERT_REPLY:
3375                 receive_convert_reply(ls, ms);
3376                 break;
3377
3378         case DLM_MSG_UNLOCK_REPLY:
3379                 receive_unlock_reply(ls, ms);
3380                 break;
3381
3382         case DLM_MSG_CANCEL_REPLY:
3383                 receive_cancel_reply(ls, ms);
3384                 break;
3385
3386         /* messages sent from a master node (only two types of async msg) */
3387
3388         case DLM_MSG_GRANT:
3389                 receive_grant(ls, ms);
3390                 break;
3391
3392         case DLM_MSG_BAST:
3393                 receive_bast(ls, ms);
3394                 break;
3395
3396         /* messages sent to a dir node */
3397
3398         case DLM_MSG_LOOKUP:
3399                 receive_lookup(ls, ms);
3400                 break;
3401
3402         case DLM_MSG_REMOVE:
3403                 receive_remove(ls, ms);
3404                 break;
3405
3406         /* messages sent from a dir node (remove has no reply) */
3407
3408         case DLM_MSG_LOOKUP_REPLY:
3409                 receive_lookup_reply(ls, ms);
3410                 break;
3411
3412         default:
3413                 log_error(ls, "unknown message type %d", ms->m_type);
3414         }
3415
3416         unlock_recovery(ls);
3417  out:
3418         dlm_put_lockspace(ls);
3419         dlm_astd_wake();
3420         return error;
3421 }
3422
3423
3424 /*
3425  * Recovery related
3426  */
3427
3428 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3429 {
3430         if (middle_conversion(lkb)) {
3431                 hold_lkb(lkb);
3432                 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3433                 ls->ls_stub_ms.m_result = -EINPROGRESS;
3434                 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3435                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3436
3437                 /* Same special case as in receive_rcom_lock_args() */
3438                 lkb->lkb_grmode = DLM_LOCK_IV;
3439                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3440                 unhold_lkb(lkb);
3441
3442         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3443                 lkb->lkb_flags |= DLM_IFL_RESEND;
3444         }
3445
3446         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3447            conversions are async; there's no reply from the remote master */
3448 }
3449
3450 /* A waiting lkb needs recovery if the master node has failed, or
3451    the master node is changing (only when no directory is used) */
3452
3453 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3454 {
3455         if (dlm_is_removed(ls, lkb->lkb_nodeid))
3456                 return 1;
3457
3458         if (!dlm_no_directory(ls))
3459                 return 0;
3460
3461         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3462                 return 1;
3463
3464         return 0;
3465 }
3466
3467 /* Recovery for locks that are waiting for replies from nodes that are now
3468    gone.  We can just complete unlocks and cancels by faking a reply from the
3469    dead node.  Requests and up-conversions we flag to be resent after
3470    recovery.  Down-conversions can just be completed with a fake reply like
3471    unlocks.  Conversions between PR and CW need special attention. */
3472
3473 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3474 {
3475         struct dlm_lkb *lkb, *safe;
3476
3477         mutex_lock(&ls->ls_waiters_mutex);
3478
3479         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3480                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3481                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3482
3483                 /* all outstanding lookups, regardless of destination  will be
3484                    resent after recovery is done */
3485
3486                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3487                         lkb->lkb_flags |= DLM_IFL_RESEND;
3488                         continue;
3489                 }
3490
3491                 if (!waiter_needs_recovery(ls, lkb))
3492                         continue;
3493
3494                 switch (lkb->lkb_wait_type) {
3495
3496                 case DLM_MSG_REQUEST:
3497                         lkb->lkb_flags |= DLM_IFL_RESEND;
3498                         break;
3499
3500                 case DLM_MSG_CONVERT:
3501                         recover_convert_waiter(ls, lkb);
3502                         break;
3503
3504                 case DLM_MSG_UNLOCK:
3505                         hold_lkb(lkb);
3506                         ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3507                         ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3508                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3509                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3510                         dlm_put_lkb(lkb);
3511                         break;
3512
3513                 case DLM_MSG_CANCEL:
3514                         hold_lkb(lkb);
3515                         ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3516                         ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3517                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3518                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3519                         dlm_put_lkb(lkb);
3520                         break;
3521
3522                 default:
3523                         log_error(ls, "invalid lkb wait_type %d",
3524                                   lkb->lkb_wait_type);
3525                 }
3526                 schedule();
3527         }
3528         mutex_unlock(&ls->ls_waiters_mutex);
3529 }
3530
3531 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
3532 {
3533         struct dlm_lkb *lkb;
3534         int found = 0;
3535
3536         mutex_lock(&ls->ls_waiters_mutex);
3537         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3538                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3539                         hold_lkb(lkb);
3540                         found = 1;
3541                         break;
3542                 }
3543         }
3544         mutex_unlock(&ls->ls_waiters_mutex);
3545
3546         if (!found)
3547                 lkb = NULL;
3548         return lkb;
3549 }
3550
3551 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
3552    master or dir-node for r.  Processing the lkb may result in it being placed
3553    back on waiters. */
3554
3555 /* We do this after normal locking has been enabled and any saved messages
3556    (in requestqueue) have been processed.  We should be confident that at
3557    this point we won't get or process a reply to any of these waiting
3558    operations.  But, new ops may be coming in on the rsbs/locks here from
3559    userspace or remotely. */
3560
3561 /* there may have been an overlap unlock/cancel prior to recovery or after
3562    recovery.  if before, the lkb may still have a pos wait_count; if after, the
3563    overlap flag would just have been set and nothing new sent.  we can be
3564    confident here than any replies to either the initial op or overlap ops
3565    prior to recovery have been received. */
3566
3567 int dlm_recover_waiters_post(struct dlm_ls *ls)
3568 {
3569         struct dlm_lkb *lkb;
3570         struct dlm_rsb *r;
3571         int error = 0, mstype, err, oc, ou;
3572
3573         while (1) {
3574                 if (dlm_locking_stopped(ls)) {
3575                         log_debug(ls, "recover_waiters_post aborted");
3576                         error = -EINTR;
3577                         break;
3578                 }
3579
3580                 lkb = find_resend_waiter(ls);
3581                 if (!lkb)
3582                         break;
3583
3584                 r = lkb->lkb_resource;
3585                 hold_rsb(r);
3586                 lock_rsb(r);
3587
3588                 mstype = lkb->lkb_wait_type;
3589                 oc = is_overlap_cancel(lkb);
3590                 ou = is_overlap_unlock(lkb);
3591                 err = 0;
3592
3593                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3594                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3595
3596                 /* At this point we assume that we won't get a reply to any
3597                    previous op or overlap op on this lock.  First, do a big
3598                    remove_from_waiters() for all previous ops. */
3599
3600                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3601                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3602                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3603                 lkb->lkb_wait_type = 0;
3604                 lkb->lkb_wait_count = 0;
3605                 mutex_lock(&ls->ls_waiters_mutex);
3606                 list_del_init(&lkb->lkb_wait_reply);
3607                 mutex_unlock(&ls->ls_waiters_mutex);
3608                 unhold_lkb(lkb); /* for waiters list */
3609
3610                 if (oc || ou) {
3611                         /* do an unlock or cancel instead of resending */
3612                         switch (mstype) {
3613                         case DLM_MSG_LOOKUP:
3614                         case DLM_MSG_REQUEST:
3615                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
3616                                                         -DLM_ECANCEL);
3617                                 unhold_lkb(lkb); /* undoes create_lkb() */
3618                                 break;
3619                         case DLM_MSG_CONVERT:
3620                                 if (oc) {
3621                                         queue_cast(r, lkb, -DLM_ECANCEL);
3622                                 } else {
3623                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
3624                                         _unlock_lock(r, lkb);
3625                                 }
3626                                 break;
3627                         default:
3628                                 err = 1;
3629                         }
3630                 } else {
3631                         switch (mstype) {
3632                         case DLM_MSG_LOOKUP:
3633                         case DLM_MSG_REQUEST:
3634                                 _request_lock(r, lkb);
3635                                 if (is_master(r))
3636                                         confirm_master(r, 0);
3637                                 break;
3638                         case DLM_MSG_CONVERT:
3639                                 _convert_lock(r, lkb);
3640                                 break;
3641                         default:
3642                                 err = 1;
3643                         }
3644                 }
3645
3646                 if (err)
3647                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
3648                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
3649                 unlock_rsb(r);
3650                 put_rsb(r);
3651                 dlm_put_lkb(lkb);
3652         }
3653
3654         return error;
3655 }
3656
3657 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3658                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3659 {
3660         struct dlm_ls *ls = r->res_ls;
3661         struct dlm_lkb *lkb, *safe;
3662
3663         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3664                 if (test(ls, lkb)) {
3665                         rsb_set_flag(r, RSB_LOCKS_PURGED);
3666                         del_lkb(r, lkb);
3667                         /* this put should free the lkb */
3668                         if (!dlm_put_lkb(lkb))
3669                                 log_error(ls, "purged lkb not released");
3670                 }
3671         }
3672 }
3673
3674 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3675 {
3676         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3677 }
3678
3679 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3680 {
3681         return is_master_copy(lkb);
3682 }
3683
3684 static void purge_dead_locks(struct dlm_rsb *r)
3685 {
3686         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3687         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3688         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3689 }
3690
3691 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3692 {
3693         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3694         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3695         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3696 }
3697
3698 /* Get rid of locks held by nodes that are gone. */
3699
3700 int dlm_purge_locks(struct dlm_ls *ls)
3701 {
3702         struct dlm_rsb *r;
3703
3704         log_debug(ls, "dlm_purge_locks");
3705
3706         down_write(&ls->ls_root_sem);
3707         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3708                 hold_rsb(r);
3709                 lock_rsb(r);
3710                 if (is_master(r))
3711                         purge_dead_locks(r);
3712                 unlock_rsb(r);
3713                 unhold_rsb(r);
3714
3715                 schedule();
3716         }
3717         up_write(&ls->ls_root_sem);
3718
3719         return 0;
3720 }
3721
3722 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3723 {
3724         struct dlm_rsb *r, *r_ret = NULL;
3725
3726         read_lock(&ls->ls_rsbtbl[bucket].lock);
3727         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3728                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
3729                         continue;
3730                 hold_rsb(r);
3731                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
3732                 r_ret = r;
3733                 break;
3734         }
3735         read_unlock(&ls->ls_rsbtbl[bucket].lock);
3736         return r_ret;
3737 }
3738
3739 void dlm_grant_after_purge(struct dlm_ls *ls)
3740 {
3741         struct dlm_rsb *r;
3742         int bucket = 0;
3743
3744         while (1) {
3745                 r = find_purged_rsb(ls, bucket);
3746                 if (!r) {
3747                         if (bucket == ls->ls_rsbtbl_size - 1)
3748                                 break;
3749                         bucket++;
3750                         continue;
3751                 }
3752                 lock_rsb(r);
3753                 if (is_master(r)) {
3754                         grant_pending_locks(r);
3755                         confirm_master(r, 0);
3756                 }
3757                 unlock_rsb(r);
3758                 put_rsb(r);
3759                 schedule();
3760         }
3761 }
3762
3763 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3764                                          uint32_t remid)
3765 {
3766         struct dlm_lkb *lkb;
3767
3768         list_for_each_entry(lkb, head, lkb_statequeue) {
3769                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3770                         return lkb;
3771         }
3772         return NULL;
3773 }
3774
3775 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3776                                     uint32_t remid)
3777 {
3778         struct dlm_lkb *lkb;
3779
3780         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3781         if (lkb)
3782                 return lkb;
3783         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3784         if (lkb)
3785                 return lkb;
3786         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3787         if (lkb)
3788                 return lkb;
3789         return NULL;
3790 }
3791
3792 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3793                                   struct dlm_rsb *r, struct dlm_rcom *rc)
3794 {
3795         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3796         int lvblen;
3797
3798         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3799         lkb->lkb_ownpid = rl->rl_ownpid;
3800         lkb->lkb_remid = rl->rl_lkid;
3801         lkb->lkb_exflags = rl->rl_exflags;
3802         lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3803         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3804         lkb->lkb_lvbseq = rl->rl_lvbseq;
3805         lkb->lkb_rqmode = rl->rl_rqmode;
3806         lkb->lkb_grmode = rl->rl_grmode;
3807         /* don't set lkb_status because add_lkb wants to itself */
3808
3809         lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3810         lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3811
3812         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3813                 lkb->lkb_lvbptr = allocate_lvb(ls);
3814                 if (!lkb->lkb_lvbptr)
3815                         return -ENOMEM;
3816                 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3817                          sizeof(struct rcom_lock);
3818                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3819         }
3820
3821         /* Conversions between PR and CW (middle modes) need special handling.
3822            The real granted mode of these converting locks cannot be determined
3823            until all locks have been rebuilt on the rsb (recover_conversion) */
3824
3825         if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3826                 rl->rl_status = DLM_LKSTS_CONVERT;
3827                 lkb->lkb_grmode = DLM_LOCK_IV;
3828                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3829         }
3830
3831         return 0;
3832 }
3833
3834 /* This lkb may have been recovered in a previous aborted recovery so we need
3835    to check if the rsb already has an lkb with the given remote nodeid/lkid.
3836    If so we just send back a standard reply.  If not, we create a new lkb with
3837    the given values and send back our lkid.  We send back our lkid by sending
3838    back the rcom_lock struct we got but with the remid field filled in. */
3839
3840 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3841 {
3842         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3843         struct dlm_rsb *r;
3844         struct dlm_lkb *lkb;
3845         int error;
3846
3847         if (rl->rl_parent_lkid) {
3848                 error = -EOPNOTSUPP;
3849                 goto out;
3850         }
3851
3852         error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3853         if (error)
3854                 goto out;
3855
3856         lock_rsb(r);
3857
3858         lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3859         if (lkb) {
3860                 error = -EEXIST;
3861                 goto out_remid;
3862         }
3863
3864         error = create_lkb(ls, &lkb);
3865         if (error)
3866                 goto out_unlock;
3867
3868         error = receive_rcom_lock_args(ls, lkb, r, rc);
3869         if (error) {
3870                 __put_lkb(ls, lkb);
3871                 goto out_unlock;
3872         }
3873
3874         attach_lkb(r, lkb);
3875         add_lkb(r, lkb, rl->rl_status);
3876         error = 0;
3877
3878  out_remid:
3879         /* this is the new value returned to the lock holder for
3880            saving in its process-copy lkb */
3881         rl->rl_remid = lkb->lkb_id;
3882
3883  out_unlock:
3884         unlock_rsb(r);
3885         put_rsb(r);
3886  out:
3887         if (error)
3888                 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3889         rl->rl_result = error;
3890         return error;
3891 }
3892
3893 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3894 {
3895         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3896         struct dlm_rsb *r;
3897         struct dlm_lkb *lkb;
3898         int error;
3899
3900         error = find_lkb(ls, rl->rl_lkid, &lkb);
3901         if (error) {
3902                 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3903                 return error;
3904         }
3905
3906         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3907
3908         error = rl->rl_result;
3909
3910         r = lkb->lkb_resource;
3911         hold_rsb(r);
3912         lock_rsb(r);
3913
3914         switch (error) {
3915         case -EBADR:
3916                 /* There's a chance the new master received our lock before
3917                    dlm_recover_master_reply(), this wouldn't happen if we did
3918                    a barrier between recover_masters and recover_locks. */
3919                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
3920                           (unsigned long)r, r->res_name);
3921                 dlm_send_rcom_lock(r, lkb);
3922                 goto out;
3923         case -EEXIST:
3924                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
3925                 /* fall through */
3926         case 0:
3927                 lkb->lkb_remid = rl->rl_remid;
3928                 break;
3929         default:
3930                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3931                           error, lkb->lkb_id);
3932         }
3933
3934         /* an ack for dlm_recover_locks() which waits for replies from
3935            all the locks it sends to new masters */
3936         dlm_recovered_lock(r);
3937  out:
3938         unlock_rsb(r);
3939         put_rsb(r);
3940         dlm_put_lkb(lkb);
3941
3942         return 0;
3943 }
3944
3945 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
3946                      int mode, uint32_t flags, void *name, unsigned int namelen,
3947                      uint32_t parent_lkid)
3948 {
3949         struct dlm_lkb *lkb;
3950         struct dlm_args args;
3951         int error;
3952
3953         lock_recovery(ls);
3954
3955         error = create_lkb(ls, &lkb);
3956         if (error) {
3957                 kfree(ua);
3958                 goto out;
3959         }
3960
3961         if (flags & DLM_LKF_VALBLK) {
3962                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3963                 if (!ua->lksb.sb_lvbptr) {
3964                         kfree(ua);
3965                         __put_lkb(ls, lkb);
3966                         error = -ENOMEM;
3967                         goto out;
3968                 }
3969         }
3970
3971         /* After ua is attached to lkb it will be freed by free_lkb().
3972            When DLM_IFL_USER is set, the dlm knows that this is a userspace
3973            lock and that lkb_astparam is the dlm_user_args structure. */
3974
3975         error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
3976                               DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
3977         lkb->lkb_flags |= DLM_IFL_USER;
3978         ua->old_mode = DLM_LOCK_IV;
3979
3980         if (error) {
3981                 __put_lkb(ls, lkb);
3982                 goto out;
3983         }
3984
3985         error = request_lock(ls, lkb, name, namelen, &args);
3986
3987         switch (error) {
3988         case 0:
3989                 break;
3990         case -EINPROGRESS:
3991                 error = 0;
3992                 break;
3993         case -EAGAIN:
3994                 error = 0;
3995                 /* fall through */
3996         default:
3997                 __put_lkb(ls, lkb);
3998                 goto out;
3999         }
4000
4001         /* add this new lkb to the per-process list of locks */
4002         spin_lock(&ua->proc->locks_spin);
4003         hold_lkb(lkb);
4004         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4005         spin_unlock(&ua->proc->locks_spin);
4006  out:
4007         unlock_recovery(ls);
4008         return error;
4009 }
4010
4011 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4012                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
4013 {
4014         struct dlm_lkb *lkb;
4015         struct dlm_args args;
4016         struct dlm_user_args *ua;
4017         int error;
4018
4019         lock_recovery(ls);
4020
4021         error = find_lkb(ls, lkid, &lkb);
4022         if (error)
4023                 goto out;
4024
4025         /* user can change the params on its lock when it converts it, or
4026            add an lvb that didn't exist before */
4027
4028         ua = (struct dlm_user_args *)lkb->lkb_astparam;
4029
4030         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4031                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4032                 if (!ua->lksb.sb_lvbptr) {
4033                         error = -ENOMEM;
4034                         goto out_put;
4035                 }
4036         }
4037         if (lvb_in && ua->lksb.sb_lvbptr)
4038                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4039
4040         ua->castparam = ua_tmp->castparam;
4041         ua->castaddr = ua_tmp->castaddr;
4042         ua->bastparam = ua_tmp->bastparam;
4043         ua->bastaddr = ua_tmp->bastaddr;
4044         ua->user_lksb = ua_tmp->user_lksb;
4045         ua->old_mode = lkb->lkb_grmode;
4046
4047         error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
4048                               ua, DLM_FAKE_USER_AST, &args);
4049         if (error)
4050                 goto out_put;
4051
4052         error = convert_lock(ls, lkb, &args);
4053
4054         if (error == -EINPROGRESS || error == -EAGAIN)
4055                 error = 0;
4056  out_put:
4057         dlm_put_lkb(lkb);
4058  out:
4059         unlock_recovery(ls);
4060         kfree(ua_tmp);
4061         return error;
4062 }
4063
4064 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4065                     uint32_t flags, uint32_t lkid, char *lvb_in)
4066 {
4067         struct dlm_lkb *lkb;
4068         struct dlm_args args;
4069         struct dlm_user_args *ua;
4070         int error;
4071
4072         lock_recovery(ls);
4073
4074         error = find_lkb(ls, lkid, &lkb);
4075         if (error)
4076                 goto out;
4077
4078         ua = (struct dlm_user_args *)lkb->lkb_astparam;
4079
4080         if (lvb_in && ua->lksb.sb_lvbptr)
4081                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4082         ua->castparam = ua_tmp->castparam;
4083         ua->user_lksb = ua_tmp->user_lksb;
4084
4085         error = set_unlock_args(flags, ua, &args);
4086         if (error)
4087                 goto out_put;
4088
4089         error = unlock_lock(ls, lkb, &args);
4090
4091         if (error == -DLM_EUNLOCK)
4092                 error = 0;
4093         /* from validate_unlock_args() */
4094         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4095                 error = 0;
4096         if (error)
4097                 goto out_put;
4098
4099         spin_lock(&ua->proc->locks_spin);
4100         /* dlm_user_add_ast() may have already taken lkb off the proc list */
4101         if (!list_empty(&lkb->lkb_ownqueue))
4102                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4103         spin_unlock(&ua->proc->locks_spin);
4104  out_put:
4105         dlm_put_lkb(lkb);
4106  out:
4107         unlock_recovery(ls);
4108         kfree(ua_tmp);
4109         return error;
4110 }
4111
4112 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4113                     uint32_t flags, uint32_t lkid)
4114 {
4115         struct dlm_lkb *lkb;
4116         struct dlm_args args;
4117         struct dlm_user_args *ua;
4118         int error;
4119
4120         lock_recovery(ls);
4121
4122         error = find_lkb(ls, lkid, &lkb);
4123         if (error)
4124                 goto out;
4125
4126         ua = (struct dlm_user_args *)lkb->lkb_astparam;
4127         ua->castparam = ua_tmp->castparam;
4128         ua->user_lksb = ua_tmp->user_lksb;
4129
4130         error = set_unlock_args(flags, ua, &args);
4131         if (error)
4132                 goto out_put;
4133
4134         error = cancel_lock(ls, lkb, &args);
4135
4136         if (error == -DLM_ECANCEL)
4137                 error = 0;
4138         /* from validate_unlock_args() */
4139         if (error == -EBUSY)
4140                 error = 0;
4141  out_put:
4142         dlm_put_lkb(lkb);
4143  out:
4144         unlock_recovery(ls);
4145         kfree(ua_tmp);
4146         return error;
4147 }
4148
4149 /* lkb's that are removed from the waiters list by revert are just left on the
4150    orphans list with the granted orphan locks, to be freed by purge */
4151
4152 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4153 {
4154         struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4155         struct dlm_args args;
4156         int error;
4157
4158         hold_lkb(lkb);
4159         mutex_lock(&ls->ls_orphans_mutex);
4160         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4161         mutex_unlock(&ls->ls_orphans_mutex);
4162
4163         set_unlock_args(0, ua, &args);
4164
4165         error = cancel_lock(ls, lkb, &args);
4166         if (error == -DLM_ECANCEL)
4167                 error = 0;
4168         return error;
4169 }
4170
4171 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4172    Regardless of what rsb queue the lock is on, it's removed and freed. */
4173
4174 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4175 {
4176         struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4177         struct dlm_args args;
4178         int error;
4179
4180         set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4181
4182         error = unlock_lock(ls, lkb, &args);
4183         if (error == -DLM_EUNLOCK)
4184                 error = 0;
4185         return error;
4186 }
4187
4188 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4189    (which does lock_rsb) due to deadlock with receiving a message that does
4190    lock_rsb followed by dlm_user_add_ast() */
4191
4192 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4193                                      struct dlm_user_proc *proc)
4194 {
4195         struct dlm_lkb *lkb = NULL;
4196
4197         mutex_lock(&ls->ls_clear_proc_locks);
4198         if (list_empty(&proc->locks))
4199                 goto out;
4200
4201         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4202         list_del_init(&lkb->lkb_ownqueue);
4203
4204         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4205                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4206         else
4207                 lkb->lkb_flags |= DLM_IFL_DEAD;
4208  out:
4209         mutex_unlock(&ls->ls_clear_proc_locks);
4210         return lkb;
4211 }
4212
4213 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4214    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4215    which we clear here. */
4216
4217 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4218    list, and no more device_writes should add lkb's to proc->locks list; so we
4219    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4220    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4221    them ourself. */
4222
4223 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4224 {
4225         struct dlm_lkb *lkb, *safe;
4226
4227         lock_recovery(ls);
4228
4229         while (1) {
4230                 lkb = del_proc_lock(ls, proc);
4231                 if (!lkb)
4232                         break;
4233                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4234                         orphan_proc_lock(ls, lkb);
4235                 else
4236                         unlock_proc_lock(ls, lkb);
4237
4238                 /* this removes the reference for the proc->locks list
4239                    added by dlm_user_request, it may result in the lkb
4240                    being freed */
4241
4242                 dlm_put_lkb(lkb);
4243         }
4244
4245         mutex_lock(&ls->ls_clear_proc_locks);
4246
4247         /* in-progress unlocks */
4248         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4249                 list_del_init(&lkb->lkb_ownqueue);
4250                 lkb->lkb_flags |= DLM_IFL_DEAD;
4251                 dlm_put_lkb(lkb);
4252         }
4253
4254         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4255                 list_del(&lkb->lkb_astqueue);
4256                 dlm_put_lkb(lkb);
4257         }
4258
4259         mutex_unlock(&ls->ls_clear_proc_locks);
4260         unlock_recovery(ls);
4261 }
4262