]> www.pilppa.org Git - linux-2.6-omap-h63xx.git/blob - fs/dlm/lock.c
9d26b3a396718c902b500b8c4f51551b1ef1d6ff
[linux-2.6-omap-h63xx.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
86                                     struct dlm_message *ms);
87 static int receive_extralen(struct dlm_message *ms);
88 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
89
90 /*
91  * Lock compatibilty matrix - thanks Steve
92  * UN = Unlocked state. Not really a state, used as a flag
93  * PD = Padding. Used to make the matrix a nice power of two in size
94  * Other states are the same as the VMS DLM.
95  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
96  */
97
98 static const int __dlm_compat_matrix[8][8] = {
99       /* UN NL CR CW PR PW EX PD */
100         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
101         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
102         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
103         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
104         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
105         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
106         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
107         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
108 };
109
110 /*
111  * This defines the direction of transfer of LVB data.
112  * Granted mode is the row; requested mode is the column.
113  * Usage: matrix[grmode+1][rqmode+1]
114  * 1 = LVB is returned to the caller
115  * 0 = LVB is written to the resource
116  * -1 = nothing happens to the LVB
117  */
118
119 const int dlm_lvb_operations[8][8] = {
120         /* UN   NL  CR  CW  PR  PW  EX  PD*/
121         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
122         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
123         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
124         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
125         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
126         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
127         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
128         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
129 };
130
131 #define modes_compat(gr, rq) \
132         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
133
134 int dlm_modes_compat(int mode1, int mode2)
135 {
136         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
137 }
138
139 /*
140  * Compatibility matrix for conversions with QUECVT set.
141  * Granted mode is the row; requested mode is the column.
142  * Usage: matrix[grmode+1][rqmode+1]
143  */
144
145 static const int __quecvt_compat_matrix[8][8] = {
146       /* UN NL CR CW PR PW EX PD */
147         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
148         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
149         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
150         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
151         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
152         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
153         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
154         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
155 };
156
157 void dlm_print_lkb(struct dlm_lkb *lkb)
158 {
159         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
160                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
161                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
162                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
163                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
164 }
165
166 void dlm_print_rsb(struct dlm_rsb *r)
167 {
168         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
169                r->res_nodeid, r->res_flags, r->res_first_lkid,
170                r->res_recover_locks_count, r->res_name);
171 }
172
173 void dlm_dump_rsb(struct dlm_rsb *r)
174 {
175         struct dlm_lkb *lkb;
176
177         dlm_print_rsb(r);
178
179         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
180                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
181         printk(KERN_ERR "rsb lookup list\n");
182         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
183                 dlm_print_lkb(lkb);
184         printk(KERN_ERR "rsb grant queue:\n");
185         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
186                 dlm_print_lkb(lkb);
187         printk(KERN_ERR "rsb convert queue:\n");
188         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
189                 dlm_print_lkb(lkb);
190         printk(KERN_ERR "rsb wait queue:\n");
191         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
192                 dlm_print_lkb(lkb);
193 }
194
195 /* Threads cannot use the lockspace while it's being recovered */
196
197 static inline void lock_recovery(struct dlm_ls *ls)
198 {
199         down_read(&ls->ls_in_recovery);
200 }
201
202 static inline void unlock_recovery(struct dlm_ls *ls)
203 {
204         up_read(&ls->ls_in_recovery);
205 }
206
207 static inline int lock_recovery_try(struct dlm_ls *ls)
208 {
209         return down_read_trylock(&ls->ls_in_recovery);
210 }
211
212 static inline int can_be_queued(struct dlm_lkb *lkb)
213 {
214         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
215 }
216
217 static inline int force_blocking_asts(struct dlm_lkb *lkb)
218 {
219         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
220 }
221
222 static inline int is_demoted(struct dlm_lkb *lkb)
223 {
224         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
225 }
226
227 static inline int is_remote(struct dlm_rsb *r)
228 {
229         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
230         return !!r->res_nodeid;
231 }
232
233 static inline int is_process_copy(struct dlm_lkb *lkb)
234 {
235         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
236 }
237
238 static inline int is_master_copy(struct dlm_lkb *lkb)
239 {
240         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
241                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
242         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
243 }
244
245 static inline int middle_conversion(struct dlm_lkb *lkb)
246 {
247         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
248             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
249                 return 1;
250         return 0;
251 }
252
253 static inline int down_conversion(struct dlm_lkb *lkb)
254 {
255         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
256 }
257
258 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
259 {
260         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
261 }
262
263 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
264 {
265         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
266 }
267
268 static inline int is_overlap(struct dlm_lkb *lkb)
269 {
270         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
271                                   DLM_IFL_OVERLAP_CANCEL));
272 }
273
274 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
275 {
276         if (is_master_copy(lkb))
277                 return;
278
279         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
280
281         lkb->lkb_lksb->sb_status = rv;
282         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
283
284         dlm_add_ast(lkb, AST_COMP);
285 }
286
287 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
288 {
289         queue_cast(r, lkb,
290                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
291 }
292
293 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
294 {
295         if (is_master_copy(lkb))
296                 send_bast(r, lkb, rqmode);
297         else {
298                 lkb->lkb_bastmode = rqmode;
299                 dlm_add_ast(lkb, AST_BAST);
300         }
301 }
302
303 /*
304  * Basic operations on rsb's and lkb's
305  */
306
307 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
308 {
309         struct dlm_rsb *r;
310
311         r = allocate_rsb(ls, len);
312         if (!r)
313                 return NULL;
314
315         r->res_ls = ls;
316         r->res_length = len;
317         memcpy(r->res_name, name, len);
318         mutex_init(&r->res_mutex);
319
320         INIT_LIST_HEAD(&r->res_lookup);
321         INIT_LIST_HEAD(&r->res_grantqueue);
322         INIT_LIST_HEAD(&r->res_convertqueue);
323         INIT_LIST_HEAD(&r->res_waitqueue);
324         INIT_LIST_HEAD(&r->res_root_list);
325         INIT_LIST_HEAD(&r->res_recover_list);
326
327         return r;
328 }
329
330 static int search_rsb_list(struct list_head *head, char *name, int len,
331                            unsigned int flags, struct dlm_rsb **r_ret)
332 {
333         struct dlm_rsb *r;
334         int error = 0;
335
336         list_for_each_entry(r, head, res_hashchain) {
337                 if (len == r->res_length && !memcmp(name, r->res_name, len))
338                         goto found;
339         }
340         return -EBADR;
341
342  found:
343         if (r->res_nodeid && (flags & R_MASTER))
344                 error = -ENOTBLK;
345         *r_ret = r;
346         return error;
347 }
348
349 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
350                        unsigned int flags, struct dlm_rsb **r_ret)
351 {
352         struct dlm_rsb *r;
353         int error;
354
355         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
356         if (!error) {
357                 kref_get(&r->res_ref);
358                 goto out;
359         }
360         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
361         if (error)
362                 goto out;
363
364         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
365
366         if (dlm_no_directory(ls))
367                 goto out;
368
369         if (r->res_nodeid == -1) {
370                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
371                 r->res_first_lkid = 0;
372         } else if (r->res_nodeid > 0) {
373                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
374                 r->res_first_lkid = 0;
375         } else {
376                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
377                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
378         }
379  out:
380         *r_ret = r;
381         return error;
382 }
383
384 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
385                       unsigned int flags, struct dlm_rsb **r_ret)
386 {
387         int error;
388         write_lock(&ls->ls_rsbtbl[b].lock);
389         error = _search_rsb(ls, name, len, b, flags, r_ret);
390         write_unlock(&ls->ls_rsbtbl[b].lock);
391         return error;
392 }
393
394 /*
395  * Find rsb in rsbtbl and potentially create/add one
396  *
397  * Delaying the release of rsb's has a similar benefit to applications keeping
398  * NL locks on an rsb, but without the guarantee that the cached master value
399  * will still be valid when the rsb is reused.  Apps aren't always smart enough
400  * to keep NL locks on an rsb that they may lock again shortly; this can lead
401  * to excessive master lookups and removals if we don't delay the release.
402  *
403  * Searching for an rsb means looking through both the normal list and toss
404  * list.  When found on the toss list the rsb is moved to the normal list with
405  * ref count of 1; when found on normal list the ref count is incremented.
406  */
407
408 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
409                     unsigned int flags, struct dlm_rsb **r_ret)
410 {
411         struct dlm_rsb *r, *tmp;
412         uint32_t hash, bucket;
413         int error = 0;
414
415         if (dlm_no_directory(ls))
416                 flags |= R_CREATE;
417
418         hash = jhash(name, namelen, 0);
419         bucket = hash & (ls->ls_rsbtbl_size - 1);
420
421         error = search_rsb(ls, name, namelen, bucket, flags, &r);
422         if (!error)
423                 goto out;
424
425         if (error == -EBADR && !(flags & R_CREATE))
426                 goto out;
427
428         /* the rsb was found but wasn't a master copy */
429         if (error == -ENOTBLK)
430                 goto out;
431
432         error = -ENOMEM;
433         r = create_rsb(ls, name, namelen);
434         if (!r)
435                 goto out;
436
437         r->res_hash = hash;
438         r->res_bucket = bucket;
439         r->res_nodeid = -1;
440         kref_init(&r->res_ref);
441
442         /* With no directory, the master can be set immediately */
443         if (dlm_no_directory(ls)) {
444                 int nodeid = dlm_dir_nodeid(r);
445                 if (nodeid == dlm_our_nodeid())
446                         nodeid = 0;
447                 r->res_nodeid = nodeid;
448         }
449
450         write_lock(&ls->ls_rsbtbl[bucket].lock);
451         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
452         if (!error) {
453                 write_unlock(&ls->ls_rsbtbl[bucket].lock);
454                 free_rsb(r);
455                 r = tmp;
456                 goto out;
457         }
458         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
459         write_unlock(&ls->ls_rsbtbl[bucket].lock);
460         error = 0;
461  out:
462         *r_ret = r;
463         return error;
464 }
465
466 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
467                  unsigned int flags, struct dlm_rsb **r_ret)
468 {
469         return find_rsb(ls, name, namelen, flags, r_ret);
470 }
471
472 /* This is only called to add a reference when the code already holds
473    a valid reference to the rsb, so there's no need for locking. */
474
475 static inline void hold_rsb(struct dlm_rsb *r)
476 {
477         kref_get(&r->res_ref);
478 }
479
480 void dlm_hold_rsb(struct dlm_rsb *r)
481 {
482         hold_rsb(r);
483 }
484
485 static void toss_rsb(struct kref *kref)
486 {
487         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
488         struct dlm_ls *ls = r->res_ls;
489
490         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
491         kref_init(&r->res_ref);
492         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
493         r->res_toss_time = jiffies;
494         if (r->res_lvbptr) {
495                 free_lvb(r->res_lvbptr);
496                 r->res_lvbptr = NULL;
497         }
498 }
499
500 /* When all references to the rsb are gone it's transfered to
501    the tossed list for later disposal. */
502
503 static void put_rsb(struct dlm_rsb *r)
504 {
505         struct dlm_ls *ls = r->res_ls;
506         uint32_t bucket = r->res_bucket;
507
508         write_lock(&ls->ls_rsbtbl[bucket].lock);
509         kref_put(&r->res_ref, toss_rsb);
510         write_unlock(&ls->ls_rsbtbl[bucket].lock);
511 }
512
513 void dlm_put_rsb(struct dlm_rsb *r)
514 {
515         put_rsb(r);
516 }
517
518 /* See comment for unhold_lkb */
519
520 static void unhold_rsb(struct dlm_rsb *r)
521 {
522         int rv;
523         rv = kref_put(&r->res_ref, toss_rsb);
524         DLM_ASSERT(!rv, dlm_dump_rsb(r););
525 }
526
527 static void kill_rsb(struct kref *kref)
528 {
529         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
530
531         /* All work is done after the return from kref_put() so we
532            can release the write_lock before the remove and free. */
533
534         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
535         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
536         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
537         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
538         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
539         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
540 }
541
542 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
543    The rsb must exist as long as any lkb's for it do. */
544
545 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
546 {
547         hold_rsb(r);
548         lkb->lkb_resource = r;
549 }
550
551 static void detach_lkb(struct dlm_lkb *lkb)
552 {
553         if (lkb->lkb_resource) {
554                 put_rsb(lkb->lkb_resource);
555                 lkb->lkb_resource = NULL;
556         }
557 }
558
559 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
560 {
561         struct dlm_lkb *lkb, *tmp;
562         uint32_t lkid = 0;
563         uint16_t bucket;
564
565         lkb = allocate_lkb(ls);
566         if (!lkb)
567                 return -ENOMEM;
568
569         lkb->lkb_nodeid = -1;
570         lkb->lkb_grmode = DLM_LOCK_IV;
571         kref_init(&lkb->lkb_ref);
572         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
573         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
574
575         get_random_bytes(&bucket, sizeof(bucket));
576         bucket &= (ls->ls_lkbtbl_size - 1);
577
578         write_lock(&ls->ls_lkbtbl[bucket].lock);
579
580         /* counter can roll over so we must verify lkid is not in use */
581
582         while (lkid == 0) {
583                 lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
584
585                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
586                                     lkb_idtbl_list) {
587                         if (tmp->lkb_id != lkid)
588                                 continue;
589                         lkid = 0;
590                         break;
591                 }
592         }
593
594         lkb->lkb_id = lkid;
595         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
596         write_unlock(&ls->ls_lkbtbl[bucket].lock);
597
598         *lkb_ret = lkb;
599         return 0;
600 }
601
602 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
603 {
604         uint16_t bucket = lkid & 0xFFFF;
605         struct dlm_lkb *lkb;
606
607         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
608                 if (lkb->lkb_id == lkid)
609                         return lkb;
610         }
611         return NULL;
612 }
613
614 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
615 {
616         struct dlm_lkb *lkb;
617         uint16_t bucket = lkid & 0xFFFF;
618
619         if (bucket >= ls->ls_lkbtbl_size)
620                 return -EBADSLT;
621
622         read_lock(&ls->ls_lkbtbl[bucket].lock);
623         lkb = __find_lkb(ls, lkid);
624         if (lkb)
625                 kref_get(&lkb->lkb_ref);
626         read_unlock(&ls->ls_lkbtbl[bucket].lock);
627
628         *lkb_ret = lkb;
629         return lkb ? 0 : -ENOENT;
630 }
631
632 static void kill_lkb(struct kref *kref)
633 {
634         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
635
636         /* All work is done after the return from kref_put() so we
637            can release the write_lock before the detach_lkb */
638
639         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
640 }
641
642 /* __put_lkb() is used when an lkb may not have an rsb attached to
643    it so we need to provide the lockspace explicitly */
644
645 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
646 {
647         uint16_t bucket = lkb->lkb_id & 0xFFFF;
648
649         write_lock(&ls->ls_lkbtbl[bucket].lock);
650         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
651                 list_del(&lkb->lkb_idtbl_list);
652                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
653
654                 detach_lkb(lkb);
655
656                 /* for local/process lkbs, lvbptr points to caller's lksb */
657                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
658                         free_lvb(lkb->lkb_lvbptr);
659                 free_lkb(lkb);
660                 return 1;
661         } else {
662                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
663                 return 0;
664         }
665 }
666
667 int dlm_put_lkb(struct dlm_lkb *lkb)
668 {
669         struct dlm_ls *ls;
670
671         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
672         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
673
674         ls = lkb->lkb_resource->res_ls;
675         return __put_lkb(ls, lkb);
676 }
677
678 /* This is only called to add a reference when the code already holds
679    a valid reference to the lkb, so there's no need for locking. */
680
681 static inline void hold_lkb(struct dlm_lkb *lkb)
682 {
683         kref_get(&lkb->lkb_ref);
684 }
685
686 /* This is called when we need to remove a reference and are certain
687    it's not the last ref.  e.g. del_lkb is always called between a
688    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
689    put_lkb would work fine, but would involve unnecessary locking */
690
691 static inline void unhold_lkb(struct dlm_lkb *lkb)
692 {
693         int rv;
694         rv = kref_put(&lkb->lkb_ref, kill_lkb);
695         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
696 }
697
698 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
699                             int mode)
700 {
701         struct dlm_lkb *lkb = NULL;
702
703         list_for_each_entry(lkb, head, lkb_statequeue)
704                 if (lkb->lkb_rqmode < mode)
705                         break;
706
707         if (!lkb)
708                 list_add_tail(new, head);
709         else
710                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
711 }
712
713 /* add/remove lkb to rsb's grant/convert/wait queue */
714
715 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
716 {
717         kref_get(&lkb->lkb_ref);
718
719         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
720
721         lkb->lkb_status = status;
722
723         switch (status) {
724         case DLM_LKSTS_WAITING:
725                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
726                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
727                 else
728                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
729                 break;
730         case DLM_LKSTS_GRANTED:
731                 /* convention says granted locks kept in order of grmode */
732                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
733                                 lkb->lkb_grmode);
734                 break;
735         case DLM_LKSTS_CONVERT:
736                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
737                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
738                 else
739                         list_add_tail(&lkb->lkb_statequeue,
740                                       &r->res_convertqueue);
741                 break;
742         default:
743                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
744         }
745 }
746
747 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
748 {
749         lkb->lkb_status = 0;
750         list_del(&lkb->lkb_statequeue);
751         unhold_lkb(lkb);
752 }
753
754 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
755 {
756         hold_lkb(lkb);
757         del_lkb(r, lkb);
758         add_lkb(r, lkb, sts);
759         unhold_lkb(lkb);
760 }
761
762 static int msg_reply_type(int mstype)
763 {
764         switch (mstype) {
765         case DLM_MSG_REQUEST:
766                 return DLM_MSG_REQUEST_REPLY;
767         case DLM_MSG_CONVERT:
768                 return DLM_MSG_CONVERT_REPLY;
769         case DLM_MSG_UNLOCK:
770                 return DLM_MSG_UNLOCK_REPLY;
771         case DLM_MSG_CANCEL:
772                 return DLM_MSG_CANCEL_REPLY;
773         case DLM_MSG_LOOKUP:
774                 return DLM_MSG_LOOKUP_REPLY;
775         }
776         return -1;
777 }
778
779 /* add/remove lkb from global waiters list of lkb's waiting for
780    a reply from a remote node */
781
782 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
783 {
784         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
785         int error = 0;
786
787         mutex_lock(&ls->ls_waiters_mutex);
788
789         if (is_overlap_unlock(lkb) ||
790             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
791                 error = -EINVAL;
792                 goto out;
793         }
794
795         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
796                 switch (mstype) {
797                 case DLM_MSG_UNLOCK:
798                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
799                         break;
800                 case DLM_MSG_CANCEL:
801                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
802                         break;
803                 default:
804                         error = -EBUSY;
805                         goto out;
806                 }
807                 lkb->lkb_wait_count++;
808                 hold_lkb(lkb);
809
810                 log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
811                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
812                           lkb->lkb_wait_count, lkb->lkb_flags);
813                 goto out;
814         }
815
816         DLM_ASSERT(!lkb->lkb_wait_count,
817                    dlm_print_lkb(lkb);
818                    printk("wait_count %d\n", lkb->lkb_wait_count););
819
820         lkb->lkb_wait_count++;
821         lkb->lkb_wait_type = mstype;
822         hold_lkb(lkb);
823         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
824  out:
825         if (error)
826                 log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
827                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
828                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
829         mutex_unlock(&ls->ls_waiters_mutex);
830         return error;
831 }
832
833 /* We clear the RESEND flag because we might be taking an lkb off the waiters
834    list as part of process_requestqueue (e.g. a lookup that has an optimized
835    request reply on the requestqueue) between dlm_recover_waiters_pre() which
836    set RESEND and dlm_recover_waiters_post() */
837
838 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
839 {
840         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
841         int overlap_done = 0;
842
843         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
844                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
845                 overlap_done = 1;
846                 goto out_del;
847         }
848
849         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
850                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
851                 overlap_done = 1;
852                 goto out_del;
853         }
854
855         /* N.B. type of reply may not always correspond to type of original
856            msg due to lookup->request optimization, verify others? */
857
858         if (lkb->lkb_wait_type) {
859                 lkb->lkb_wait_type = 0;
860                 goto out_del;
861         }
862
863         log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
864                   lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
865         return -1;
866
867  out_del:
868         /* the force-unlock/cancel has completed and we haven't recvd a reply
869            to the op that was in progress prior to the unlock/cancel; we
870            give up on any reply to the earlier op.  FIXME: not sure when/how
871            this would happen */
872
873         if (overlap_done && lkb->lkb_wait_type) {
874                 log_error(ls, "remove_from_waiters %x reply %d give up on %d",
875                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
876                 lkb->lkb_wait_count--;
877                 lkb->lkb_wait_type = 0;
878         }
879
880         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
881
882         lkb->lkb_flags &= ~DLM_IFL_RESEND;
883         lkb->lkb_wait_count--;
884         if (!lkb->lkb_wait_count)
885                 list_del_init(&lkb->lkb_wait_reply);
886         unhold_lkb(lkb);
887         return 0;
888 }
889
890 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
891 {
892         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
893         int error;
894
895         mutex_lock(&ls->ls_waiters_mutex);
896         error = _remove_from_waiters(lkb, mstype);
897         mutex_unlock(&ls->ls_waiters_mutex);
898         return error;
899 }
900
901 /* Handles situations where we might be processing a "fake" or "stub" reply in
902    which we can't try to take waiters_mutex again. */
903
904 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
905 {
906         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
907         int error;
908
909         if (ms != &ls->ls_stub_ms)
910                 mutex_lock(&ls->ls_waiters_mutex);
911         error = _remove_from_waiters(lkb, ms->m_type);
912         if (ms != &ls->ls_stub_ms)
913                 mutex_unlock(&ls->ls_waiters_mutex);
914         return error;
915 }
916
917 static void dir_remove(struct dlm_rsb *r)
918 {
919         int to_nodeid;
920
921         if (dlm_no_directory(r->res_ls))
922                 return;
923
924         to_nodeid = dlm_dir_nodeid(r);
925         if (to_nodeid != dlm_our_nodeid())
926                 send_remove(r);
927         else
928                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
929                                      r->res_name, r->res_length);
930 }
931
932 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
933    found since they are in order of newest to oldest? */
934
935 static int shrink_bucket(struct dlm_ls *ls, int b)
936 {
937         struct dlm_rsb *r;
938         int count = 0, found;
939
940         for (;;) {
941                 found = 0;
942                 write_lock(&ls->ls_rsbtbl[b].lock);
943                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
944                                             res_hashchain) {
945                         if (!time_after_eq(jiffies, r->res_toss_time +
946                                            dlm_config.ci_toss_secs * HZ))
947                                 continue;
948                         found = 1;
949                         break;
950                 }
951
952                 if (!found) {
953                         write_unlock(&ls->ls_rsbtbl[b].lock);
954                         break;
955                 }
956
957                 if (kref_put(&r->res_ref, kill_rsb)) {
958                         list_del(&r->res_hashchain);
959                         write_unlock(&ls->ls_rsbtbl[b].lock);
960
961                         if (is_master(r))
962                                 dir_remove(r);
963                         free_rsb(r);
964                         count++;
965                 } else {
966                         write_unlock(&ls->ls_rsbtbl[b].lock);
967                         log_error(ls, "tossed rsb in use %s", r->res_name);
968                 }
969         }
970
971         return count;
972 }
973
974 void dlm_scan_rsbs(struct dlm_ls *ls)
975 {
976         int i;
977
978         if (dlm_locking_stopped(ls))
979                 return;
980
981         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
982                 shrink_bucket(ls, i);
983                 cond_resched();
984         }
985 }
986
987 /* lkb is master or local copy */
988
989 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
990 {
991         int b, len = r->res_ls->ls_lvblen;
992
993         /* b=1 lvb returned to caller
994            b=0 lvb written to rsb or invalidated
995            b=-1 do nothing */
996
997         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
998
999         if (b == 1) {
1000                 if (!lkb->lkb_lvbptr)
1001                         return;
1002
1003                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1004                         return;
1005
1006                 if (!r->res_lvbptr)
1007                         return;
1008
1009                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1010                 lkb->lkb_lvbseq = r->res_lvbseq;
1011
1012         } else if (b == 0) {
1013                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1014                         rsb_set_flag(r, RSB_VALNOTVALID);
1015                         return;
1016                 }
1017
1018                 if (!lkb->lkb_lvbptr)
1019                         return;
1020
1021                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1022                         return;
1023
1024                 if (!r->res_lvbptr)
1025                         r->res_lvbptr = allocate_lvb(r->res_ls);
1026
1027                 if (!r->res_lvbptr)
1028                         return;
1029
1030                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1031                 r->res_lvbseq++;
1032                 lkb->lkb_lvbseq = r->res_lvbseq;
1033                 rsb_clear_flag(r, RSB_VALNOTVALID);
1034         }
1035
1036         if (rsb_flag(r, RSB_VALNOTVALID))
1037                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1038 }
1039
1040 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1041 {
1042         if (lkb->lkb_grmode < DLM_LOCK_PW)
1043                 return;
1044
1045         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1046                 rsb_set_flag(r, RSB_VALNOTVALID);
1047                 return;
1048         }
1049
1050         if (!lkb->lkb_lvbptr)
1051                 return;
1052
1053         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1054                 return;
1055
1056         if (!r->res_lvbptr)
1057                 r->res_lvbptr = allocate_lvb(r->res_ls);
1058
1059         if (!r->res_lvbptr)
1060                 return;
1061
1062         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1063         r->res_lvbseq++;
1064         rsb_clear_flag(r, RSB_VALNOTVALID);
1065 }
1066
1067 /* lkb is process copy (pc) */
1068
1069 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1070                             struct dlm_message *ms)
1071 {
1072         int b;
1073
1074         if (!lkb->lkb_lvbptr)
1075                 return;
1076
1077         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1078                 return;
1079
1080         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1081         if (b == 1) {
1082                 int len = receive_extralen(ms);
1083                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1084                 lkb->lkb_lvbseq = ms->m_lvbseq;
1085         }
1086 }
1087
1088 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1089    remove_lock -- used for unlock, removes lkb from granted
1090    revert_lock -- used for cancel, moves lkb from convert to granted
1091    grant_lock  -- used for request and convert, adds lkb to granted or
1092                   moves lkb from convert or waiting to granted
1093
1094    Each of these is used for master or local copy lkb's.  There is
1095    also a _pc() variation used to make the corresponding change on
1096    a process copy (pc) lkb. */
1097
1098 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1099 {
1100         del_lkb(r, lkb);
1101         lkb->lkb_grmode = DLM_LOCK_IV;
1102         /* this unhold undoes the original ref from create_lkb()
1103            so this leads to the lkb being freed */
1104         unhold_lkb(lkb);
1105 }
1106
1107 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1108 {
1109         set_lvb_unlock(r, lkb);
1110         _remove_lock(r, lkb);
1111 }
1112
1113 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1114 {
1115         _remove_lock(r, lkb);
1116 }
1117
1118 /* returns: 0 did nothing
1119             1 moved lock to granted
1120            -1 removed lock */
1121
1122 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1123 {
1124         int rv = 0;
1125
1126         lkb->lkb_rqmode = DLM_LOCK_IV;
1127
1128         switch (lkb->lkb_status) {
1129         case DLM_LKSTS_GRANTED:
1130                 break;
1131         case DLM_LKSTS_CONVERT:
1132                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1133                 rv = 1;
1134                 break;
1135         case DLM_LKSTS_WAITING:
1136                 del_lkb(r, lkb);
1137                 lkb->lkb_grmode = DLM_LOCK_IV;
1138                 /* this unhold undoes the original ref from create_lkb()
1139                    so this leads to the lkb being freed */
1140                 unhold_lkb(lkb);
1141                 rv = -1;
1142                 break;
1143         default:
1144                 log_print("invalid status for revert %d", lkb->lkb_status);
1145         }
1146         return rv;
1147 }
1148
1149 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1150 {
1151         return revert_lock(r, lkb);
1152 }
1153
1154 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1155 {
1156         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1157                 lkb->lkb_grmode = lkb->lkb_rqmode;
1158                 if (lkb->lkb_status)
1159                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1160                 else
1161                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1162         }
1163
1164         lkb->lkb_rqmode = DLM_LOCK_IV;
1165 }
1166
1167 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1168 {
1169         set_lvb_lock(r, lkb);
1170         _grant_lock(r, lkb);
1171         lkb->lkb_highbast = 0;
1172 }
1173
1174 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1175                           struct dlm_message *ms)
1176 {
1177         set_lvb_lock_pc(r, lkb, ms);
1178         _grant_lock(r, lkb);
1179 }
1180
1181 /* called by grant_pending_locks() which means an async grant message must
1182    be sent to the requesting node in addition to granting the lock if the
1183    lkb belongs to a remote node. */
1184
1185 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1186 {
1187         grant_lock(r, lkb);
1188         if (is_master_copy(lkb))
1189                 send_grant(r, lkb);
1190         else
1191                 queue_cast(r, lkb, 0);
1192 }
1193
1194 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1195 {
1196         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1197                                            lkb_statequeue);
1198         if (lkb->lkb_id == first->lkb_id)
1199                 return 1;
1200
1201         return 0;
1202 }
1203
1204 /* Check if the given lkb conflicts with another lkb on the queue. */
1205
1206 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1207 {
1208         struct dlm_lkb *this;
1209
1210         list_for_each_entry(this, head, lkb_statequeue) {
1211                 if (this == lkb)
1212                         continue;
1213                 if (!modes_compat(this, lkb))
1214                         return 1;
1215         }
1216         return 0;
1217 }
1218
1219 /*
1220  * "A conversion deadlock arises with a pair of lock requests in the converting
1221  * queue for one resource.  The granted mode of each lock blocks the requested
1222  * mode of the other lock."
1223  *
1224  * Part 2: if the granted mode of lkb is preventing the first lkb in the
1225  * convert queue from being granted, then demote lkb (set grmode to NL).
1226  * This second form requires that we check for conv-deadlk even when
1227  * now == 0 in _can_be_granted().
1228  *
1229  * Example:
1230  * Granted Queue: empty
1231  * Convert Queue: NL->EX (first lock)
1232  *                PR->EX (second lock)
1233  *
1234  * The first lock can't be granted because of the granted mode of the second
1235  * lock and the second lock can't be granted because it's not first in the
1236  * list.  We demote the granted mode of the second lock (the lkb passed to this
1237  * function).
1238  *
1239  * After the resolution, the "grant pending" function needs to go back and try
1240  * to grant locks on the convert queue again since the first lock can now be
1241  * granted.
1242  */
1243
1244 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1245 {
1246         struct dlm_lkb *this, *first = NULL, *self = NULL;
1247
1248         list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1249                 if (!first)
1250                         first = this;
1251                 if (this == lkb) {
1252                         self = lkb;
1253                         continue;
1254                 }
1255
1256                 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1257                         return 1;
1258         }
1259
1260         /* if lkb is on the convert queue and is preventing the first
1261            from being granted, then there's deadlock and we demote lkb.
1262            multiple converting locks may need to do this before the first
1263            converting lock can be granted. */
1264
1265         if (self && self != first) {
1266                 if (!modes_compat(lkb, first) &&
1267                     !queue_conflict(&rsb->res_grantqueue, first))
1268                         return 1;
1269         }
1270
1271         return 0;
1272 }
1273
1274 /*
1275  * Return 1 if the lock can be granted, 0 otherwise.
1276  * Also detect and resolve conversion deadlocks.
1277  *
1278  * lkb is the lock to be granted
1279  *
1280  * now is 1 if the function is being called in the context of the
1281  * immediate request, it is 0 if called later, after the lock has been
1282  * queued.
1283  *
1284  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1285  */
1286
1287 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1288 {
1289         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1290
1291         /*
1292          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1293          * a new request for a NL mode lock being blocked.
1294          *
1295          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1296          * request, then it would be granted.  In essence, the use of this flag
1297          * tells the Lock Manager to expedite theis request by not considering
1298          * what may be in the CONVERTING or WAITING queues...  As of this
1299          * writing, the EXPEDITE flag can be used only with new requests for NL
1300          * mode locks.  This flag is not valid for conversion requests.
1301          *
1302          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1303          * conversion or used with a non-NL requested mode.  We also know an
1304          * EXPEDITE request is always granted immediately, so now must always
1305          * be 1.  The full condition to grant an expedite request: (now &&
1306          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1307          * therefore be shortened to just checking the flag.
1308          */
1309
1310         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1311                 return 1;
1312
1313         /*
1314          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1315          * added to the remaining conditions.
1316          */
1317
1318         if (queue_conflict(&r->res_grantqueue, lkb))
1319                 goto out;
1320
1321         /*
1322          * 6-3: By default, a conversion request is immediately granted if the
1323          * requested mode is compatible with the modes of all other granted
1324          * locks
1325          */
1326
1327         if (queue_conflict(&r->res_convertqueue, lkb))
1328                 goto out;
1329
1330         /*
1331          * 6-5: But the default algorithm for deciding whether to grant or
1332          * queue conversion requests does not by itself guarantee that such
1333          * requests are serviced on a "first come first serve" basis.  This, in
1334          * turn, can lead to a phenomenon known as "indefinate postponement".
1335          *
1336          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1337          * the system service employed to request a lock conversion.  This flag
1338          * forces certain conversion requests to be queued, even if they are
1339          * compatible with the granted modes of other locks on the same
1340          * resource.  Thus, the use of this flag results in conversion requests
1341          * being ordered on a "first come first servce" basis.
1342          *
1343          * DCT: This condition is all about new conversions being able to occur
1344          * "in place" while the lock remains on the granted queue (assuming
1345          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1346          * doesn't _have_ to go onto the convert queue where it's processed in
1347          * order.  The "now" variable is necessary to distinguish converts
1348          * being received and processed for the first time now, because once a
1349          * convert is moved to the conversion queue the condition below applies
1350          * requiring fifo granting.
1351          */
1352
1353         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1354                 return 1;
1355
1356         /*
1357          * The NOORDER flag is set to avoid the standard vms rules on grant
1358          * order.
1359          */
1360
1361         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1362                 return 1;
1363
1364         /*
1365          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1366          * granted until all other conversion requests ahead of it are granted
1367          * and/or canceled.
1368          */
1369
1370         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1371                 return 1;
1372
1373         /*
1374          * 6-4: By default, a new request is immediately granted only if all
1375          * three of the following conditions are satisfied when the request is
1376          * issued:
1377          * - The queue of ungranted conversion requests for the resource is
1378          *   empty.
1379          * - The queue of ungranted new requests for the resource is empty.
1380          * - The mode of the new request is compatible with the most
1381          *   restrictive mode of all granted locks on the resource.
1382          */
1383
1384         if (now && !conv && list_empty(&r->res_convertqueue) &&
1385             list_empty(&r->res_waitqueue))
1386                 return 1;
1387
1388         /*
1389          * 6-4: Once a lock request is in the queue of ungranted new requests,
1390          * it cannot be granted until the queue of ungranted conversion
1391          * requests is empty, all ungranted new requests ahead of it are
1392          * granted and/or canceled, and it is compatible with the granted mode
1393          * of the most restrictive lock granted on the resource.
1394          */
1395
1396         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1397             first_in_list(lkb, &r->res_waitqueue))
1398                 return 1;
1399
1400  out:
1401         /*
1402          * The following, enabled by CONVDEADLK, departs from VMS.
1403          */
1404
1405         if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1406             conversion_deadlock_detect(r, lkb)) {
1407                 lkb->lkb_grmode = DLM_LOCK_NL;
1408                 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1409         }
1410
1411         return 0;
1412 }
1413
1414 /*
1415  * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1416  * simple way to provide a big optimization to applications that can use them.
1417  */
1418
1419 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1420 {
1421         uint32_t flags = lkb->lkb_exflags;
1422         int rv;
1423         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1424
1425         rv = _can_be_granted(r, lkb, now);
1426         if (rv)
1427                 goto out;
1428
1429         if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1430                 goto out;
1431
1432         if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1433                 alt = DLM_LOCK_PR;
1434         else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1435                 alt = DLM_LOCK_CW;
1436
1437         if (alt) {
1438                 lkb->lkb_rqmode = alt;
1439                 rv = _can_be_granted(r, lkb, now);
1440                 if (rv)
1441                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1442                 else
1443                         lkb->lkb_rqmode = rqmode;
1444         }
1445  out:
1446         return rv;
1447 }
1448
1449 static int grant_pending_convert(struct dlm_rsb *r, int high)
1450 {
1451         struct dlm_lkb *lkb, *s;
1452         int hi, demoted, quit, grant_restart, demote_restart;
1453
1454         quit = 0;
1455  restart:
1456         grant_restart = 0;
1457         demote_restart = 0;
1458         hi = DLM_LOCK_IV;
1459
1460         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1461                 demoted = is_demoted(lkb);
1462                 if (can_be_granted(r, lkb, 0)) {
1463                         grant_lock_pending(r, lkb);
1464                         grant_restart = 1;
1465                 } else {
1466                         hi = max_t(int, lkb->lkb_rqmode, hi);
1467                         if (!demoted && is_demoted(lkb))
1468                                 demote_restart = 1;
1469                 }
1470         }
1471
1472         if (grant_restart)
1473                 goto restart;
1474         if (demote_restart && !quit) {
1475                 quit = 1;
1476                 goto restart;
1477         }
1478
1479         return max_t(int, high, hi);
1480 }
1481
1482 static int grant_pending_wait(struct dlm_rsb *r, int high)
1483 {
1484         struct dlm_lkb *lkb, *s;
1485
1486         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1487                 if (can_be_granted(r, lkb, 0))
1488                         grant_lock_pending(r, lkb);
1489                 else
1490                         high = max_t(int, lkb->lkb_rqmode, high);
1491         }
1492
1493         return high;
1494 }
1495
1496 static void grant_pending_locks(struct dlm_rsb *r)
1497 {
1498         struct dlm_lkb *lkb, *s;
1499         int high = DLM_LOCK_IV;
1500
1501         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1502
1503         high = grant_pending_convert(r, high);
1504         high = grant_pending_wait(r, high);
1505
1506         if (high == DLM_LOCK_IV)
1507                 return;
1508
1509         /*
1510          * If there are locks left on the wait/convert queue then send blocking
1511          * ASTs to granted locks based on the largest requested mode (high)
1512          * found above. FIXME: highbast < high comparison not valid for PR/CW.
1513          */
1514
1515         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1516                 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1517                     !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1518                         queue_bast(r, lkb, high);
1519                         lkb->lkb_highbast = high;
1520                 }
1521         }
1522 }
1523
1524 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1525                             struct dlm_lkb *lkb)
1526 {
1527         struct dlm_lkb *gr;
1528
1529         list_for_each_entry(gr, head, lkb_statequeue) {
1530                 if (gr->lkb_bastaddr &&
1531                     gr->lkb_highbast < lkb->lkb_rqmode &&
1532                     !modes_compat(gr, lkb)) {
1533                         queue_bast(r, gr, lkb->lkb_rqmode);
1534                         gr->lkb_highbast = lkb->lkb_rqmode;
1535                 }
1536         }
1537 }
1538
1539 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1540 {
1541         send_bast_queue(r, &r->res_grantqueue, lkb);
1542 }
1543
1544 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1545 {
1546         send_bast_queue(r, &r->res_grantqueue, lkb);
1547         send_bast_queue(r, &r->res_convertqueue, lkb);
1548 }
1549
1550 /* set_master(r, lkb) -- set the master nodeid of a resource
1551
1552    The purpose of this function is to set the nodeid field in the given
1553    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1554    known, it can just be copied to the lkb and the function will return
1555    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1556    before it can be copied to the lkb.
1557
1558    When the rsb nodeid is being looked up remotely, the initial lkb
1559    causing the lookup is kept on the ls_waiters list waiting for the
1560    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1561    on the rsb's res_lookup list until the master is verified.
1562
1563    Return values:
1564    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1565    1: the rsb master is not available and the lkb has been placed on
1566       a wait queue
1567 */
1568
1569 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1570 {
1571         struct dlm_ls *ls = r->res_ls;
1572         int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1573
1574         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1575                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1576                 r->res_first_lkid = lkb->lkb_id;
1577                 lkb->lkb_nodeid = r->res_nodeid;
1578                 return 0;
1579         }
1580
1581         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1582                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1583                 return 1;
1584         }
1585
1586         if (r->res_nodeid == 0) {
1587                 lkb->lkb_nodeid = 0;
1588                 return 0;
1589         }
1590
1591         if (r->res_nodeid > 0) {
1592                 lkb->lkb_nodeid = r->res_nodeid;
1593                 return 0;
1594         }
1595
1596         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1597
1598         dir_nodeid = dlm_dir_nodeid(r);
1599
1600         if (dir_nodeid != our_nodeid) {
1601                 r->res_first_lkid = lkb->lkb_id;
1602                 send_lookup(r, lkb);
1603                 return 1;
1604         }
1605
1606         for (;;) {
1607                 /* It's possible for dlm_scand to remove an old rsb for
1608                    this same resource from the toss list, us to create
1609                    a new one, look up the master locally, and find it
1610                    already exists just before dlm_scand does the
1611                    dir_remove() on the previous rsb. */
1612
1613                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1614                                        r->res_length, &ret_nodeid);
1615                 if (!error)
1616                         break;
1617                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1618                 schedule();
1619         }
1620
1621         if (ret_nodeid == our_nodeid) {
1622                 r->res_first_lkid = 0;
1623                 r->res_nodeid = 0;
1624                 lkb->lkb_nodeid = 0;
1625         } else {
1626                 r->res_first_lkid = lkb->lkb_id;
1627                 r->res_nodeid = ret_nodeid;
1628                 lkb->lkb_nodeid = ret_nodeid;
1629         }
1630         return 0;
1631 }
1632
1633 static void process_lookup_list(struct dlm_rsb *r)
1634 {
1635         struct dlm_lkb *lkb, *safe;
1636
1637         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1638                 list_del_init(&lkb->lkb_rsb_lookup);
1639                 _request_lock(r, lkb);
1640                 schedule();
1641         }
1642 }
1643
1644 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1645
1646 static void confirm_master(struct dlm_rsb *r, int error)
1647 {
1648         struct dlm_lkb *lkb;
1649
1650         if (!r->res_first_lkid)
1651                 return;
1652
1653         switch (error) {
1654         case 0:
1655         case -EINPROGRESS:
1656                 r->res_first_lkid = 0;
1657                 process_lookup_list(r);
1658                 break;
1659
1660         case -EAGAIN:
1661                 /* the remote master didn't queue our NOQUEUE request;
1662                    make a waiting lkb the first_lkid */
1663
1664                 r->res_first_lkid = 0;
1665
1666                 if (!list_empty(&r->res_lookup)) {
1667                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1668                                          lkb_rsb_lookup);
1669                         list_del_init(&lkb->lkb_rsb_lookup);
1670                         r->res_first_lkid = lkb->lkb_id;
1671                         _request_lock(r, lkb);
1672                 } else
1673                         r->res_nodeid = -1;
1674                 break;
1675
1676         default:
1677                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1678         }
1679 }
1680
1681 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1682                          int namelen, uint32_t parent_lkid, void *ast,
1683                          void *astarg, void *bast, struct dlm_args *args)
1684 {
1685         int rv = -EINVAL;
1686
1687         /* check for invalid arg usage */
1688
1689         if (mode < 0 || mode > DLM_LOCK_EX)
1690                 goto out;
1691
1692         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1693                 goto out;
1694
1695         if (flags & DLM_LKF_CANCEL)
1696                 goto out;
1697
1698         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1699                 goto out;
1700
1701         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1702                 goto out;
1703
1704         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1705                 goto out;
1706
1707         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1708                 goto out;
1709
1710         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1711                 goto out;
1712
1713         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1714                 goto out;
1715
1716         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1717                 goto out;
1718
1719         if (!ast || !lksb)
1720                 goto out;
1721
1722         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1723                 goto out;
1724
1725         /* parent/child locks not yet supported */
1726         if (parent_lkid)
1727                 goto out;
1728
1729         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1730                 goto out;
1731
1732         /* these args will be copied to the lkb in validate_lock_args,
1733            it cannot be done now because when converting locks, fields in
1734            an active lkb cannot be modified before locking the rsb */
1735
1736         args->flags = flags;
1737         args->astaddr = ast;
1738         args->astparam = (long) astarg;
1739         args->bastaddr = bast;
1740         args->mode = mode;
1741         args->lksb = lksb;
1742         rv = 0;
1743  out:
1744         return rv;
1745 }
1746
1747 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1748 {
1749         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1750                       DLM_LKF_FORCEUNLOCK))
1751                 return -EINVAL;
1752
1753         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
1754                 return -EINVAL;
1755
1756         args->flags = flags;
1757         args->astparam = (long) astarg;
1758         return 0;
1759 }
1760
1761 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1762                               struct dlm_args *args)
1763 {
1764         int rv = -EINVAL;
1765
1766         if (args->flags & DLM_LKF_CONVERT) {
1767                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1768                         goto out;
1769
1770                 if (args->flags & DLM_LKF_QUECVT &&
1771                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1772                         goto out;
1773
1774                 rv = -EBUSY;
1775                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1776                         goto out;
1777
1778                 if (lkb->lkb_wait_type)
1779                         goto out;
1780
1781                 if (is_overlap(lkb))
1782                         goto out;
1783         }
1784
1785         lkb->lkb_exflags = args->flags;
1786         lkb->lkb_sbflags = 0;
1787         lkb->lkb_astaddr = args->astaddr;
1788         lkb->lkb_astparam = args->astparam;
1789         lkb->lkb_bastaddr = args->bastaddr;
1790         lkb->lkb_rqmode = args->mode;
1791         lkb->lkb_lksb = args->lksb;
1792         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1793         lkb->lkb_ownpid = (int) current->pid;
1794         rv = 0;
1795  out:
1796         return rv;
1797 }
1798
1799 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
1800    for success */
1801
1802 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
1803    because there may be a lookup in progress and it's valid to do
1804    cancel/unlockf on it */
1805
1806 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1807 {
1808         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1809         int rv = -EINVAL;
1810
1811         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
1812                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
1813                 dlm_print_lkb(lkb);
1814                 goto out;
1815         }
1816
1817         /* an lkb may still exist even though the lock is EOL'ed due to a
1818            cancel, unlock or failed noqueue request; an app can't use these
1819            locks; return same error as if the lkid had not been found at all */
1820
1821         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
1822                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
1823                 rv = -ENOENT;
1824                 goto out;
1825         }
1826
1827         /* an lkb may be waiting for an rsb lookup to complete where the
1828            lookup was initiated by another lock */
1829
1830         if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
1831                 if (!list_empty(&lkb->lkb_rsb_lookup)) {
1832                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
1833                         list_del_init(&lkb->lkb_rsb_lookup);
1834                         queue_cast(lkb->lkb_resource, lkb,
1835                                    args->flags & DLM_LKF_CANCEL ?
1836                                    -DLM_ECANCEL : -DLM_EUNLOCK);
1837                         unhold_lkb(lkb); /* undoes create_lkb() */
1838                         rv = -EBUSY;
1839                         goto out;
1840                 }
1841         }
1842
1843         /* cancel not allowed with another cancel/unlock in progress */
1844
1845         if (args->flags & DLM_LKF_CANCEL) {
1846                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
1847                         goto out;
1848
1849                 if (is_overlap(lkb))
1850                         goto out;
1851
1852                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1853                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1854                         rv = -EBUSY;
1855                         goto out;
1856                 }
1857
1858                 switch (lkb->lkb_wait_type) {
1859                 case DLM_MSG_LOOKUP:
1860                 case DLM_MSG_REQUEST:
1861                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1862                         rv = -EBUSY;
1863                         goto out;
1864                 case DLM_MSG_UNLOCK:
1865                 case DLM_MSG_CANCEL:
1866                         goto out;
1867                 }
1868                 /* add_to_waiters() will set OVERLAP_CANCEL */
1869                 goto out_ok;
1870         }
1871
1872         /* do we need to allow a force-unlock if there's a normal unlock
1873            already in progress?  in what conditions could the normal unlock
1874            fail such that we'd want to send a force-unlock to be sure? */
1875
1876         if (args->flags & DLM_LKF_FORCEUNLOCK) {
1877                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
1878                         goto out;
1879
1880                 if (is_overlap_unlock(lkb))
1881                         goto out;
1882
1883                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1884                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1885                         rv = -EBUSY;
1886                         goto out;
1887                 }
1888
1889                 switch (lkb->lkb_wait_type) {
1890                 case DLM_MSG_LOOKUP:
1891                 case DLM_MSG_REQUEST:
1892                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1893                         rv = -EBUSY;
1894                         goto out;
1895                 case DLM_MSG_UNLOCK:
1896                         goto out;
1897                 }
1898                 /* add_to_waiters() will set OVERLAP_UNLOCK */
1899                 goto out_ok;
1900         }
1901
1902         /* normal unlock not allowed if there's any op in progress */
1903         rv = -EBUSY;
1904         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
1905                 goto out;
1906
1907  out_ok:
1908         /* an overlapping op shouldn't blow away exflags from other op */
1909         lkb->lkb_exflags |= args->flags;
1910         lkb->lkb_sbflags = 0;
1911         lkb->lkb_astparam = args->astparam;
1912         rv = 0;
1913  out:
1914         if (rv)
1915                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
1916                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
1917                           args->flags, lkb->lkb_wait_type,
1918                           lkb->lkb_resource->res_name);
1919         return rv;
1920 }
1921
1922 /*
1923  * Four stage 4 varieties:
1924  * do_request(), do_convert(), do_unlock(), do_cancel()
1925  * These are called on the master node for the given lock and
1926  * from the central locking logic.
1927  */
1928
1929 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1930 {
1931         int error = 0;
1932
1933         if (can_be_granted(r, lkb, 1)) {
1934                 grant_lock(r, lkb);
1935                 queue_cast(r, lkb, 0);
1936                 goto out;
1937         }
1938
1939         if (can_be_queued(lkb)) {
1940                 error = -EINPROGRESS;
1941                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1942                 send_blocking_asts(r, lkb);
1943                 goto out;
1944         }
1945
1946         error = -EAGAIN;
1947         if (force_blocking_asts(lkb))
1948                 send_blocking_asts_all(r, lkb);
1949         queue_cast(r, lkb, -EAGAIN);
1950
1951  out:
1952         return error;
1953 }
1954
1955 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1956 {
1957         int error = 0;
1958
1959         /* changing an existing lock may allow others to be granted */
1960
1961         if (can_be_granted(r, lkb, 1)) {
1962                 grant_lock(r, lkb);
1963                 queue_cast(r, lkb, 0);
1964                 grant_pending_locks(r);
1965                 goto out;
1966         }
1967
1968         if (can_be_queued(lkb)) {
1969                 if (is_demoted(lkb))
1970                         grant_pending_locks(r);
1971                 error = -EINPROGRESS;
1972                 del_lkb(r, lkb);
1973                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1974                 send_blocking_asts(r, lkb);
1975                 goto out;
1976         }
1977
1978         error = -EAGAIN;
1979         if (force_blocking_asts(lkb))
1980                 send_blocking_asts_all(r, lkb);
1981         queue_cast(r, lkb, -EAGAIN);
1982
1983  out:
1984         return error;
1985 }
1986
1987 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1988 {
1989         remove_lock(r, lkb);
1990         queue_cast(r, lkb, -DLM_EUNLOCK);
1991         grant_pending_locks(r);
1992         return -DLM_EUNLOCK;
1993 }
1994
1995 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
1996  
1997 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1998 {
1999         int error;
2000
2001         error = revert_lock(r, lkb);
2002         if (error) {
2003                 queue_cast(r, lkb, -DLM_ECANCEL);
2004                 grant_pending_locks(r);
2005                 return -DLM_ECANCEL;
2006         }
2007         return 0;
2008 }
2009
2010 /*
2011  * Four stage 3 varieties:
2012  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2013  */
2014
2015 /* add a new lkb to a possibly new rsb, called by requesting process */
2016
2017 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2018 {
2019         int error;
2020
2021         /* set_master: sets lkb nodeid from r */
2022
2023         error = set_master(r, lkb);
2024         if (error < 0)
2025                 goto out;
2026         if (error) {
2027                 error = 0;
2028                 goto out;
2029         }
2030
2031         if (is_remote(r))
2032                 /* receive_request() calls do_request() on remote node */
2033                 error = send_request(r, lkb);
2034         else
2035                 error = do_request(r, lkb);
2036  out:
2037         return error;
2038 }
2039
2040 /* change some property of an existing lkb, e.g. mode */
2041
2042 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2043 {
2044         int error;
2045
2046         if (is_remote(r))
2047                 /* receive_convert() calls do_convert() on remote node */
2048                 error = send_convert(r, lkb);
2049         else
2050                 error = do_convert(r, lkb);
2051
2052         return error;
2053 }
2054
2055 /* remove an existing lkb from the granted queue */
2056
2057 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2058 {
2059         int error;
2060
2061         if (is_remote(r))
2062                 /* receive_unlock() calls do_unlock() on remote node */
2063                 error = send_unlock(r, lkb);
2064         else
2065                 error = do_unlock(r, lkb);
2066
2067         return error;
2068 }
2069
2070 /* remove an existing lkb from the convert or wait queue */
2071
2072 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2073 {
2074         int error;
2075
2076         if (is_remote(r))
2077                 /* receive_cancel() calls do_cancel() on remote node */
2078                 error = send_cancel(r, lkb);
2079         else
2080                 error = do_cancel(r, lkb);
2081
2082         return error;
2083 }
2084
2085 /*
2086  * Four stage 2 varieties:
2087  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2088  */
2089
2090 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2091                         int len, struct dlm_args *args)
2092 {
2093         struct dlm_rsb *r;
2094         int error;
2095
2096         error = validate_lock_args(ls, lkb, args);
2097         if (error)
2098                 goto out;
2099
2100         error = find_rsb(ls, name, len, R_CREATE, &r);
2101         if (error)
2102                 goto out;
2103
2104         lock_rsb(r);
2105
2106         attach_lkb(r, lkb);
2107         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2108
2109         error = _request_lock(r, lkb);
2110
2111         unlock_rsb(r);
2112         put_rsb(r);
2113
2114  out:
2115         return error;
2116 }
2117
2118 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2119                         struct dlm_args *args)
2120 {
2121         struct dlm_rsb *r;
2122         int error;
2123
2124         r = lkb->lkb_resource;
2125
2126         hold_rsb(r);
2127         lock_rsb(r);
2128
2129         error = validate_lock_args(ls, lkb, args);
2130         if (error)
2131                 goto out;
2132
2133         error = _convert_lock(r, lkb);
2134  out:
2135         unlock_rsb(r);
2136         put_rsb(r);
2137         return error;
2138 }
2139
2140 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2141                        struct dlm_args *args)
2142 {
2143         struct dlm_rsb *r;
2144         int error;
2145
2146         r = lkb->lkb_resource;
2147
2148         hold_rsb(r);
2149         lock_rsb(r);
2150
2151         error = validate_unlock_args(lkb, args);
2152         if (error)
2153                 goto out;
2154
2155         error = _unlock_lock(r, lkb);
2156  out:
2157         unlock_rsb(r);
2158         put_rsb(r);
2159         return error;
2160 }
2161
2162 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2163                        struct dlm_args *args)
2164 {
2165         struct dlm_rsb *r;
2166         int error;
2167
2168         r = lkb->lkb_resource;
2169
2170         hold_rsb(r);
2171         lock_rsb(r);
2172
2173         error = validate_unlock_args(lkb, args);
2174         if (error)
2175                 goto out;
2176
2177         error = _cancel_lock(r, lkb);
2178  out:
2179         unlock_rsb(r);
2180         put_rsb(r);
2181         return error;
2182 }
2183
2184 /*
2185  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2186  */
2187
2188 int dlm_lock(dlm_lockspace_t *lockspace,
2189              int mode,
2190              struct dlm_lksb *lksb,
2191              uint32_t flags,
2192              void *name,
2193              unsigned int namelen,
2194              uint32_t parent_lkid,
2195              void (*ast) (void *astarg),
2196              void *astarg,
2197              void (*bast) (void *astarg, int mode))
2198 {
2199         struct dlm_ls *ls;
2200         struct dlm_lkb *lkb;
2201         struct dlm_args args;
2202         int error, convert = flags & DLM_LKF_CONVERT;
2203
2204         ls = dlm_find_lockspace_local(lockspace);
2205         if (!ls)
2206                 return -EINVAL;
2207
2208         lock_recovery(ls);
2209
2210         if (convert)
2211                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2212         else
2213                 error = create_lkb(ls, &lkb);
2214
2215         if (error)
2216                 goto out;
2217
2218         error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
2219                               astarg, bast, &args);
2220         if (error)
2221                 goto out_put;
2222
2223         if (convert)
2224                 error = convert_lock(ls, lkb, &args);
2225         else
2226                 error = request_lock(ls, lkb, name, namelen, &args);
2227
2228         if (error == -EINPROGRESS)
2229                 error = 0;
2230  out_put:
2231         if (convert || error)
2232                 __put_lkb(ls, lkb);
2233         if (error == -EAGAIN)
2234                 error = 0;
2235  out:
2236         unlock_recovery(ls);
2237         dlm_put_lockspace(ls);
2238         return error;
2239 }
2240
2241 int dlm_unlock(dlm_lockspace_t *lockspace,
2242                uint32_t lkid,
2243                uint32_t flags,
2244                struct dlm_lksb *lksb,
2245                void *astarg)
2246 {
2247         struct dlm_ls *ls;
2248         struct dlm_lkb *lkb;
2249         struct dlm_args args;
2250         int error;
2251
2252         ls = dlm_find_lockspace_local(lockspace);
2253         if (!ls)
2254                 return -EINVAL;
2255
2256         lock_recovery(ls);
2257
2258         error = find_lkb(ls, lkid, &lkb);
2259         if (error)
2260                 goto out;
2261
2262         error = set_unlock_args(flags, astarg, &args);
2263         if (error)
2264                 goto out_put;
2265
2266         if (flags & DLM_LKF_CANCEL)
2267                 error = cancel_lock(ls, lkb, &args);
2268         else
2269                 error = unlock_lock(ls, lkb, &args);
2270
2271         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2272                 error = 0;
2273         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2274                 error = 0;
2275  out_put:
2276         dlm_put_lkb(lkb);
2277  out:
2278         unlock_recovery(ls);
2279         dlm_put_lockspace(ls);
2280         return error;
2281 }
2282
2283 /*
2284  * send/receive routines for remote operations and replies
2285  *
2286  * send_args
2287  * send_common
2288  * send_request                 receive_request
2289  * send_convert                 receive_convert
2290  * send_unlock                  receive_unlock
2291  * send_cancel                  receive_cancel
2292  * send_grant                   receive_grant
2293  * send_bast                    receive_bast
2294  * send_lookup                  receive_lookup
2295  * send_remove                  receive_remove
2296  *
2297  *                              send_common_reply
2298  * receive_request_reply        send_request_reply
2299  * receive_convert_reply        send_convert_reply
2300  * receive_unlock_reply         send_unlock_reply
2301  * receive_cancel_reply         send_cancel_reply
2302  * receive_lookup_reply         send_lookup_reply
2303  */
2304
2305 static int _create_message(struct dlm_ls *ls, int mb_len,
2306                            int to_nodeid, int mstype,
2307                            struct dlm_message **ms_ret,
2308                            struct dlm_mhandle **mh_ret)
2309 {
2310         struct dlm_message *ms;
2311         struct dlm_mhandle *mh;
2312         char *mb;
2313
2314         /* get_buffer gives us a message handle (mh) that we need to
2315            pass into lowcomms_commit and a message buffer (mb) that we
2316            write our data into */
2317
2318         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2319         if (!mh)
2320                 return -ENOBUFS;
2321
2322         memset(mb, 0, mb_len);
2323
2324         ms = (struct dlm_message *) mb;
2325
2326         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2327         ms->m_header.h_lockspace = ls->ls_global_id;
2328         ms->m_header.h_nodeid = dlm_our_nodeid();
2329         ms->m_header.h_length = mb_len;
2330         ms->m_header.h_cmd = DLM_MSG;
2331
2332         ms->m_type = mstype;
2333
2334         *mh_ret = mh;
2335         *ms_ret = ms;
2336         return 0;
2337 }
2338
2339 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2340                           int to_nodeid, int mstype,
2341                           struct dlm_message **ms_ret,
2342                           struct dlm_mhandle **mh_ret)
2343 {
2344         int mb_len = sizeof(struct dlm_message);
2345
2346         switch (mstype) {
2347         case DLM_MSG_REQUEST:
2348         case DLM_MSG_LOOKUP:
2349         case DLM_MSG_REMOVE:
2350                 mb_len += r->res_length;
2351                 break;
2352         case DLM_MSG_CONVERT:
2353         case DLM_MSG_UNLOCK:
2354         case DLM_MSG_REQUEST_REPLY:
2355         case DLM_MSG_CONVERT_REPLY:
2356         case DLM_MSG_GRANT:
2357                 if (lkb && lkb->lkb_lvbptr)
2358                         mb_len += r->res_ls->ls_lvblen;
2359                 break;
2360         }
2361
2362         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2363                                ms_ret, mh_ret);
2364 }
2365
2366 /* further lowcomms enhancements or alternate implementations may make
2367    the return value from this function useful at some point */
2368
2369 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2370 {
2371         dlm_message_out(ms);
2372         dlm_lowcomms_commit_buffer(mh);
2373         return 0;
2374 }
2375
2376 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2377                       struct dlm_message *ms)
2378 {
2379         ms->m_nodeid   = lkb->lkb_nodeid;
2380         ms->m_pid      = lkb->lkb_ownpid;
2381         ms->m_lkid     = lkb->lkb_id;
2382         ms->m_remid    = lkb->lkb_remid;
2383         ms->m_exflags  = lkb->lkb_exflags;
2384         ms->m_sbflags  = lkb->lkb_sbflags;
2385         ms->m_flags    = lkb->lkb_flags;
2386         ms->m_lvbseq   = lkb->lkb_lvbseq;
2387         ms->m_status   = lkb->lkb_status;
2388         ms->m_grmode   = lkb->lkb_grmode;
2389         ms->m_rqmode   = lkb->lkb_rqmode;
2390         ms->m_hash     = r->res_hash;
2391
2392         /* m_result and m_bastmode are set from function args,
2393            not from lkb fields */
2394
2395         if (lkb->lkb_bastaddr)
2396                 ms->m_asts |= AST_BAST;
2397         if (lkb->lkb_astaddr)
2398                 ms->m_asts |= AST_COMP;
2399
2400         /* compare with switch in create_message; send_remove() doesn't
2401            use send_args() */
2402
2403         switch (ms->m_type) {
2404         case DLM_MSG_REQUEST:
2405         case DLM_MSG_LOOKUP:
2406                 memcpy(ms->m_extra, r->res_name, r->res_length);
2407                 break;
2408         case DLM_MSG_CONVERT:
2409         case DLM_MSG_UNLOCK:
2410         case DLM_MSG_REQUEST_REPLY:
2411         case DLM_MSG_CONVERT_REPLY:
2412         case DLM_MSG_GRANT:
2413                 if (!lkb->lkb_lvbptr)
2414                         break;
2415                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2416                 break;
2417         }
2418 }
2419
2420 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2421 {
2422         struct dlm_message *ms;
2423         struct dlm_mhandle *mh;
2424         int to_nodeid, error;
2425
2426         error = add_to_waiters(lkb, mstype);
2427         if (error)
2428                 return error;
2429
2430         to_nodeid = r->res_nodeid;
2431
2432         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2433         if (error)
2434                 goto fail;
2435
2436         send_args(r, lkb, ms);
2437
2438         error = send_message(mh, ms);
2439         if (error)
2440                 goto fail;
2441         return 0;
2442
2443  fail:
2444         remove_from_waiters(lkb, msg_reply_type(mstype));
2445         return error;
2446 }
2447
2448 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2449 {
2450         return send_common(r, lkb, DLM_MSG_REQUEST);
2451 }
2452
2453 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2454 {
2455         int error;
2456
2457         error = send_common(r, lkb, DLM_MSG_CONVERT);
2458
2459         /* down conversions go without a reply from the master */
2460         if (!error && down_conversion(lkb)) {
2461                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2462                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2463                 r->res_ls->ls_stub_ms.m_result = 0;
2464                 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2465                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2466         }
2467
2468         return error;
2469 }
2470
2471 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2472    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2473    that the master is still correct. */
2474
2475 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2476 {
2477         return send_common(r, lkb, DLM_MSG_UNLOCK);
2478 }
2479
2480 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2481 {
2482         return send_common(r, lkb, DLM_MSG_CANCEL);
2483 }
2484
2485 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2486 {
2487         struct dlm_message *ms;
2488         struct dlm_mhandle *mh;
2489         int to_nodeid, error;
2490
2491         to_nodeid = lkb->lkb_nodeid;
2492
2493         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2494         if (error)
2495                 goto out;
2496
2497         send_args(r, lkb, ms);
2498
2499         ms->m_result = 0;
2500
2501         error = send_message(mh, ms);
2502  out:
2503         return error;
2504 }
2505
2506 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2507 {
2508         struct dlm_message *ms;
2509         struct dlm_mhandle *mh;
2510         int to_nodeid, error;
2511
2512         to_nodeid = lkb->lkb_nodeid;
2513
2514         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2515         if (error)
2516                 goto out;
2517
2518         send_args(r, lkb, ms);
2519
2520         ms->m_bastmode = mode;
2521
2522         error = send_message(mh, ms);
2523  out:
2524         return error;
2525 }
2526
2527 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2528 {
2529         struct dlm_message *ms;
2530         struct dlm_mhandle *mh;
2531         int to_nodeid, error;
2532
2533         error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2534         if (error)
2535                 return error;
2536
2537         to_nodeid = dlm_dir_nodeid(r);
2538
2539         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2540         if (error)
2541                 goto fail;
2542
2543         send_args(r, lkb, ms);
2544
2545         error = send_message(mh, ms);
2546         if (error)
2547                 goto fail;
2548         return 0;
2549
2550  fail:
2551         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2552         return error;
2553 }
2554
2555 static int send_remove(struct dlm_rsb *r)
2556 {
2557         struct dlm_message *ms;
2558         struct dlm_mhandle *mh;
2559         int to_nodeid, error;
2560
2561         to_nodeid = dlm_dir_nodeid(r);
2562
2563         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2564         if (error)
2565                 goto out;
2566
2567         memcpy(ms->m_extra, r->res_name, r->res_length);
2568         ms->m_hash = r->res_hash;
2569
2570         error = send_message(mh, ms);
2571  out:
2572         return error;
2573 }
2574
2575 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2576                              int mstype, int rv)
2577 {
2578         struct dlm_message *ms;
2579         struct dlm_mhandle *mh;
2580         int to_nodeid, error;
2581
2582         to_nodeid = lkb->lkb_nodeid;
2583
2584         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2585         if (error)
2586                 goto out;
2587
2588         send_args(r, lkb, ms);
2589
2590         ms->m_result = rv;
2591
2592         error = send_message(mh, ms);
2593  out:
2594         return error;
2595 }
2596
2597 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2598 {
2599         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2600 }
2601
2602 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2603 {
2604         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2605 }
2606
2607 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2608 {
2609         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2610 }
2611
2612 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2613 {
2614         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2615 }
2616
2617 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2618                              int ret_nodeid, int rv)
2619 {
2620         struct dlm_rsb *r = &ls->ls_stub_rsb;
2621         struct dlm_message *ms;
2622         struct dlm_mhandle *mh;
2623         int error, nodeid = ms_in->m_header.h_nodeid;
2624
2625         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2626         if (error)
2627                 goto out;
2628
2629         ms->m_lkid = ms_in->m_lkid;
2630         ms->m_result = rv;
2631         ms->m_nodeid = ret_nodeid;
2632
2633         error = send_message(mh, ms);
2634  out:
2635         return error;
2636 }
2637
2638 /* which args we save from a received message depends heavily on the type
2639    of message, unlike the send side where we can safely send everything about
2640    the lkb for any type of message */
2641
2642 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2643 {
2644         lkb->lkb_exflags = ms->m_exflags;
2645         lkb->lkb_sbflags = ms->m_sbflags;
2646         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2647                          (ms->m_flags & 0x0000FFFF);
2648 }
2649
2650 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2651 {
2652         lkb->lkb_sbflags = ms->m_sbflags;
2653         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2654                          (ms->m_flags & 0x0000FFFF);
2655 }
2656
2657 static int receive_extralen(struct dlm_message *ms)
2658 {
2659         return (ms->m_header.h_length - sizeof(struct dlm_message));
2660 }
2661
2662 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2663                        struct dlm_message *ms)
2664 {
2665         int len;
2666
2667         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2668                 if (!lkb->lkb_lvbptr)
2669                         lkb->lkb_lvbptr = allocate_lvb(ls);
2670                 if (!lkb->lkb_lvbptr)
2671                         return -ENOMEM;
2672                 len = receive_extralen(ms);
2673                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2674         }
2675         return 0;
2676 }
2677
2678 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2679                                 struct dlm_message *ms)
2680 {
2681         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2682         lkb->lkb_ownpid = ms->m_pid;
2683         lkb->lkb_remid = ms->m_lkid;
2684         lkb->lkb_grmode = DLM_LOCK_IV;
2685         lkb->lkb_rqmode = ms->m_rqmode;
2686         lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2687         lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2688
2689         DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2690
2691         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2692                 /* lkb was just created so there won't be an lvb yet */
2693                 lkb->lkb_lvbptr = allocate_lvb(ls);
2694                 if (!lkb->lkb_lvbptr)
2695                         return -ENOMEM;
2696         }
2697
2698         return 0;
2699 }
2700
2701 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2702                                 struct dlm_message *ms)
2703 {
2704         if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2705                 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2706                           lkb->lkb_nodeid, ms->m_header.h_nodeid,
2707                           lkb->lkb_id, lkb->lkb_remid);
2708                 return -EINVAL;
2709         }
2710
2711         if (!is_master_copy(lkb))
2712                 return -EINVAL;
2713
2714         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2715                 return -EBUSY;
2716
2717         if (receive_lvb(ls, lkb, ms))
2718                 return -ENOMEM;
2719
2720         lkb->lkb_rqmode = ms->m_rqmode;
2721         lkb->lkb_lvbseq = ms->m_lvbseq;
2722
2723         return 0;
2724 }
2725
2726 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2727                                struct dlm_message *ms)
2728 {
2729         if (!is_master_copy(lkb))
2730                 return -EINVAL;
2731         if (receive_lvb(ls, lkb, ms))
2732                 return -ENOMEM;
2733         return 0;
2734 }
2735
2736 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2737    uses to send a reply and that the remote end uses to process the reply. */
2738
2739 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2740 {
2741         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2742         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2743         lkb->lkb_remid = ms->m_lkid;
2744 }
2745
2746 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2747 {
2748         struct dlm_lkb *lkb;
2749         struct dlm_rsb *r;
2750         int error, namelen;
2751
2752         error = create_lkb(ls, &lkb);
2753         if (error)
2754                 goto fail;
2755
2756         receive_flags(lkb, ms);
2757         lkb->lkb_flags |= DLM_IFL_MSTCPY;
2758         error = receive_request_args(ls, lkb, ms);
2759         if (error) {
2760                 __put_lkb(ls, lkb);
2761                 goto fail;
2762         }
2763
2764         namelen = receive_extralen(ms);
2765
2766         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2767         if (error) {
2768                 __put_lkb(ls, lkb);
2769                 goto fail;
2770         }
2771
2772         lock_rsb(r);
2773
2774         attach_lkb(r, lkb);
2775         error = do_request(r, lkb);
2776         send_request_reply(r, lkb, error);
2777
2778         unlock_rsb(r);
2779         put_rsb(r);
2780
2781         if (error == -EINPROGRESS)
2782                 error = 0;
2783         if (error)
2784                 dlm_put_lkb(lkb);
2785         return;
2786
2787  fail:
2788         setup_stub_lkb(ls, ms);
2789         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2790 }
2791
2792 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2793 {
2794         struct dlm_lkb *lkb;
2795         struct dlm_rsb *r;
2796         int error, reply = 1;
2797
2798         error = find_lkb(ls, ms->m_remid, &lkb);
2799         if (error)
2800                 goto fail;
2801
2802         r = lkb->lkb_resource;
2803
2804         hold_rsb(r);
2805         lock_rsb(r);
2806
2807         receive_flags(lkb, ms);
2808         error = receive_convert_args(ls, lkb, ms);
2809         if (error)
2810                 goto out;
2811         reply = !down_conversion(lkb);
2812
2813         error = do_convert(r, lkb);
2814  out:
2815         if (reply)
2816                 send_convert_reply(r, lkb, error);
2817
2818         unlock_rsb(r);
2819         put_rsb(r);
2820         dlm_put_lkb(lkb);
2821         return;
2822
2823  fail:
2824         setup_stub_lkb(ls, ms);
2825         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2826 }
2827
2828 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2829 {
2830         struct dlm_lkb *lkb;
2831         struct dlm_rsb *r;
2832         int error;
2833
2834         error = find_lkb(ls, ms->m_remid, &lkb);
2835         if (error)
2836                 goto fail;
2837
2838         r = lkb->lkb_resource;
2839
2840         hold_rsb(r);
2841         lock_rsb(r);
2842
2843         receive_flags(lkb, ms);
2844         error = receive_unlock_args(ls, lkb, ms);
2845         if (error)
2846                 goto out;
2847
2848         error = do_unlock(r, lkb);
2849  out:
2850         send_unlock_reply(r, lkb, error);
2851
2852         unlock_rsb(r);
2853         put_rsb(r);
2854         dlm_put_lkb(lkb);
2855         return;
2856
2857  fail:
2858         setup_stub_lkb(ls, ms);
2859         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2860 }
2861
2862 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2863 {
2864         struct dlm_lkb *lkb;
2865         struct dlm_rsb *r;
2866         int error;
2867
2868         error = find_lkb(ls, ms->m_remid, &lkb);
2869         if (error)
2870                 goto fail;
2871
2872         receive_flags(lkb, ms);
2873
2874         r = lkb->lkb_resource;
2875
2876         hold_rsb(r);
2877         lock_rsb(r);
2878
2879         error = do_cancel(r, lkb);
2880         send_cancel_reply(r, lkb, error);
2881
2882         unlock_rsb(r);
2883         put_rsb(r);
2884         dlm_put_lkb(lkb);
2885         return;
2886
2887  fail:
2888         setup_stub_lkb(ls, ms);
2889         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2890 }
2891
2892 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2893 {
2894         struct dlm_lkb *lkb;
2895         struct dlm_rsb *r;
2896         int error;
2897
2898         error = find_lkb(ls, ms->m_remid, &lkb);
2899         if (error) {
2900                 log_error(ls, "receive_grant no lkb");
2901                 return;
2902         }
2903         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2904
2905         r = lkb->lkb_resource;
2906
2907         hold_rsb(r);
2908         lock_rsb(r);
2909
2910         receive_flags_reply(lkb, ms);
2911         grant_lock_pc(r, lkb, ms);
2912         queue_cast(r, lkb, 0);
2913
2914         unlock_rsb(r);
2915         put_rsb(r);
2916         dlm_put_lkb(lkb);
2917 }
2918
2919 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2920 {
2921         struct dlm_lkb *lkb;
2922         struct dlm_rsb *r;
2923         int error;
2924
2925         error = find_lkb(ls, ms->m_remid, &lkb);
2926         if (error) {
2927                 log_error(ls, "receive_bast no lkb");
2928                 return;
2929         }
2930         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2931
2932         r = lkb->lkb_resource;
2933
2934         hold_rsb(r);
2935         lock_rsb(r);
2936
2937         queue_bast(r, lkb, ms->m_bastmode);
2938
2939         unlock_rsb(r);
2940         put_rsb(r);
2941         dlm_put_lkb(lkb);
2942 }
2943
2944 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2945 {
2946         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2947
2948         from_nodeid = ms->m_header.h_nodeid;
2949         our_nodeid = dlm_our_nodeid();
2950
2951         len = receive_extralen(ms);
2952
2953         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2954         if (dir_nodeid != our_nodeid) {
2955                 log_error(ls, "lookup dir_nodeid %d from %d",
2956                           dir_nodeid, from_nodeid);
2957                 error = -EINVAL;
2958                 ret_nodeid = -1;
2959                 goto out;
2960         }
2961
2962         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2963
2964         /* Optimization: we're master so treat lookup as a request */
2965         if (!error && ret_nodeid == our_nodeid) {
2966                 receive_request(ls, ms);
2967                 return;
2968         }
2969  out:
2970         send_lookup_reply(ls, ms, ret_nodeid, error);
2971 }
2972
2973 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2974 {
2975         int len, dir_nodeid, from_nodeid;
2976
2977         from_nodeid = ms->m_header.h_nodeid;
2978
2979         len = receive_extralen(ms);
2980
2981         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2982         if (dir_nodeid != dlm_our_nodeid()) {
2983                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
2984                           dir_nodeid, from_nodeid);
2985                 return;
2986         }
2987
2988         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2989 }
2990
2991 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
2992 {
2993         do_purge(ls, ms->m_nodeid, ms->m_pid);
2994 }
2995
2996 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2997 {
2998         struct dlm_lkb *lkb;
2999         struct dlm_rsb *r;
3000         int error, mstype, result;
3001
3002         error = find_lkb(ls, ms->m_remid, &lkb);
3003         if (error) {
3004                 log_error(ls, "receive_request_reply no lkb");
3005                 return;
3006         }
3007         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3008
3009         r = lkb->lkb_resource;
3010         hold_rsb(r);
3011         lock_rsb(r);
3012
3013         mstype = lkb->lkb_wait_type;
3014         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3015         if (error)
3016                 goto out;
3017
3018         /* Optimization: the dir node was also the master, so it took our
3019            lookup as a request and sent request reply instead of lookup reply */
3020         if (mstype == DLM_MSG_LOOKUP) {
3021                 r->res_nodeid = ms->m_header.h_nodeid;
3022                 lkb->lkb_nodeid = r->res_nodeid;
3023         }
3024
3025         /* this is the value returned from do_request() on the master */
3026         result = ms->m_result;
3027
3028         switch (result) {
3029         case -EAGAIN:
3030                 /* request would block (be queued) on remote master */
3031                 queue_cast(r, lkb, -EAGAIN);
3032                 confirm_master(r, -EAGAIN);
3033                 unhold_lkb(lkb); /* undoes create_lkb() */
3034                 break;
3035
3036         case -EINPROGRESS:
3037         case 0:
3038                 /* request was queued or granted on remote master */
3039                 receive_flags_reply(lkb, ms);
3040                 lkb->lkb_remid = ms->m_lkid;
3041                 if (result)
3042                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3043                 else {
3044                         grant_lock_pc(r, lkb, ms);
3045                         queue_cast(r, lkb, 0);
3046                 }
3047                 confirm_master(r, result);
3048                 break;
3049
3050         case -EBADR:
3051         case -ENOTBLK:
3052                 /* find_rsb failed to find rsb or rsb wasn't master */
3053                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3054                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3055                 r->res_nodeid = -1;
3056                 lkb->lkb_nodeid = -1;
3057
3058                 if (is_overlap(lkb)) {
3059                         /* we'll ignore error in cancel/unlock reply */
3060                         queue_cast_overlap(r, lkb);
3061                         unhold_lkb(lkb); /* undoes create_lkb() */
3062                 } else
3063                         _request_lock(r, lkb);
3064                 break;
3065
3066         default:
3067                 log_error(ls, "receive_request_reply %x error %d",
3068                           lkb->lkb_id, result);
3069         }
3070
3071         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3072                 log_debug(ls, "receive_request_reply %x result %d unlock",
3073                           lkb->lkb_id, result);
3074                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3075                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3076                 send_unlock(r, lkb);
3077         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3078                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3079                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3080                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3081                 send_cancel(r, lkb);
3082         } else {
3083                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3084                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3085         }
3086  out:
3087         unlock_rsb(r);
3088         put_rsb(r);
3089         dlm_put_lkb(lkb);
3090 }
3091
3092 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3093                                     struct dlm_message *ms)
3094 {
3095         /* this is the value returned from do_convert() on the master */
3096         switch (ms->m_result) {
3097         case -EAGAIN:
3098                 /* convert would block (be queued) on remote master */
3099                 queue_cast(r, lkb, -EAGAIN);
3100                 break;
3101
3102         case -EINPROGRESS:
3103                 /* convert was queued on remote master */
3104                 del_lkb(r, lkb);
3105                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3106                 break;
3107
3108         case 0:
3109                 /* convert was granted on remote master */
3110                 receive_flags_reply(lkb, ms);
3111                 grant_lock_pc(r, lkb, ms);
3112                 queue_cast(r, lkb, 0);
3113                 break;
3114
3115         default:
3116                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3117                           lkb->lkb_id, ms->m_result);
3118         }
3119 }
3120
3121 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3122 {
3123         struct dlm_rsb *r = lkb->lkb_resource;
3124         int error;
3125
3126         hold_rsb(r);
3127         lock_rsb(r);
3128
3129         /* stub reply can happen with waiters_mutex held */
3130         error = remove_from_waiters_ms(lkb, ms);
3131         if (error)
3132                 goto out;
3133
3134         __receive_convert_reply(r, lkb, ms);
3135  out:
3136         unlock_rsb(r);
3137         put_rsb(r);
3138 }
3139
3140 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3141 {
3142         struct dlm_lkb *lkb;
3143         int error;
3144
3145         error = find_lkb(ls, ms->m_remid, &lkb);
3146         if (error) {
3147                 log_error(ls, "receive_convert_reply no lkb");
3148                 return;
3149         }
3150         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3151
3152         _receive_convert_reply(lkb, ms);
3153         dlm_put_lkb(lkb);
3154 }
3155
3156 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3157 {
3158         struct dlm_rsb *r = lkb->lkb_resource;
3159         int error;
3160
3161         hold_rsb(r);
3162         lock_rsb(r);
3163
3164         /* stub reply can happen with waiters_mutex held */
3165         error = remove_from_waiters_ms(lkb, ms);
3166         if (error)
3167                 goto out;
3168
3169         /* this is the value returned from do_unlock() on the master */
3170
3171         switch (ms->m_result) {
3172         case -DLM_EUNLOCK:
3173                 receive_flags_reply(lkb, ms);
3174                 remove_lock_pc(r, lkb);
3175                 queue_cast(r, lkb, -DLM_EUNLOCK);
3176                 break;
3177         case -ENOENT:
3178                 break;
3179         default:
3180                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3181                           lkb->lkb_id, ms->m_result);
3182         }
3183  out:
3184         unlock_rsb(r);
3185         put_rsb(r);
3186 }
3187
3188 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3189 {
3190         struct dlm_lkb *lkb;
3191         int error;
3192
3193         error = find_lkb(ls, ms->m_remid, &lkb);
3194         if (error) {
3195                 log_error(ls, "receive_unlock_reply no lkb");
3196                 return;
3197         }
3198         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3199
3200         _receive_unlock_reply(lkb, ms);
3201         dlm_put_lkb(lkb);
3202 }
3203
3204 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3205 {
3206         struct dlm_rsb *r = lkb->lkb_resource;
3207         int error;
3208
3209         hold_rsb(r);
3210         lock_rsb(r);
3211
3212         /* stub reply can happen with waiters_mutex held */
3213         error = remove_from_waiters_ms(lkb, ms);
3214         if (error)
3215                 goto out;
3216
3217         /* this is the value returned from do_cancel() on the master */
3218
3219         switch (ms->m_result) {
3220         case -DLM_ECANCEL:
3221                 receive_flags_reply(lkb, ms);
3222                 revert_lock_pc(r, lkb);
3223                 if (ms->m_result)
3224                         queue_cast(r, lkb, -DLM_ECANCEL);
3225                 break;
3226         case 0:
3227                 break;
3228         default:
3229                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3230                           lkb->lkb_id, ms->m_result);
3231         }
3232  out:
3233         unlock_rsb(r);
3234         put_rsb(r);
3235 }
3236
3237 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3238 {
3239         struct dlm_lkb *lkb;
3240         int error;
3241
3242         error = find_lkb(ls, ms->m_remid, &lkb);
3243         if (error) {
3244                 log_error(ls, "receive_cancel_reply no lkb");
3245                 return;
3246         }
3247         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3248
3249         _receive_cancel_reply(lkb, ms);
3250         dlm_put_lkb(lkb);
3251 }
3252
3253 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3254 {
3255         struct dlm_lkb *lkb;
3256         struct dlm_rsb *r;
3257         int error, ret_nodeid;
3258
3259         error = find_lkb(ls, ms->m_lkid, &lkb);
3260         if (error) {
3261                 log_error(ls, "receive_lookup_reply no lkb");
3262                 return;
3263         }
3264
3265         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3266            FIXME: will a non-zero error ever be returned? */
3267
3268         r = lkb->lkb_resource;
3269         hold_rsb(r);
3270         lock_rsb(r);
3271
3272         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3273         if (error)
3274                 goto out;
3275
3276         ret_nodeid = ms->m_nodeid;
3277         if (ret_nodeid == dlm_our_nodeid()) {
3278                 r->res_nodeid = 0;
3279                 ret_nodeid = 0;
3280                 r->res_first_lkid = 0;
3281         } else {
3282                 /* set_master() will copy res_nodeid to lkb_nodeid */
3283                 r->res_nodeid = ret_nodeid;
3284         }
3285
3286         if (is_overlap(lkb)) {
3287                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3288                           lkb->lkb_id, lkb->lkb_flags);
3289                 queue_cast_overlap(r, lkb);
3290                 unhold_lkb(lkb); /* undoes create_lkb() */
3291                 goto out_list;
3292         }
3293
3294         _request_lock(r, lkb);
3295
3296  out_list:
3297         if (!ret_nodeid)
3298                 process_lookup_list(r);
3299  out:
3300         unlock_rsb(r);
3301         put_rsb(r);
3302         dlm_put_lkb(lkb);
3303 }
3304
3305 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3306 {
3307         struct dlm_message *ms = (struct dlm_message *) hd;
3308         struct dlm_ls *ls;
3309         int error = 0;
3310
3311         if (!recovery)
3312                 dlm_message_in(ms);
3313
3314         ls = dlm_find_lockspace_global(hd->h_lockspace);
3315         if (!ls) {
3316                 log_print("drop message %d from %d for unknown lockspace %d",
3317                           ms->m_type, nodeid, hd->h_lockspace);
3318                 return -EINVAL;
3319         }
3320
3321         /* recovery may have just ended leaving a bunch of backed-up requests
3322            in the requestqueue; wait while dlm_recoverd clears them */
3323
3324         if (!recovery)
3325                 dlm_wait_requestqueue(ls);
3326
3327         /* recovery may have just started while there were a bunch of
3328            in-flight requests -- save them in requestqueue to be processed
3329            after recovery.  we can't let dlm_recvd block on the recovery
3330            lock.  if dlm_recoverd is calling this function to clear the
3331            requestqueue, it needs to be interrupted (-EINTR) if another
3332            recovery operation is starting. */
3333
3334         while (1) {
3335                 if (dlm_locking_stopped(ls)) {
3336                         if (recovery) {
3337                                 error = -EINTR;
3338                                 goto out;
3339                         }
3340                         error = dlm_add_requestqueue(ls, nodeid, hd);
3341                         if (error == -EAGAIN)
3342                                 continue;
3343                         else {
3344                                 error = -EINTR;
3345                                 goto out;
3346                         }
3347                 }
3348
3349                 if (lock_recovery_try(ls))
3350                         break;
3351                 schedule();
3352         }
3353
3354         switch (ms->m_type) {
3355
3356         /* messages sent to a master node */
3357
3358         case DLM_MSG_REQUEST:
3359                 receive_request(ls, ms);
3360                 break;
3361
3362         case DLM_MSG_CONVERT:
3363                 receive_convert(ls, ms);
3364                 break;
3365
3366         case DLM_MSG_UNLOCK:
3367                 receive_unlock(ls, ms);
3368                 break;
3369
3370         case DLM_MSG_CANCEL:
3371                 receive_cancel(ls, ms);
3372                 break;
3373
3374         /* messages sent from a master node (replies to above) */
3375
3376         case DLM_MSG_REQUEST_REPLY:
3377                 receive_request_reply(ls, ms);
3378                 break;
3379
3380         case DLM_MSG_CONVERT_REPLY:
3381                 receive_convert_reply(ls, ms);
3382                 break;
3383
3384         case DLM_MSG_UNLOCK_REPLY:
3385                 receive_unlock_reply(ls, ms);
3386                 break;
3387
3388         case DLM_MSG_CANCEL_REPLY:
3389                 receive_cancel_reply(ls, ms);
3390                 break;
3391
3392         /* messages sent from a master node (only two types of async msg) */
3393
3394         case DLM_MSG_GRANT:
3395                 receive_grant(ls, ms);
3396                 break;
3397
3398         case DLM_MSG_BAST:
3399                 receive_bast(ls, ms);
3400                 break;
3401
3402         /* messages sent to a dir node */
3403
3404         case DLM_MSG_LOOKUP:
3405                 receive_lookup(ls, ms);
3406                 break;
3407
3408         case DLM_MSG_REMOVE:
3409                 receive_remove(ls, ms);
3410                 break;
3411
3412         /* messages sent from a dir node (remove has no reply) */
3413
3414         case DLM_MSG_LOOKUP_REPLY:
3415                 receive_lookup_reply(ls, ms);
3416                 break;
3417
3418         /* other messages */
3419
3420         case DLM_MSG_PURGE:
3421                 receive_purge(ls, ms);
3422                 break;
3423
3424         default:
3425                 log_error(ls, "unknown message type %d", ms->m_type);
3426         }
3427
3428         unlock_recovery(ls);
3429  out:
3430         dlm_put_lockspace(ls);
3431         dlm_astd_wake();
3432         return error;
3433 }
3434
3435
3436 /*
3437  * Recovery related
3438  */
3439
3440 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3441 {
3442         if (middle_conversion(lkb)) {
3443                 hold_lkb(lkb);
3444                 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3445                 ls->ls_stub_ms.m_result = -EINPROGRESS;
3446                 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3447                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3448
3449                 /* Same special case as in receive_rcom_lock_args() */
3450                 lkb->lkb_grmode = DLM_LOCK_IV;
3451                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3452                 unhold_lkb(lkb);
3453
3454         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3455                 lkb->lkb_flags |= DLM_IFL_RESEND;
3456         }
3457
3458         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3459            conversions are async; there's no reply from the remote master */
3460 }
3461
3462 /* A waiting lkb needs recovery if the master node has failed, or
3463    the master node is changing (only when no directory is used) */
3464
3465 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3466 {
3467         if (dlm_is_removed(ls, lkb->lkb_nodeid))
3468                 return 1;
3469
3470         if (!dlm_no_directory(ls))
3471                 return 0;
3472
3473         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3474                 return 1;
3475
3476         return 0;
3477 }
3478
3479 /* Recovery for locks that are waiting for replies from nodes that are now
3480    gone.  We can just complete unlocks and cancels by faking a reply from the
3481    dead node.  Requests and up-conversions we flag to be resent after
3482    recovery.  Down-conversions can just be completed with a fake reply like
3483    unlocks.  Conversions between PR and CW need special attention. */
3484
3485 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3486 {
3487         struct dlm_lkb *lkb, *safe;
3488
3489         mutex_lock(&ls->ls_waiters_mutex);
3490
3491         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3492                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3493                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3494
3495                 /* all outstanding lookups, regardless of destination  will be
3496                    resent after recovery is done */
3497
3498                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3499                         lkb->lkb_flags |= DLM_IFL_RESEND;
3500                         continue;
3501                 }
3502
3503                 if (!waiter_needs_recovery(ls, lkb))
3504                         continue;
3505
3506                 switch (lkb->lkb_wait_type) {
3507
3508                 case DLM_MSG_REQUEST:
3509                         lkb->lkb_flags |= DLM_IFL_RESEND;
3510                         break;
3511
3512                 case DLM_MSG_CONVERT:
3513                         recover_convert_waiter(ls, lkb);
3514                         break;
3515
3516                 case DLM_MSG_UNLOCK:
3517                         hold_lkb(lkb);
3518                         ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3519                         ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3520                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3521                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3522                         dlm_put_lkb(lkb);
3523                         break;
3524
3525                 case DLM_MSG_CANCEL:
3526                         hold_lkb(lkb);
3527                         ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3528                         ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3529                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3530                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3531                         dlm_put_lkb(lkb);
3532                         break;
3533
3534                 default:
3535                         log_error(ls, "invalid lkb wait_type %d",
3536                                   lkb->lkb_wait_type);
3537                 }
3538                 schedule();
3539         }
3540         mutex_unlock(&ls->ls_waiters_mutex);
3541 }
3542
3543 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
3544 {
3545         struct dlm_lkb *lkb;
3546         int found = 0;
3547
3548         mutex_lock(&ls->ls_waiters_mutex);
3549         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3550                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3551                         hold_lkb(lkb);
3552                         found = 1;
3553                         break;
3554                 }
3555         }
3556         mutex_unlock(&ls->ls_waiters_mutex);
3557
3558         if (!found)
3559                 lkb = NULL;
3560         return lkb;
3561 }
3562
3563 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
3564    master or dir-node for r.  Processing the lkb may result in it being placed
3565    back on waiters. */
3566
3567 /* We do this after normal locking has been enabled and any saved messages
3568    (in requestqueue) have been processed.  We should be confident that at
3569    this point we won't get or process a reply to any of these waiting
3570    operations.  But, new ops may be coming in on the rsbs/locks here from
3571    userspace or remotely. */
3572
3573 /* there may have been an overlap unlock/cancel prior to recovery or after
3574    recovery.  if before, the lkb may still have a pos wait_count; if after, the
3575    overlap flag would just have been set and nothing new sent.  we can be
3576    confident here than any replies to either the initial op or overlap ops
3577    prior to recovery have been received. */
3578
3579 int dlm_recover_waiters_post(struct dlm_ls *ls)
3580 {
3581         struct dlm_lkb *lkb;
3582         struct dlm_rsb *r;
3583         int error = 0, mstype, err, oc, ou;
3584
3585         while (1) {
3586                 if (dlm_locking_stopped(ls)) {
3587                         log_debug(ls, "recover_waiters_post aborted");
3588                         error = -EINTR;
3589                         break;
3590                 }
3591
3592                 lkb = find_resend_waiter(ls);
3593                 if (!lkb)
3594                         break;
3595
3596                 r = lkb->lkb_resource;
3597                 hold_rsb(r);
3598                 lock_rsb(r);
3599
3600                 mstype = lkb->lkb_wait_type;
3601                 oc = is_overlap_cancel(lkb);
3602                 ou = is_overlap_unlock(lkb);
3603                 err = 0;
3604
3605                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3606                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3607
3608                 /* At this point we assume that we won't get a reply to any
3609                    previous op or overlap op on this lock.  First, do a big
3610                    remove_from_waiters() for all previous ops. */
3611
3612                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3613                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3614                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3615                 lkb->lkb_wait_type = 0;
3616                 lkb->lkb_wait_count = 0;
3617                 mutex_lock(&ls->ls_waiters_mutex);
3618                 list_del_init(&lkb->lkb_wait_reply);
3619                 mutex_unlock(&ls->ls_waiters_mutex);
3620                 unhold_lkb(lkb); /* for waiters list */
3621
3622                 if (oc || ou) {
3623                         /* do an unlock or cancel instead of resending */
3624                         switch (mstype) {
3625                         case DLM_MSG_LOOKUP:
3626                         case DLM_MSG_REQUEST:
3627                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
3628                                                         -DLM_ECANCEL);
3629                                 unhold_lkb(lkb); /* undoes create_lkb() */
3630                                 break;
3631                         case DLM_MSG_CONVERT:
3632                                 if (oc) {
3633                                         queue_cast(r, lkb, -DLM_ECANCEL);
3634                                 } else {
3635                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
3636                                         _unlock_lock(r, lkb);
3637                                 }
3638                                 break;
3639                         default:
3640                                 err = 1;
3641                         }
3642                 } else {
3643                         switch (mstype) {
3644                         case DLM_MSG_LOOKUP:
3645                         case DLM_MSG_REQUEST:
3646                                 _request_lock(r, lkb);
3647                                 if (is_master(r))
3648                                         confirm_master(r, 0);
3649                                 break;
3650                         case DLM_MSG_CONVERT:
3651                                 _convert_lock(r, lkb);
3652                                 break;
3653                         default:
3654                                 err = 1;
3655                         }
3656                 }
3657
3658                 if (err)
3659                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
3660                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
3661                 unlock_rsb(r);
3662                 put_rsb(r);
3663                 dlm_put_lkb(lkb);
3664         }
3665
3666         return error;
3667 }
3668
3669 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3670                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3671 {
3672         struct dlm_ls *ls = r->res_ls;
3673         struct dlm_lkb *lkb, *safe;
3674
3675         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3676                 if (test(ls, lkb)) {
3677                         rsb_set_flag(r, RSB_LOCKS_PURGED);
3678                         del_lkb(r, lkb);
3679                         /* this put should free the lkb */
3680                         if (!dlm_put_lkb(lkb))
3681                                 log_error(ls, "purged lkb not released");
3682                 }
3683         }
3684 }
3685
3686 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3687 {
3688         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3689 }
3690
3691 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3692 {
3693         return is_master_copy(lkb);
3694 }
3695
3696 static void purge_dead_locks(struct dlm_rsb *r)
3697 {
3698         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3699         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3700         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3701 }
3702
3703 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3704 {
3705         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3706         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3707         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3708 }
3709
3710 /* Get rid of locks held by nodes that are gone. */
3711
3712 int dlm_purge_locks(struct dlm_ls *ls)
3713 {
3714         struct dlm_rsb *r;
3715
3716         log_debug(ls, "dlm_purge_locks");
3717
3718         down_write(&ls->ls_root_sem);
3719         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3720                 hold_rsb(r);
3721                 lock_rsb(r);
3722                 if (is_master(r))
3723                         purge_dead_locks(r);
3724                 unlock_rsb(r);
3725                 unhold_rsb(r);
3726
3727                 schedule();
3728         }
3729         up_write(&ls->ls_root_sem);
3730
3731         return 0;
3732 }
3733
3734 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3735 {
3736         struct dlm_rsb *r, *r_ret = NULL;
3737
3738         read_lock(&ls->ls_rsbtbl[bucket].lock);
3739         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3740                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
3741                         continue;
3742                 hold_rsb(r);
3743                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
3744                 r_ret = r;
3745                 break;
3746         }
3747         read_unlock(&ls->ls_rsbtbl[bucket].lock);
3748         return r_ret;
3749 }
3750
3751 void dlm_grant_after_purge(struct dlm_ls *ls)
3752 {
3753         struct dlm_rsb *r;
3754         int bucket = 0;
3755
3756         while (1) {
3757                 r = find_purged_rsb(ls, bucket);
3758                 if (!r) {
3759                         if (bucket == ls->ls_rsbtbl_size - 1)
3760                                 break;
3761                         bucket++;
3762                         continue;
3763                 }
3764                 lock_rsb(r);
3765                 if (is_master(r)) {
3766                         grant_pending_locks(r);
3767                         confirm_master(r, 0);
3768                 }
3769                 unlock_rsb(r);
3770                 put_rsb(r);
3771                 schedule();
3772         }
3773 }
3774
3775 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3776                                          uint32_t remid)
3777 {
3778         struct dlm_lkb *lkb;
3779
3780         list_for_each_entry(lkb, head, lkb_statequeue) {
3781                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3782                         return lkb;
3783         }
3784         return NULL;
3785 }
3786
3787 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3788                                     uint32_t remid)
3789 {
3790         struct dlm_lkb *lkb;
3791
3792         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3793         if (lkb)
3794                 return lkb;
3795         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3796         if (lkb)
3797                 return lkb;
3798         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3799         if (lkb)
3800                 return lkb;
3801         return NULL;
3802 }
3803
3804 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3805                                   struct dlm_rsb *r, struct dlm_rcom *rc)
3806 {
3807         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3808         int lvblen;
3809
3810         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3811         lkb->lkb_ownpid = rl->rl_ownpid;
3812         lkb->lkb_remid = rl->rl_lkid;
3813         lkb->lkb_exflags = rl->rl_exflags;
3814         lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3815         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3816         lkb->lkb_lvbseq = rl->rl_lvbseq;
3817         lkb->lkb_rqmode = rl->rl_rqmode;
3818         lkb->lkb_grmode = rl->rl_grmode;
3819         /* don't set lkb_status because add_lkb wants to itself */
3820
3821         lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3822         lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3823
3824         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3825                 lkb->lkb_lvbptr = allocate_lvb(ls);
3826                 if (!lkb->lkb_lvbptr)
3827                         return -ENOMEM;
3828                 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3829                          sizeof(struct rcom_lock);
3830                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3831         }
3832
3833         /* Conversions between PR and CW (middle modes) need special handling.
3834            The real granted mode of these converting locks cannot be determined
3835            until all locks have been rebuilt on the rsb (recover_conversion) */
3836
3837         if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3838                 rl->rl_status = DLM_LKSTS_CONVERT;
3839                 lkb->lkb_grmode = DLM_LOCK_IV;
3840                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3841         }
3842
3843         return 0;
3844 }
3845
3846 /* This lkb may have been recovered in a previous aborted recovery so we need
3847    to check if the rsb already has an lkb with the given remote nodeid/lkid.
3848    If so we just send back a standard reply.  If not, we create a new lkb with
3849    the given values and send back our lkid.  We send back our lkid by sending
3850    back the rcom_lock struct we got but with the remid field filled in. */
3851
3852 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3853 {
3854         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3855         struct dlm_rsb *r;
3856         struct dlm_lkb *lkb;
3857         int error;
3858
3859         if (rl->rl_parent_lkid) {
3860                 error = -EOPNOTSUPP;
3861                 goto out;
3862         }
3863
3864         error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3865         if (error)
3866                 goto out;
3867
3868         lock_rsb(r);
3869
3870         lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3871         if (lkb) {
3872                 error = -EEXIST;
3873                 goto out_remid;
3874         }
3875
3876         error = create_lkb(ls, &lkb);
3877         if (error)
3878                 goto out_unlock;
3879
3880         error = receive_rcom_lock_args(ls, lkb, r, rc);
3881         if (error) {
3882                 __put_lkb(ls, lkb);
3883                 goto out_unlock;
3884         }
3885
3886         attach_lkb(r, lkb);
3887         add_lkb(r, lkb, rl->rl_status);
3888         error = 0;
3889
3890  out_remid:
3891         /* this is the new value returned to the lock holder for
3892            saving in its process-copy lkb */
3893         rl->rl_remid = lkb->lkb_id;
3894
3895  out_unlock:
3896         unlock_rsb(r);
3897         put_rsb(r);
3898  out:
3899         if (error)
3900                 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3901         rl->rl_result = error;
3902         return error;
3903 }
3904
3905 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3906 {
3907         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3908         struct dlm_rsb *r;
3909         struct dlm_lkb *lkb;
3910         int error;
3911
3912         error = find_lkb(ls, rl->rl_lkid, &lkb);
3913         if (error) {
3914                 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3915                 return error;
3916         }
3917
3918         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3919
3920         error = rl->rl_result;
3921
3922         r = lkb->lkb_resource;
3923         hold_rsb(r);
3924         lock_rsb(r);
3925
3926         switch (error) {
3927         case -EBADR:
3928                 /* There's a chance the new master received our lock before
3929                    dlm_recover_master_reply(), this wouldn't happen if we did
3930                    a barrier between recover_masters and recover_locks. */
3931                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
3932                           (unsigned long)r, r->res_name);
3933                 dlm_send_rcom_lock(r, lkb);
3934                 goto out;
3935         case -EEXIST:
3936                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
3937                 /* fall through */
3938         case 0:
3939                 lkb->lkb_remid = rl->rl_remid;
3940                 break;
3941         default:
3942                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3943                           error, lkb->lkb_id);
3944         }
3945
3946         /* an ack for dlm_recover_locks() which waits for replies from
3947            all the locks it sends to new masters */
3948         dlm_recovered_lock(r);
3949  out:
3950         unlock_rsb(r);
3951         put_rsb(r);
3952         dlm_put_lkb(lkb);
3953
3954         return 0;
3955 }
3956
3957 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
3958                      int mode, uint32_t flags, void *name, unsigned int namelen,
3959                      uint32_t parent_lkid)
3960 {
3961         struct dlm_lkb *lkb;
3962         struct dlm_args args;
3963         int error;
3964
3965         lock_recovery(ls);
3966
3967         error = create_lkb(ls, &lkb);
3968         if (error) {
3969                 kfree(ua);
3970                 goto out;
3971         }
3972
3973         if (flags & DLM_LKF_VALBLK) {
3974                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3975                 if (!ua->lksb.sb_lvbptr) {
3976                         kfree(ua);
3977                         __put_lkb(ls, lkb);
3978                         error = -ENOMEM;
3979                         goto out;
3980                 }
3981         }
3982
3983         /* After ua is attached to lkb it will be freed by free_lkb().
3984            When DLM_IFL_USER is set, the dlm knows that this is a userspace
3985            lock and that lkb_astparam is the dlm_user_args structure. */
3986
3987         error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
3988                               DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
3989         lkb->lkb_flags |= DLM_IFL_USER;
3990         ua->old_mode = DLM_LOCK_IV;
3991
3992         if (error) {
3993                 __put_lkb(ls, lkb);
3994                 goto out;
3995         }
3996
3997         error = request_lock(ls, lkb, name, namelen, &args);
3998
3999         switch (error) {
4000         case 0:
4001                 break;
4002         case -EINPROGRESS:
4003                 error = 0;
4004                 break;
4005         case -EAGAIN:
4006                 error = 0;
4007                 /* fall through */
4008         default:
4009                 __put_lkb(ls, lkb);
4010                 goto out;
4011         }
4012
4013         /* add this new lkb to the per-process list of locks */
4014         spin_lock(&ua->proc->locks_spin);
4015         hold_lkb(lkb);
4016         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4017         spin_unlock(&ua->proc->locks_spin);
4018  out:
4019         unlock_recovery(ls);
4020         return error;
4021 }
4022
4023 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4024                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
4025 {
4026         struct dlm_lkb *lkb;
4027         struct dlm_args args;
4028         struct dlm_user_args *ua;
4029         int error;
4030
4031         lock_recovery(ls);
4032
4033         error = find_lkb(ls, lkid, &lkb);
4034         if (error)
4035                 goto out;
4036
4037         /* user can change the params on its lock when it converts it, or
4038            add an lvb that didn't exist before */
4039
4040         ua = (struct dlm_user_args *)lkb->lkb_astparam;
4041
4042         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4043                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4044                 if (!ua->lksb.sb_lvbptr) {
4045                         error = -ENOMEM;
4046                         goto out_put;
4047                 }
4048         }
4049         if (lvb_in && ua->lksb.sb_lvbptr)
4050                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4051
4052         ua->castparam = ua_tmp->castparam;
4053         ua->castaddr = ua_tmp->castaddr;
4054         ua->bastparam = ua_tmp->bastparam;
4055         ua->bastaddr = ua_tmp->bastaddr;
4056         ua->user_lksb = ua_tmp->user_lksb;
4057         ua->old_mode = lkb->lkb_grmode;
4058
4059         error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
4060                               ua, DLM_FAKE_USER_AST, &args);
4061         if (error)
4062                 goto out_put;
4063
4064         error = convert_lock(ls, lkb, &args);
4065
4066         if (error == -EINPROGRESS || error == -EAGAIN)
4067                 error = 0;
4068  out_put:
4069         dlm_put_lkb(lkb);
4070  out:
4071         unlock_recovery(ls);
4072         kfree(ua_tmp);
4073         return error;
4074 }
4075
4076 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4077                     uint32_t flags, uint32_t lkid, char *lvb_in)
4078 {
4079         struct dlm_lkb *lkb;
4080         struct dlm_args args;
4081         struct dlm_user_args *ua;
4082         int error;
4083
4084         lock_recovery(ls);
4085
4086         error = find_lkb(ls, lkid, &lkb);
4087         if (error)
4088                 goto out;
4089
4090         ua = (struct dlm_user_args *)lkb->lkb_astparam;
4091
4092         if (lvb_in && ua->lksb.sb_lvbptr)
4093                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4094         ua->castparam = ua_tmp->castparam;
4095         ua->user_lksb = ua_tmp->user_lksb;
4096
4097         error = set_unlock_args(flags, ua, &args);
4098         if (error)
4099                 goto out_put;
4100
4101         error = unlock_lock(ls, lkb, &args);
4102
4103         if (error == -DLM_EUNLOCK)
4104                 error = 0;
4105         /* from validate_unlock_args() */
4106         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4107                 error = 0;
4108         if (error)
4109                 goto out_put;
4110
4111         spin_lock(&ua->proc->locks_spin);
4112         /* dlm_user_add_ast() may have already taken lkb off the proc list */
4113         if (!list_empty(&lkb->lkb_ownqueue))
4114                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4115         spin_unlock(&ua->proc->locks_spin);
4116  out_put:
4117         dlm_put_lkb(lkb);
4118  out:
4119         unlock_recovery(ls);
4120         kfree(ua_tmp);
4121         return error;
4122 }
4123
4124 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4125                     uint32_t flags, uint32_t lkid)
4126 {
4127         struct dlm_lkb *lkb;
4128         struct dlm_args args;
4129         struct dlm_user_args *ua;
4130         int error;
4131
4132         lock_recovery(ls);
4133
4134         error = find_lkb(ls, lkid, &lkb);
4135         if (error)
4136                 goto out;
4137
4138         ua = (struct dlm_user_args *)lkb->lkb_astparam;
4139         ua->castparam = ua_tmp->castparam;
4140         ua->user_lksb = ua_tmp->user_lksb;
4141
4142         error = set_unlock_args(flags, ua, &args);
4143         if (error)
4144                 goto out_put;
4145
4146         error = cancel_lock(ls, lkb, &args);
4147
4148         if (error == -DLM_ECANCEL)
4149                 error = 0;
4150         /* from validate_unlock_args() */
4151         if (error == -EBUSY)
4152                 error = 0;
4153  out_put:
4154         dlm_put_lkb(lkb);
4155  out:
4156         unlock_recovery(ls);
4157         kfree(ua_tmp);
4158         return error;
4159 }
4160
4161 /* lkb's that are removed from the waiters list by revert are just left on the
4162    orphans list with the granted orphan locks, to be freed by purge */
4163
4164 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4165 {
4166         struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4167         struct dlm_args args;
4168         int error;
4169
4170         hold_lkb(lkb);
4171         mutex_lock(&ls->ls_orphans_mutex);
4172         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4173         mutex_unlock(&ls->ls_orphans_mutex);
4174
4175         set_unlock_args(0, ua, &args);
4176
4177         error = cancel_lock(ls, lkb, &args);
4178         if (error == -DLM_ECANCEL)
4179                 error = 0;
4180         return error;
4181 }
4182
4183 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4184    Regardless of what rsb queue the lock is on, it's removed and freed. */
4185
4186 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4187 {
4188         struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4189         struct dlm_args args;
4190         int error;
4191
4192         set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4193
4194         error = unlock_lock(ls, lkb, &args);
4195         if (error == -DLM_EUNLOCK)
4196                 error = 0;
4197         return error;
4198 }
4199
4200 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4201    (which does lock_rsb) due to deadlock with receiving a message that does
4202    lock_rsb followed by dlm_user_add_ast() */
4203
4204 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4205                                      struct dlm_user_proc *proc)
4206 {
4207         struct dlm_lkb *lkb = NULL;
4208
4209         mutex_lock(&ls->ls_clear_proc_locks);
4210         if (list_empty(&proc->locks))
4211                 goto out;
4212
4213         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4214         list_del_init(&lkb->lkb_ownqueue);
4215
4216         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4217                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4218         else
4219                 lkb->lkb_flags |= DLM_IFL_DEAD;
4220  out:
4221         mutex_unlock(&ls->ls_clear_proc_locks);
4222         return lkb;
4223 }
4224
4225 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4226    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4227    which we clear here. */
4228
4229 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4230    list, and no more device_writes should add lkb's to proc->locks list; so we
4231    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4232    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4233    them ourself. */
4234
4235 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4236 {
4237         struct dlm_lkb *lkb, *safe;
4238
4239         lock_recovery(ls);
4240
4241         while (1) {
4242                 lkb = del_proc_lock(ls, proc);
4243                 if (!lkb)
4244                         break;
4245                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4246                         orphan_proc_lock(ls, lkb);
4247                 else
4248                         unlock_proc_lock(ls, lkb);
4249
4250                 /* this removes the reference for the proc->locks list
4251                    added by dlm_user_request, it may result in the lkb
4252                    being freed */
4253
4254                 dlm_put_lkb(lkb);
4255         }
4256
4257         mutex_lock(&ls->ls_clear_proc_locks);
4258
4259         /* in-progress unlocks */
4260         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4261                 list_del_init(&lkb->lkb_ownqueue);
4262                 lkb->lkb_flags |= DLM_IFL_DEAD;
4263                 dlm_put_lkb(lkb);
4264         }
4265
4266         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4267                 list_del(&lkb->lkb_astqueue);
4268                 dlm_put_lkb(lkb);
4269         }
4270
4271         mutex_unlock(&ls->ls_clear_proc_locks);
4272         unlock_recovery(ls);
4273 }
4274
4275 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4276 {
4277         struct dlm_lkb *lkb, *safe;
4278
4279         while (1) {
4280                 lkb = NULL;
4281                 spin_lock(&proc->locks_spin);
4282                 if (!list_empty(&proc->locks)) {
4283                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
4284                                          lkb_ownqueue);
4285                         list_del_init(&lkb->lkb_ownqueue);
4286                 }
4287                 spin_unlock(&proc->locks_spin);
4288
4289                 if (!lkb)
4290                         break;
4291
4292                 lkb->lkb_flags |= DLM_IFL_DEAD;
4293                 unlock_proc_lock(ls, lkb);
4294                 dlm_put_lkb(lkb); /* ref from proc->locks list */
4295         }
4296
4297         spin_lock(&proc->locks_spin);
4298         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4299                 list_del_init(&lkb->lkb_ownqueue);
4300                 lkb->lkb_flags |= DLM_IFL_DEAD;
4301                 dlm_put_lkb(lkb);
4302         }
4303         spin_unlock(&proc->locks_spin);
4304
4305         spin_lock(&proc->asts_spin);
4306         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4307                 list_del(&lkb->lkb_astqueue);
4308                 dlm_put_lkb(lkb);
4309         }
4310         spin_unlock(&proc->asts_spin);
4311 }
4312
4313 /* pid of 0 means purge all orphans */
4314
4315 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4316 {
4317         struct dlm_lkb *lkb, *safe;
4318
4319         mutex_lock(&ls->ls_orphans_mutex);
4320         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4321                 if (pid && lkb->lkb_ownpid != pid)
4322                         continue;
4323                 unlock_proc_lock(ls, lkb);
4324                 list_del_init(&lkb->lkb_ownqueue);
4325                 dlm_put_lkb(lkb);
4326         }
4327         mutex_unlock(&ls->ls_orphans_mutex);
4328 }
4329
4330 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4331 {
4332         struct dlm_message *ms;
4333         struct dlm_mhandle *mh;
4334         int error;
4335
4336         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4337                                 DLM_MSG_PURGE, &ms, &mh);
4338         if (error)
4339                 return error;
4340         ms->m_nodeid = nodeid;
4341         ms->m_pid = pid;
4342
4343         return send_message(mh, ms);
4344 }
4345
4346 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4347                    int nodeid, int pid)
4348 {
4349         int error = 0;
4350
4351         if (nodeid != dlm_our_nodeid()) {
4352                 error = send_purge(ls, nodeid, pid);
4353         } else {
4354                 lock_recovery(ls);
4355                 if (pid == current->pid)
4356                         purge_proc_locks(ls, proc);
4357                 else
4358                         do_purge(ls, nodeid, pid);
4359                 unlock_recovery(ls);
4360         }
4361         return error;
4362 }
4363