]> www.pilppa.org Git - linux-2.6-omap-h63xx.git/blobdiff - arch/ia64/sn/kernel/xpc_channel.c
[IA64-SGI] get XPC to cleanly disengage from remote memory references
[linux-2.6-omap-h63xx.git] / arch / ia64 / sn / kernel / xpc_channel.c
index 94698bea7be045e7f2aec0430bb7a2e046907a66..195ac1b8e2620e06c06d9cc89f7e3c50a689c35f 100644 (file)
@@ -57,6 +57,7 @@ xpc_initialize_channels(struct xpc_partition *part, partid_t partid)
 
                spin_lock_init(&ch->lock);
                sema_init(&ch->msg_to_pull_sema, 1);    /* mutex */
+               sema_init(&ch->wdisconnect_sema, 0);    /* event wait */
 
                atomic_set(&ch->n_on_msg_allocate_wq, 0);
                init_waitqueue_head(&ch->msg_allocate_wq);
@@ -166,6 +167,7 @@ xpc_setup_infrastructure(struct xpc_partition *part)
        xpc_initialize_channels(part, partid);
 
        atomic_set(&part->nchannels_active, 0);
+       atomic_set(&part->nchannels_engaged, 0);
 
 
        /* local_IPI_amo were set to 0 by an earlier memset() */
@@ -555,8 +557,6 @@ xpc_allocate_msgqueues(struct xpc_channel *ch)
                sema_init(&ch->notify_queue[i].sema, 0);
        }
 
-       sema_init(&ch->teardown_sema, 0);       /* event wait */
-
        spin_lock_irqsave(&ch->lock, irq_flags);
        ch->flags |= XPC_C_SETUP;
        spin_unlock_irqrestore(&ch->lock, irq_flags);
@@ -625,6 +625,55 @@ xpc_process_connect(struct xpc_channel *ch, unsigned long *irq_flags)
 }
 
 
+/*
+ * Notify those who wanted to be notified upon delivery of their message.
+ */
+static void
+xpc_notify_senders(struct xpc_channel *ch, enum xpc_retval reason, s64 put)
+{
+       struct xpc_notify *notify;
+       u8 notify_type;
+       s64 get = ch->w_remote_GP.get - 1;
+
+
+       while (++get < put && atomic_read(&ch->n_to_notify) > 0) {
+
+               notify = &ch->notify_queue[get % ch->local_nentries];
+
+               /*
+                * See if the notify entry indicates it was associated with
+                * a message who's sender wants to be notified. It is possible
+                * that it is, but someone else is doing or has done the
+                * notification.
+                */
+               notify_type = notify->type;
+               if (notify_type == 0 ||
+                               cmpxchg(&notify->type, notify_type, 0) !=
+                                                               notify_type) {
+                       continue;
+               }
+
+               DBUG_ON(notify_type != XPC_N_CALL);
+
+               atomic_dec(&ch->n_to_notify);
+
+               if (notify->func != NULL) {
+                       dev_dbg(xpc_chan, "notify->func() called, notify=0x%p, "
+                               "msg_number=%ld, partid=%d, channel=%d\n",
+                               (void *) notify, get, ch->partid, ch->number);
+
+                       notify->func(reason, ch->partid, ch->number,
+                                                               notify->key);
+
+                       dev_dbg(xpc_chan, "notify->func() returned, "
+                               "notify=0x%p, msg_number=%ld, partid=%d, "
+                               "channel=%d\n", (void *) notify, get,
+                               ch->partid, ch->number);
+               }
+       }
+}
+
+
 /*
  * Free up message queues and other stuff that were allocated for the specified
  * channel.
@@ -669,9 +718,6 @@ xpc_free_msgqueues(struct xpc_channel *ch)
                ch->remote_msgqueue = NULL;
                kfree(ch->notify_queue);
                ch->notify_queue = NULL;
-
-               /* in case someone is waiting for the teardown to complete */
-               up(&ch->teardown_sema);
        }
 }
 
@@ -683,7 +729,7 @@ static void
 xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
 {
        struct xpc_partition *part = &xpc_partitions[ch->partid];
-       u32 ch_flags = ch->flags;
+       u32 channel_was_connected = (ch->flags & XPC_C_WASCONNECTED);
 
 
        DBUG_ON(!spin_is_locked(&ch->lock));
@@ -701,12 +747,13 @@ xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
        }
        DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0);
 
-       /* it's now safe to free the channel's message queues */
-
-       xpc_free_msgqueues(ch);
-       DBUG_ON(ch->flags & XPC_C_SETUP);
+       if (part->act_state == XPC_P_DEACTIVATING) {
+               /* can't proceed until the other side disengages from us */
+               if (xpc_partition_engaged(1UL << ch->partid)) {
+                       return;
+               }
 
-       if (part->act_state != XPC_P_DEACTIVATING) {
+       } else {
 
                /* as long as the other side is up do the full protocol */
 
@@ -724,16 +771,33 @@ xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
                }
        }
 
+       /* wake those waiting for notify completion */
+       if (atomic_read(&ch->n_to_notify) > 0) {
+               /* >>> we do callout while holding ch->lock */
+               xpc_notify_senders(ch, ch->reason, ch->w_local_GP.put);
+       }
+
        /* both sides are disconnected now */
 
-       ch->flags = XPC_C_DISCONNECTED; /* clear all flags, but this one */
+       /* it's now safe to free the channel's message queues */
+       xpc_free_msgqueues(ch);
+
+       /* mark disconnected, clear all other flags except XPC_C_WDISCONNECT */
+       ch->flags = (XPC_C_DISCONNECTED | (ch->flags & XPC_C_WDISCONNECT));
 
        atomic_dec(&part->nchannels_active);
 
-       if (ch_flags & XPC_C_WASCONNECTED) {
+       if (channel_was_connected) {
                dev_info(xpc_chan, "channel %d to partition %d disconnected, "
                        "reason=%d\n", ch->number, ch->partid, ch->reason);
        }
+
+       /* wake the thread that is waiting for this channel to disconnect */
+       if (ch->flags & XPC_C_WDISCONNECT) {
+               spin_unlock_irqrestore(&ch->lock, *irq_flags);
+               up(&ch->wdisconnect_sema);
+               spin_lock_irqsave(&ch->lock, *irq_flags);
+       }
 }
 
 
@@ -764,7 +828,7 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number,
                /*
                 * If RCLOSEREQUEST is set, we're probably waiting for
                 * RCLOSEREPLY. We should find it and a ROPENREQUEST packed
-                * with this RCLOSEQREUQEST in the IPI_flags.
+                * with this RCLOSEREQUEST in the IPI_flags.
                 */
 
                if (ch->flags & XPC_C_RCLOSEREQUEST) {
@@ -852,7 +916,7 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number,
                        "channel=%d\n", args->msg_size, args->local_nentries,
                        ch->partid, ch->number);
 
-               if ((ch->flags & XPC_C_DISCONNECTING) ||
+               if ((ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) ||
                                        part->act_state == XPC_P_DEACTIVATING) {
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
                        return;
@@ -1039,55 +1103,6 @@ xpc_connect_channel(struct xpc_channel *ch)
 }
 
 
-/*
- * Notify those who wanted to be notified upon delivery of their message.
- */
-static void
-xpc_notify_senders(struct xpc_channel *ch, enum xpc_retval reason, s64 put)
-{
-       struct xpc_notify *notify;
-       u8 notify_type;
-       s64 get = ch->w_remote_GP.get - 1;
-
-
-       while (++get < put && atomic_read(&ch->n_to_notify) > 0) {
-
-               notify = &ch->notify_queue[get % ch->local_nentries];
-
-               /*
-                * See if the notify entry indicates it was associated with
-                * a message who's sender wants to be notified. It is possible
-                * that it is, but someone else is doing or has done the
-                * notification.
-                */
-               notify_type = notify->type;
-               if (notify_type == 0 ||
-                               cmpxchg(&notify->type, notify_type, 0) !=
-                                                               notify_type) {
-                       continue;
-               }
-
-               DBUG_ON(notify_type != XPC_N_CALL);
-
-               atomic_dec(&ch->n_to_notify);
-
-               if (notify->func != NULL) {
-                       dev_dbg(xpc_chan, "notify->func() called, notify=0x%p, "
-                               "msg_number=%ld, partid=%d, channel=%d\n",
-                               (void *) notify, get, ch->partid, ch->number);
-
-                       notify->func(reason, ch->partid, ch->number,
-                                                               notify->key);
-
-                       dev_dbg(xpc_chan, "notify->func() returned, "
-                               "notify=0x%p, msg_number=%ld, partid=%d, "
-                               "channel=%d\n", (void *) notify, get,
-                               ch->partid, ch->number);
-               }
-       }
-}
-
-
 /*
  * Clear some of the msg flags in the local message queue.
  */
@@ -1240,6 +1255,7 @@ xpc_process_channel_activity(struct xpc_partition *part)
        u64 IPI_amo, IPI_flags;
        struct xpc_channel *ch;
        int ch_number;
+       u32 ch_flags;
 
 
        IPI_amo = xpc_get_IPI_flags(part);
@@ -1266,8 +1282,9 @@ xpc_process_channel_activity(struct xpc_partition *part)
                        xpc_process_openclose_IPI(part, ch_number, IPI_flags);
                }
 
+               ch_flags = ch->flags;   /* need an atomic snapshot of flags */
 
-               if (ch->flags & XPC_C_DISCONNECTING) {
+               if (ch_flags & XPC_C_DISCONNECTING) {
                        spin_lock_irqsave(&ch->lock, irq_flags);
                        xpc_process_disconnect(ch, &irq_flags);
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
@@ -1278,9 +1295,9 @@ xpc_process_channel_activity(struct xpc_partition *part)
                        continue;
                }
 
-               if (!(ch->flags & XPC_C_CONNECTED)) {
-                       if (!(ch->flags & XPC_C_OPENREQUEST)) {
-                               DBUG_ON(ch->flags & XPC_C_SETUP);
+               if (!(ch_flags & XPC_C_CONNECTED)) {
+                       if (!(ch_flags & XPC_C_OPENREQUEST)) {
+                               DBUG_ON(ch_flags & XPC_C_SETUP);
                                (void) xpc_connect_channel(ch);
                        } else {
                                spin_lock_irqsave(&ch->lock, irq_flags);
@@ -1305,8 +1322,8 @@ xpc_process_channel_activity(struct xpc_partition *part)
 
 
 /*
- * XPC's heartbeat code calls this function to inform XPC that a partition has
- * gone down.  XPC responds by tearing down the XPartition Communication
+ * XPC's heartbeat code calls this function to inform XPC that a partition is
+ * going down.  XPC responds by tearing down the XPartition Communication
  * infrastructure used for the just downed partition.
  *
  * XPC's heartbeat code will never call this function and xpc_partition_up()
@@ -1314,7 +1331,7 @@ xpc_process_channel_activity(struct xpc_partition *part)
  * at the same time.
  */
 void
-xpc_partition_down(struct xpc_partition *part, enum xpc_retval reason)
+xpc_partition_going_down(struct xpc_partition *part, enum xpc_retval reason)
 {
        unsigned long irq_flags;
        int ch_number;
@@ -1330,12 +1347,11 @@ xpc_partition_down(struct xpc_partition *part, enum xpc_retval reason)
        }
 
 
-       /* disconnect all channels associated with the downed partition */
+       /* disconnect channels associated with the partition going down */
 
        for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
                ch = &part->channels[ch_number];
 
-
                xpc_msgqueue_ref(ch);
                spin_lock_irqsave(&ch->lock, irq_flags);
 
@@ -1370,6 +1386,7 @@ xpc_teardown_infrastructure(struct xpc_partition *part)
         * this partition.
         */
 
+       DBUG_ON(atomic_read(&part->nchannels_engaged) != 0);
        DBUG_ON(atomic_read(&part->nchannels_active) != 0);
        DBUG_ON(part->setup_state != XPC_P_SETUP);
        part->setup_state = XPC_P_WTEARDOWN;
@@ -1506,8 +1523,12 @@ xpc_initiate_disconnect(int ch_number)
 
                        spin_lock_irqsave(&ch->lock, irq_flags);
 
-                       XPC_DISCONNECT_CHANNEL(ch, xpcUnregistering,
+                       if (!(ch->flags & XPC_C_DISCONNECTED)) {
+                               ch->flags |= XPC_C_WDISCONNECT;
+
+                               XPC_DISCONNECT_CHANNEL(ch, xpcUnregistering,
                                                                &irq_flags);
+                       }
 
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
 
@@ -1523,8 +1544,9 @@ xpc_initiate_disconnect(int ch_number)
 /*
  * To disconnect a channel, and reflect it back to all who may be waiting.
  *
- * >>> An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by
- * >>> xpc_free_msgqueues().
+ * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by
+ * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by
+ * xpc_disconnect_wait().
  *
  * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN.
  */
@@ -1532,7 +1554,7 @@ void
 xpc_disconnect_channel(const int line, struct xpc_channel *ch,
                        enum xpc_retval reason, unsigned long *irq_flags)
 {
-       u32 flags;
+       u32 channel_was_connected = (ch->flags & XPC_C_CONNECTED);
 
 
        DBUG_ON(!spin_is_locked(&ch->lock));
@@ -1547,61 +1569,53 @@ xpc_disconnect_channel(const int line, struct xpc_channel *ch,
 
        XPC_SET_REASON(ch, reason, line);
 
-       flags = ch->flags;
+       ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING);
        /* some of these may not have been set */
        ch->flags &= ~(XPC_C_OPENREQUEST | XPC_C_OPENREPLY |
                        XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
                        XPC_C_CONNECTING | XPC_C_CONNECTED);
 
-       ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING);
        xpc_IPI_send_closerequest(ch, irq_flags);
 
-       if (flags & XPC_C_CONNECTED) {
+       if (channel_was_connected) {
                ch->flags |= XPC_C_WASCONNECTED;
        }
 
+       spin_unlock_irqrestore(&ch->lock, *irq_flags);
+
+       /* wake all idle kthreads so they can exit */
        if (atomic_read(&ch->kthreads_idle) > 0) {
-               /* wake all idle kthreads so they can exit */
                wake_up_all(&ch->idle_wq);
        }
 
-       spin_unlock_irqrestore(&ch->lock, *irq_flags);
-
-
        /* wake those waiting to allocate an entry from the local msg queue */
-
        if (atomic_read(&ch->n_on_msg_allocate_wq) > 0) {
                wake_up(&ch->msg_allocate_wq);
        }
 
-       /* wake those waiting for notify completion */
-
-       if (atomic_read(&ch->n_to_notify) > 0) {
-               xpc_notify_senders(ch, reason, ch->w_local_GP.put);
-       }
-
        spin_lock_irqsave(&ch->lock, *irq_flags);
 }
 
 
 void
-xpc_disconnected_callout(struct xpc_channel *ch)
+xpc_disconnecting_callout(struct xpc_channel *ch)
 {
        /*
-        * Let the channel's registerer know that the channel is now
+        * Let the channel's registerer know that the channel is being
         * disconnected. We don't want to do this if the registerer was never
-        * informed of a connection being made, unless the disconnect was for
-        * abnormal reasons.
+        * informed of a connection being made.
         */
 
        if (ch->func != NULL) {
-               dev_dbg(xpc_chan, "ch->func() called, reason=%d, partid=%d, "
-                       "channel=%d\n", ch->reason, ch->partid, ch->number);
+               dev_dbg(xpc_chan, "ch->func() called, reason=xpcDisconnecting,"
+                       " partid=%d, channel=%d\n", ch->partid, ch->number);
 
-               ch->func(ch->reason, ch->partid, ch->number, NULL, ch->key);
+               ch->func(xpcDisconnecting, ch->partid, ch->number, NULL,
+                                                               ch->key);
 
-               dev_dbg(xpc_chan, "ch->func() returned, reason=%d, partid=%d, "
-                       "channel=%d\n", ch->reason, ch->partid, ch->number);
+               dev_dbg(xpc_chan, "ch->func() returned, reason="
+                       "xpcDisconnecting, partid=%d, channel=%d\n",
+                       ch->partid, ch->number);
        }
 }
 
@@ -1848,7 +1862,7 @@ xpc_send_msg(struct xpc_channel *ch, struct xpc_msg *msg, u8 notify_type,
                        xpc_notify_func func, void *key)
 {
        enum xpc_retval ret = xpcSuccess;
-       struct xpc_notify *notify = NULL;   // >>> to keep the compiler happy!!
+       struct xpc_notify *notify = notify;
        s64 put, msg_number = msg->number;