int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
 void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
 void dlm_wait_for_recovery(struct dlm_ctxt *dlm);
+void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
 int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node);
 int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout);
 
 int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data);
 int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data);
 int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data);
+int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
+                         u8 nodenum, u8 *real_master);
+int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
+                              struct dlm_lock_resource *res, u8 *real_master);
+
 
 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
                               struct dlm_lock_resource *res,
 
                                          res->lockname.len)) {
                        kick_thread = 1;
                        call_ast = 1;
+               } else {
+                       mlog(0, "%s: returning DLM_NORMAL to "
+                            "node %u for reco lock\n", dlm->name,
+                            lock->ml.node);
                }
        } else {
                /* for NOQUEUE request, unless we get the
                 * lock right away, return DLM_NOTQUEUED */
-               if (flags & LKM_NOQUEUE)
+               if (flags & LKM_NOQUEUE) {
                        status = DLM_NOTQUEUED;
-               else {
+                       if (dlm_is_recovery_lock(res->lockname.name,
+                                                res->lockname.len)) {
+                               mlog(0, "%s: returning NOTQUEUED to "
+                                    "node %u for reco lock\n", dlm->name,
+                                    lock->ml.node);
+                       }
+               } else {
                        dlm_lock_get(lock);
                        list_add_tail(&lock->list, &res->blocked);
                        kick_thread = 1;
 
 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
                                       struct dlm_lock_resource *res,
                                       u8 target);
+static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
+                                      struct dlm_lock_resource *res);
 
 
 int dlm_is_host_down(int errno)
        struct dlm_node_iter iter;
        unsigned int namelen;
        int tries = 0;
+       int bit, wait_on_recovery = 0;
 
        BUG_ON(!lockid);
 
                dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
                set_bit(dlm->node_num, mle->maybe_map);
                list_add(&mle->list, &dlm->master_list);
+
+               /* still holding the dlm spinlock, check the recovery map
+                * to see if there are any nodes that still need to be 
+                * considered.  these will not appear in the mle nodemap
+                * but they might own this lockres.  wait on them. */
+               bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
+               if (bit < O2NM_MAX_NODES) {
+                       mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
+                            "recover before lock mastery can begin\n",
+                            dlm->name, namelen, (char *)lockid, bit);
+                       wait_on_recovery = 1;
+               }
        }
 
        /* at this point there is either a DLM_MLE_BLOCK or a
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);
 
+       while (wait_on_recovery) {
+               /* any cluster changes that occurred after dropping the
+                * dlm spinlock would be detectable be a change on the mle,
+                * so we only need to clear out the recovery map once. */
+               if (dlm_is_recovery_lock(lockid, namelen)) {
+                       mlog(ML_NOTICE, "%s: recovery map is not empty, but "
+                            "must master $RECOVERY lock now\n", dlm->name);
+                       if (!dlm_pre_master_reco_lockres(dlm, res))
+                               wait_on_recovery = 0;
+                       else {
+                               mlog(0, "%s: waiting 500ms for heartbeat state "
+                                   "change\n", dlm->name);
+                               msleep(500);
+                       }
+                       continue;
+               } 
+
+               dlm_kick_recovery_thread(dlm);
+               msleep(100);
+               dlm_wait_for_recovery(dlm);
+
+               spin_lock(&dlm->spinlock);
+               bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
+               if (bit < O2NM_MAX_NODES) {
+                       mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
+                            "recover before lock mastery can begin\n",
+                            dlm->name, namelen, (char *)lockid, bit);
+                       wait_on_recovery = 1;
+               } else
+                       wait_on_recovery = 0;
+               spin_unlock(&dlm->spinlock);
+       }
+
        /* must wait for lock to be mastered elsewhere */
        if (blocked)
                goto wait;
        mlog(0, "finished with dlm_assert_master_worker\n");
 }
 
+/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
+ * We cannot wait for node recovery to complete to begin mastering this
+ * lockres because this lockres is used to kick off recovery! ;-)
+ * So, do a pre-check on all living nodes to see if any of those nodes
+ * think that $RECOVERY is currently mastered by a dead node.  If so,
+ * we wait a short time to allow that node to get notified by its own
+ * heartbeat stack, then check again.  All $RECOVERY lock resources
+ * mastered by dead nodes are purged when the hearbeat callback is 
+ * fired, so we can know for sure that it is safe to continue once
+ * the node returns a live node or no node.  */
+static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
+                                      struct dlm_lock_resource *res)
+{
+       struct dlm_node_iter iter;
+       int nodenum;
+       int ret = 0;
+       u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
+
+       spin_lock(&dlm->spinlock);
+       dlm_node_iter_init(dlm->domain_map, &iter);
+       spin_unlock(&dlm->spinlock);
+
+       while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
+               /* do not send to self */
+               if (nodenum == dlm->node_num)
+                       continue;
+               ret = dlm_do_master_requery(dlm, res, nodenum, &master);
+               if (ret < 0) {
+                       mlog_errno(ret);
+                       if (!dlm_is_host_down(ret))
+                               BUG();
+                       /* host is down, so answer for that node would be
+                        * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
+               }
+
+               if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
+                       /* check to see if this master is in the recovery map */
+                       spin_lock(&dlm->spinlock);
+                       if (test_bit(master, dlm->recovery_map)) {
+                               mlog(ML_NOTICE, "%s: node %u has not seen "
+                                    "node %u go down yet, and thinks the "
+                                    "dead node is mastering the recovery "
+                                    "lock.  must wait.\n", dlm->name,
+                                    nodenum, master);
+                               ret = -EAGAIN;
+                       }
+                       spin_unlock(&dlm->spinlock);
+                       mlog(0, "%s: reco lock master is %u\n", dlm->name, 
+                            master);
+                       break;
+               }
+       }
+       return ret;
+}
+
 
 /*
  * DLM_MIGRATE_LOCKRES
 
 static int dlm_recovery_thread(void *data);
 void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
 int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
-static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
+void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
 static int dlm_do_recovery(struct dlm_ctxt *dlm);
 
 static int dlm_pick_recovery_master(struct dlm_ctxt *dlm);
                                    u8 send_to,
                                    struct dlm_lock_resource *res,
                                    int total_locks);
-static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
-                                     struct dlm_lock_resource *res,
-                                     u8 *real_master);
 static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                                     struct dlm_lock_resource *res,
                                     struct dlm_migratable_lockres *mres);
-static int dlm_do_master_requery(struct dlm_ctxt *dlm,
-                                struct dlm_lock_resource *res,
-                                u8 nodenum, u8 *real_master);
 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm);
 static int dlm_send_all_done_msg(struct dlm_ctxt *dlm,
                                 u8 dead_node, u8 send_to);
  * RECOVERY THREAD
  */
 
-static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
+void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
 {
        /* wake the recovery thread
         * this will wake the reco thread in one of three places
 
 
 
-static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
-                                     struct dlm_lock_resource *res,
-                                     u8 *real_master)
+int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
+                              struct dlm_lock_resource *res, u8 *real_master)
 {
        struct dlm_node_iter iter;
        int nodenum;
                ret = dlm_do_master_requery(dlm, res, nodenum, real_master);
                if (ret < 0) {
                        mlog_errno(ret);
-                       BUG();
-                       /* TODO: need to figure a way to restart this */
+                       if (!dlm_is_host_down(ret))
+                               BUG();
+                       /* host is down, so answer for that node would be
+                        * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
                }
                if (*real_master != DLM_LOCK_RES_OWNER_UNKNOWN) {
                        mlog(0, "lock master is %u\n", *real_master);
 }
 
 
-static int dlm_do_master_requery(struct dlm_ctxt *dlm,
-                                struct dlm_lock_resource *res,
-                                u8 nodenum, u8 *real_master)
+int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
+                         u8 nodenum, u8 *real_master)
 {
        int ret = -EINVAL;
        struct dlm_master_requery req;
                                } else
                                        continue;
 
+                               if (!list_empty(&res->recovering)) {
+                                       mlog(0, "%s:%.*s: lockres was "
+                                            "marked RECOVERING, owner=%u\n",
+                                            dlm->name, res->lockname.len,
+                                            res->lockname.name, res->owner);
+                                       list_del_init(&res->recovering);
+                               }
                                spin_lock(&res->spinlock);
                                dlm_change_lockres_owner(dlm, res, new_master);
                                res->state &= ~DLM_LOCK_RES_RECOVERING;
                        mlog(0, "%u not in domain/live_nodes map "
                             "so setting it in reco map manually\n",
                             br->dead_node);
-               set_bit(br->dead_node, dlm->recovery_map);
+               /* force the recovery cleanup in __dlm_hb_node_down
+                * both of these will be cleared in a moment */
+               set_bit(br->dead_node, dlm->domain_map);
+               set_bit(br->dead_node, dlm->live_nodes_map);
                __dlm_hb_node_down(dlm, br->dead_node);
        }
        spin_unlock(&dlm->spinlock);