/* drivers/misc/sgi-xp/xpc_channel.c */
1 /*
2  * This file is subject to the terms and conditions of the GNU General Public
3  * License.  See the file "COPYING" in the main directory of this archive
4  * for more details.
5  *
6  * Copyright (c) 2004-2008 Silicon Graphics, Inc.  All Rights Reserved.
7  */
8
9 /*
10  * Cross Partition Communication (XPC) channel support.
11  *
12  *      This is the part of XPC that manages the channels and
13  *      sends/receives messages across them to/from other partitions.
14  *
15  */
16
#include <linux/device.h>
#include <linux/sched.h>
#include <linux/wait.h>
#include "xpc.h"
19
20 /*
21  * Process a connect message from a remote partition.
22  *
23  * Note: xpc_process_connect() is expecting to be called with the
24  * spin_lock_irqsave held and will leave it locked upon return.
25  */
static void
xpc_process_connect(struct xpc_channel *ch, unsigned long *irq_flags)
{
	enum xp_retval ret;

	DBUG_ON(!spin_is_locked(&ch->lock));

	/*
	 * The connection can only advance once both the local OPENREQUEST
	 * and the remote's OPENREQUEST have been seen.
	 */
	if (!(ch->flags & XPC_C_OPENREQUEST) ||
	    !(ch->flags & XPC_C_ROPENREQUEST)) {
		/* nothing more to do for now */
		return;
	}
	DBUG_ON(!(ch->flags & XPC_C_CONNECTING));

	if (!(ch->flags & XPC_C_SETUP)) {
		/* drop ch->lock: allocating the message queues may sleep */
		spin_unlock_irqrestore(&ch->lock, *irq_flags);
		ret = xpc_allocate_msgqueues(ch);
		spin_lock_irqsave(&ch->lock, *irq_flags);

		if (ret != xpSuccess)
			XPC_DISCONNECT_CHANNEL(ch, ret, irq_flags);

		/*
		 * NOTE(review): XPC_C_SETUP is set even when the allocation
		 * failed — presumably so the teardown path (which frees the
		 * queues and clears XPC_C_SETUP) still runs; confirm.
		 */
		ch->flags |= XPC_C_SETUP;

		/* the channel state may have changed while the lock was dropped */
		if (ch->flags & (XPC_C_CONNECTED | XPC_C_DISCONNECTING))
			return;

		DBUG_ON(ch->local_msgqueue == NULL);
		DBUG_ON(ch->remote_msgqueue == NULL);
	}

	/* send our OPENREPLY exactly once */
	if (!(ch->flags & XPC_C_OPENREPLY)) {
		ch->flags |= XPC_C_OPENREPLY;
		xpc_send_chctl_openreply(ch, irq_flags);
	}

	/* can't finish connecting until the remote's OPENREPLY arrives */
	if (!(ch->flags & XPC_C_ROPENREPLY))
		return;

	DBUG_ON(ch->remote_msgqueue_pa == 0);

	ch->flags = (XPC_C_CONNECTED | XPC_C_SETUP);	/* clear all else */

	dev_info(xpc_chan, "channel %d to partition %d connected\n",
		 ch->number, ch->partid);

	/* drop ch->lock: kthread creation may sleep */
	spin_unlock_irqrestore(&ch->lock, *irq_flags);
	xpc_create_kthreads(ch, 1, 0);
	spin_lock_irqsave(&ch->lock, *irq_flags);
}
76
77 /*
78  * spin_lock_irqsave() is expected to be held on entry.
79  */
static void
xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
{
	struct xpc_partition *part = &xpc_partitions[ch->partid];
	/* sample before ch->flags is rewritten below */
	u32 channel_was_connected = (ch->flags & XPC_C_WASCONNECTED);

	DBUG_ON(!spin_is_locked(&ch->lock));

	if (!(ch->flags & XPC_C_DISCONNECTING))
		return;

	DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));

	/* make sure all activity has settled down first */

	if (atomic_read(&ch->kthreads_assigned) > 0 ||
	    atomic_read(&ch->references) > 0) {
		return;
	}
	DBUG_ON((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
		!(ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE));

	if (part->act_state == XPC_P_DEACTIVATING) {
		/* can't proceed until the other side disengages from us */
		if (xpc_partition_engaged(ch->partid))
			return;

	} else {

		/* as long as the other side is up do the full protocol */

		if (!(ch->flags & XPC_C_RCLOSEREQUEST))
			return;

		/* send our CLOSEREPLY exactly once */
		if (!(ch->flags & XPC_C_CLOSEREPLY)) {
			ch->flags |= XPC_C_CLOSEREPLY;
			xpc_send_chctl_closereply(ch, irq_flags);
		}

		if (!(ch->flags & XPC_C_RCLOSEREPLY))
			return;
	}

	/* wake those waiting for notify completion */
	if (atomic_read(&ch->n_to_notify) > 0) {
		/* we do callout while holding ch->lock, callout can't block */
		xpc_notify_senders_of_disconnect(ch);
	}

	/* both sides are disconnected now */

	if (ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE) {
		/* the registerer's callout may sleep; drop ch->lock around it */
		spin_unlock_irqrestore(&ch->lock, *irq_flags);
		xpc_disconnect_callout(ch, xpDisconnected);
		spin_lock_irqsave(&ch->lock, *irq_flags);
	}

	/* it's now safe to free the channel's message queues */
	xpc_free_msgqueues(ch);

	/*
	 * Mark the channel disconnected and clear all other flags, including
	 * XPC_C_SETUP (because of call to xpc_free_msgqueues()) but not
	 * including XPC_C_WDISCONNECT (if it was set).
	 */
	ch->flags = (XPC_C_DISCONNECTED | (ch->flags & XPC_C_WDISCONNECT));

	atomic_dec(&part->nchannels_active);

	if (channel_was_connected) {
		dev_info(xpc_chan, "channel %d to partition %d disconnected, "
			 "reason=%d\n", ch->number, ch->partid, ch->reason);
	}

	if (ch->flags & XPC_C_WDISCONNECT) {
		/* we won't lose the CPU since we're holding ch->lock */
		complete(&ch->wdisconnect_wait);
	} else if (ch->delayed_chctl_flags) {
		if (part->act_state != XPC_P_DEACTIVATING) {
			/* time to take action on any delayed chctl flags */
			spin_lock(&part->chctl_lock);
			part->chctl.flags[ch->number] |=
			    ch->delayed_chctl_flags;
			spin_unlock(&part->chctl_lock);
		}
		ch->delayed_chctl_flags = 0;
	}
}
168
169 /*
170  * Process a change in the channel's remote connection state.
171  */
static void
xpc_process_openclose_chctl_flags(struct xpc_partition *part, int ch_number,
				  u8 chctl_flags)
{
	unsigned long irq_flags;
	struct xpc_openclose_args *args =
	    &part->remote_openclose_args[ch_number];
	struct xpc_channel *ch = &part->channels[ch_number];
	enum xp_retval reason;

	spin_lock_irqsave(&ch->lock, irq_flags);

again:

	if ((ch->flags & XPC_C_DISCONNECTED) &&
	    (ch->flags & XPC_C_WDISCONNECT)) {
		/*
		 * Delay processing chctl flags until thread waiting disconnect
		 * has had a chance to see that the channel is disconnected.
		 */
		ch->delayed_chctl_flags |= chctl_flags;
		spin_unlock_irqrestore(&ch->lock, irq_flags);
		return;
	}

	/* --- remote asked us to close the channel --- */

	if (chctl_flags & XPC_CHCTL_CLOSEREQUEST) {

		dev_dbg(xpc_chan, "XPC_CHCTL_CLOSEREQUEST (reason=%d) received "
			"from partid=%d, channel=%d\n", args->reason,
			ch->partid, ch->number);

		/*
		 * If RCLOSEREQUEST is set, we're probably waiting for
		 * RCLOSEREPLY. We should find it and a ROPENREQUEST packed
		 * with this RCLOSEREQUEST in the chctl_flags.
		 */

		if (ch->flags & XPC_C_RCLOSEREQUEST) {
			DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING));
			DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
			DBUG_ON(!(ch->flags & XPC_C_CLOSEREPLY));
			DBUG_ON(ch->flags & XPC_C_RCLOSEREPLY);

			/* consume the implied CLOSEREPLY here, not below */
			DBUG_ON(!(chctl_flags & XPC_CHCTL_CLOSEREPLY));
			chctl_flags &= ~XPC_CHCTL_CLOSEREPLY;
			ch->flags |= XPC_C_RCLOSEREPLY;

			/* both sides have finished disconnecting */
			xpc_process_disconnect(ch, &irq_flags);
			DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED));
			/* reprocess the remaining flags against the new state */
			goto again;
		}

		if (ch->flags & XPC_C_DISCONNECTED) {
			if (!(chctl_flags & XPC_CHCTL_OPENREQUEST)) {
				/*
				 * A lone CLOSEREQUEST raced an OPENREQUEST
				 * already noted in part->chctl; requeue the
				 * CLOSEREQUEST so it is seen after the open.
				 */
				if (part->chctl.flags[ch_number] &
				    XPC_CHCTL_OPENREQUEST) {

					DBUG_ON(ch->delayed_chctl_flags != 0);
					spin_lock(&part->chctl_lock);
					part->chctl.flags[ch_number] |=
					    XPC_CHCTL_CLOSEREQUEST;
					spin_unlock(&part->chctl_lock);
				}
				spin_unlock_irqrestore(&ch->lock, irq_flags);
				return;
			}

			/* a new open is packed with this close; start it */
			XPC_SET_REASON(ch, 0, 0);
			ch->flags &= ~XPC_C_DISCONNECTED;

			atomic_inc(&part->nchannels_active);
			ch->flags |= (XPC_C_CONNECTING | XPC_C_ROPENREQUEST);
		}

		/* the close supersedes any open-related flags in this batch */
		chctl_flags &= ~(XPC_CHCTL_OPENREQUEST | XPC_CHCTL_OPENREPLY);

		/*
		 * The meaningful CLOSEREQUEST connection state fields are:
		 *      reason = reason connection is to be closed
		 */

		ch->flags |= XPC_C_RCLOSEREQUEST;

		if (!(ch->flags & XPC_C_DISCONNECTING)) {
			/* sanitize the remote-supplied reason code */
			reason = args->reason;
			if (reason <= xpSuccess || reason > xpUnknownReason)
				reason = xpUnknownReason;
			else if (reason == xpUnregistering)
				reason = xpOtherUnregistering;

			XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);

			DBUG_ON(chctl_flags & XPC_CHCTL_CLOSEREPLY);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		xpc_process_disconnect(ch, &irq_flags);
	}

	/* --- remote acknowledged our close request --- */

	if (chctl_flags & XPC_CHCTL_CLOSEREPLY) {

		dev_dbg(xpc_chan, "XPC_CHCTL_CLOSEREPLY received from partid="
			"%d, channel=%d\n", ch->partid, ch->number);

		if (ch->flags & XPC_C_DISCONNECTED) {
			DBUG_ON(part->act_state != XPC_P_DEACTIVATING);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));

		if (!(ch->flags & XPC_C_RCLOSEREQUEST)) {
			/*
			 * CLOSEREPLY arrived before its CLOSEREQUEST was
			 * processed; requeue it if the request is pending.
			 */
			if (part->chctl.flags[ch_number] &
			    XPC_CHCTL_CLOSEREQUEST) {

				DBUG_ON(ch->delayed_chctl_flags != 0);
				spin_lock(&part->chctl_lock);
				part->chctl.flags[ch_number] |=
				    XPC_CHCTL_CLOSEREPLY;
				spin_unlock(&part->chctl_lock);
			}
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		ch->flags |= XPC_C_RCLOSEREPLY;

		if (ch->flags & XPC_C_CLOSEREPLY) {
			/* both sides have finished disconnecting */
			xpc_process_disconnect(ch, &irq_flags);
		}
	}

	/* --- remote asked to open the channel --- */

	if (chctl_flags & XPC_CHCTL_OPENREQUEST) {

		dev_dbg(xpc_chan, "XPC_CHCTL_OPENREQUEST (msg_size=%d, "
			"local_nentries=%d) received from partid=%d, "
			"channel=%d\n", args->msg_size, args->local_nentries,
			ch->partid, ch->number);

		if (part->act_state == XPC_P_DEACTIVATING ||
		    (ch->flags & XPC_C_ROPENREQUEST)) {
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		/* defer the open until the in-progress disconnect completes */
		if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) {
			ch->delayed_chctl_flags |= XPC_CHCTL_OPENREQUEST;
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}
		DBUG_ON(!(ch->flags & (XPC_C_DISCONNECTED |
				       XPC_C_OPENREQUEST)));
		DBUG_ON(ch->flags & (XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
				     XPC_C_OPENREPLY | XPC_C_CONNECTED));

		/*
		 * The meaningful OPENREQUEST connection state fields are:
		 *      msg_size = size of channel's messages in bytes
		 *      local_nentries = remote partition's local_nentries
		 */
		if (args->msg_size == 0 || args->local_nentries == 0) {
			/* assume OPENREQUEST was delayed by mistake */
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		ch->flags |= (XPC_C_ROPENREQUEST | XPC_C_CONNECTING);
		ch->remote_nentries = args->local_nentries;

		if (ch->flags & XPC_C_OPENREQUEST) {
			/* both sides must agree on the message size */
			if (args->msg_size != ch->msg_size) {
				XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
						       &irq_flags);
				spin_unlock_irqrestore(&ch->lock, irq_flags);
				return;
			}
		} else {
			ch->msg_size = args->msg_size;

			XPC_SET_REASON(ch, 0, 0);
			ch->flags &= ~XPC_C_DISCONNECTED;

			atomic_inc(&part->nchannels_active);
		}

		xpc_process_connect(ch, &irq_flags);
	}

	/* --- remote acknowledged our open request --- */

	if (chctl_flags & XPC_CHCTL_OPENREPLY) {

		dev_dbg(xpc_chan, "XPC_CHCTL_OPENREPLY (local_msgqueue_pa="
			"0x%lx, local_nentries=%d, remote_nentries=%d) "
			"received from partid=%d, channel=%d\n",
			(unsigned long)args->local_msgqueue_pa,
			args->local_nentries, args->remote_nentries,
			ch->partid, ch->number);

		if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) {
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}
		if (!(ch->flags & XPC_C_OPENREQUEST)) {
			/* an OPENREPLY without our OPENREQUEST is a protocol error */
			XPC_DISCONNECT_CHANNEL(ch, xpOpenCloseError,
					       &irq_flags);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST));
		DBUG_ON(ch->flags & XPC_C_CONNECTED);

		/*
		 * The meaningful OPENREPLY connection state fields are:
		 *      local_msgqueue_pa = physical address of remote
		 *                          partition's local_msgqueue
		 *      local_nentries = remote partition's local_nentries
		 *      remote_nentries = remote partition's remote_nentries
		 */
		DBUG_ON(args->local_msgqueue_pa == 0);
		DBUG_ON(args->local_nentries == 0);
		DBUG_ON(args->remote_nentries == 0);

		ch->flags |= XPC_C_ROPENREPLY;
		ch->remote_msgqueue_pa = args->local_msgqueue_pa;

		/* each side uses the smaller of the two queue depths */
		if (args->local_nentries < ch->remote_nentries) {
			dev_dbg(xpc_chan, "XPC_CHCTL_OPENREPLY: new "
				"remote_nentries=%d, old remote_nentries=%d, "
				"partid=%d, channel=%d\n",
				args->local_nentries, ch->remote_nentries,
				ch->partid, ch->number);

			ch->remote_nentries = args->local_nentries;
		}
		if (args->remote_nentries < ch->local_nentries) {
			dev_dbg(xpc_chan, "XPC_CHCTL_OPENREPLY: new "
				"local_nentries=%d, old local_nentries=%d, "
				"partid=%d, channel=%d\n",
				args->remote_nentries, ch->local_nentries,
				ch->partid, ch->number);

			ch->local_nentries = args->remote_nentries;
		}

		xpc_process_connect(ch, &irq_flags);
	}

	spin_unlock_irqrestore(&ch->lock, irq_flags);
}
425
426 /*
427  * Attempt to establish a channel connection to a remote partition.
428  */
static enum xp_retval
xpc_connect_channel(struct xpc_channel *ch)
{
	unsigned long irq_flags;
	struct xpc_registration *registration = &xpc_registrations[ch->number];

	/* don't block the channel manager; retry on a later pass instead */
	if (mutex_trylock(&registration->mutex) == 0)
		return xpRetry;

	if (!XPC_CHANNEL_REGISTERED(ch->number)) {
		mutex_unlock(&registration->mutex);
		return xpUnregistered;
	}

	spin_lock_irqsave(&ch->lock, irq_flags);

	DBUG_ON(ch->flags & XPC_C_CONNECTED);
	DBUG_ON(ch->flags & XPC_C_OPENREQUEST);

	if (ch->flags & XPC_C_DISCONNECTING) {
		spin_unlock_irqrestore(&ch->lock, irq_flags);
		mutex_unlock(&registration->mutex);
		return ch->reason;
	}

	/* add info from the channel connect registration to the channel */

	ch->kthreads_assigned_limit = registration->assigned_limit;
	ch->kthreads_idle_limit = registration->idle_limit;
	DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0);
	DBUG_ON(atomic_read(&ch->kthreads_idle) != 0);
	DBUG_ON(atomic_read(&ch->kthreads_active) != 0);

	ch->func = registration->func;
	DBUG_ON(registration->func == NULL);
	ch->key = registration->key;

	ch->local_nentries = registration->nentries;

	if (ch->flags & XPC_C_ROPENREQUEST) {
		/* the remote already asked to open; sizes must match ours */
		if (registration->msg_size != ch->msg_size) {
			/* the local and remote sides aren't the same */

			/*
			 * Because XPC_DISCONNECT_CHANNEL() can block we're
			 * forced to up the registration sema before we unlock
			 * the channel lock. But that's okay here because we're
			 * done with the part that required the registration
			 * sema. XPC_DISCONNECT_CHANNEL() requires that the
			 * channel lock be locked and will unlock and relock
			 * the channel lock as needed.
			 */
			mutex_unlock(&registration->mutex);
			XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
					       &irq_flags);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return xpUnequalMsgSizes;
		}
	} else {
		ch->msg_size = registration->msg_size;

		XPC_SET_REASON(ch, 0, 0);
		ch->flags &= ~XPC_C_DISCONNECTED;

		atomic_inc(&xpc_partitions[ch->partid].nchannels_active);
	}

	mutex_unlock(&registration->mutex);

	/* initiate the connection */

	ch->flags |= (XPC_C_OPENREQUEST | XPC_C_CONNECTING);
	xpc_send_chctl_openrequest(ch, &irq_flags);

	xpc_process_connect(ch, &irq_flags);

	spin_unlock_irqrestore(&ch->lock, irq_flags);

	return xpSuccess;
}
509
void
xpc_process_sent_chctl_flags(struct xpc_partition *part)
{
	unsigned long irq_flags;
	union xpc_channel_ctl_flags chctl;
	struct xpc_channel *ch;
	int ch_number;
	u32 ch_flags;

	/* fetch (and clear at the source) all pending chctl flags at once */
	chctl.all_flags = xpc_get_chctl_all_flags(part);

	/*
	 * Initiate channel connections for registered channels.
	 *
	 * For each connected channel that has pending messages activate idle
	 * kthreads and/or create new kthreads as needed.
	 */

	for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
		ch = &part->channels[ch_number];

		/*
		 * Process any open or close related chctl flags, and then deal
		 * with connecting or disconnecting the channel as required.
		 */

		if (chctl.flags[ch_number] & XPC_OPENCLOSE_CHCTL_FLAGS) {
			xpc_process_openclose_chctl_flags(part, ch_number,
							chctl.flags[ch_number]);
		}

		ch_flags = ch->flags;	/* need an atomic snapshot of flags */

		if (ch_flags & XPC_C_DISCONNECTING) {
			spin_lock_irqsave(&ch->lock, irq_flags);
			xpc_process_disconnect(ch, &irq_flags);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			continue;
		}

		/* don't start new connections on a partition that's going away */
		if (part->act_state == XPC_P_DEACTIVATING)
			continue;

		if (!(ch_flags & XPC_C_CONNECTED)) {
			if (!(ch_flags & XPC_C_OPENREQUEST)) {
				DBUG_ON(ch_flags & XPC_C_SETUP);
				/* best effort; failures are retried later */
				(void)xpc_connect_channel(ch);
			} else {
				spin_lock_irqsave(&ch->lock, irq_flags);
				xpc_process_connect(ch, &irq_flags);
				spin_unlock_irqrestore(&ch->lock, irq_flags);
			}
			continue;
		}

		/*
		 * Process any message related chctl flags, this may involve
		 * the activation of kthreads to deliver any pending messages
		 * sent from the other partition.
		 */

		if (chctl.flags[ch_number] & XPC_MSG_CHCTL_FLAGS)
			xpc_process_msg_chctl_flags(part, ch_number);
	}
}
575
576 /*
577  * XPC's heartbeat code calls this function to inform XPC that a partition is
578  * going down.  XPC responds by tearing down the XPartition Communication
579  * infrastructure used for the just downed partition.
580  *
581  * XPC's heartbeat code will never call this function and xpc_partition_up()
582  * at the same time. Nor will it ever make multiple calls to either function
583  * at the same time.
584  */
585 void
586 xpc_partition_going_down(struct xpc_partition *part, enum xp_retval reason)
587 {
588         unsigned long irq_flags;
589         int ch_number;
590         struct xpc_channel *ch;
591
592         dev_dbg(xpc_chan, "deactivating partition %d, reason=%d\n",
593                 XPC_PARTID(part), reason);
594
595         if (!xpc_part_ref(part)) {
596                 /* infrastructure for this partition isn't currently set up */
597                 return;
598         }
599
600         /* disconnect channels associated with the partition going down */
601
602         for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
603                 ch = &part->channels[ch_number];
604
605                 xpc_msgqueue_ref(ch);
606                 spin_lock_irqsave(&ch->lock, irq_flags);
607
608                 XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);
609
610                 spin_unlock_irqrestore(&ch->lock, irq_flags);
611                 xpc_msgqueue_deref(ch);
612         }
613
614         xpc_wakeup_channel_mgr(part);
615
616         xpc_part_deref(part);
617 }
618
619 /*
620  * Called by XP at the time of channel connection registration to cause
621  * XPC to establish connections to all currently active partitions.
622  */
623 void
624 xpc_initiate_connect(int ch_number)
625 {
626         short partid;
627         struct xpc_partition *part;
628         struct xpc_channel *ch;
629
630         DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS);
631
632         for (partid = 0; partid < xp_max_npartitions; partid++) {
633                 part = &xpc_partitions[partid];
634
635                 if (xpc_part_ref(part)) {
636                         ch = &part->channels[ch_number];
637
638                         /*
639                          * Initiate the establishment of a connection on the
640                          * newly registered channel to the remote partition.
641                          */
642                         xpc_wakeup_channel_mgr(part);
643                         xpc_part_deref(part);
644                 }
645         }
646 }
647
648 void
649 xpc_connected_callout(struct xpc_channel *ch)
650 {
651         /* let the registerer know that a connection has been established */
652
653         if (ch->func != NULL) {
654                 dev_dbg(xpc_chan, "ch->func() called, reason=xpConnected, "
655                         "partid=%d, channel=%d\n", ch->partid, ch->number);
656
657                 ch->func(xpConnected, ch->partid, ch->number,
658                          (void *)(u64)ch->local_nentries, ch->key);
659
660                 dev_dbg(xpc_chan, "ch->func() returned, reason=xpConnected, "
661                         "partid=%d, channel=%d\n", ch->partid, ch->number);
662         }
663 }
664
665 /*
666  * Called by XP at the time of channel connection unregistration to cause
667  * XPC to teardown all current connections for the specified channel.
668  *
669  * Before returning xpc_initiate_disconnect() will wait until all connections
670  * on the specified channel have been closed/torndown. So the caller can be
671  * assured that they will not be receiving any more callouts from XPC to the
672  * function they registered via xpc_connect().
673  *
674  * Arguments:
675  *
676  *      ch_number - channel # to unregister.
677  */
678 void
679 xpc_initiate_disconnect(int ch_number)
680 {
681         unsigned long irq_flags;
682         short partid;
683         struct xpc_partition *part;
684         struct xpc_channel *ch;
685
686         DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS);
687
688         /* initiate the channel disconnect for every active partition */
689         for (partid = 0; partid < xp_max_npartitions; partid++) {
690                 part = &xpc_partitions[partid];
691
692                 if (xpc_part_ref(part)) {
693                         ch = &part->channels[ch_number];
694                         xpc_msgqueue_ref(ch);
695
696                         spin_lock_irqsave(&ch->lock, irq_flags);
697
698                         if (!(ch->flags & XPC_C_DISCONNECTED)) {
699                                 ch->flags |= XPC_C_WDISCONNECT;
700
701                                 XPC_DISCONNECT_CHANNEL(ch, xpUnregistering,
702                                                        &irq_flags);
703                         }
704
705                         spin_unlock_irqrestore(&ch->lock, irq_flags);
706
707                         xpc_msgqueue_deref(ch);
708                         xpc_part_deref(part);
709                 }
710         }
711
712         xpc_disconnect_wait(ch_number);
713 }
714
715 /*
716  * To disconnect a channel, and reflect it back to all who may be waiting.
717  *
718  * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by
719  * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by
720  * xpc_disconnect_wait().
721  *
722  * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN.
723  */
void
xpc_disconnect_channel(const int line, struct xpc_channel *ch,
		       enum xp_retval reason, unsigned long *irq_flags)
{
	/* sample before the flag is cleared below */
	u32 channel_was_connected = (ch->flags & XPC_C_CONNECTED);

	DBUG_ON(!spin_is_locked(&ch->lock));

	/* a disconnect is already in progress or done; nothing to do */
	if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED))
		return;

	DBUG_ON(!(ch->flags & (XPC_C_CONNECTING | XPC_C_CONNECTED)));

	dev_dbg(xpc_chan, "reason=%d, line=%d, partid=%d, channel=%d\n",
		reason, line, ch->partid, ch->number);

	/* record reason and the caller's source line for diagnostics */
	XPC_SET_REASON(ch, reason, line);

	ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING);
	/* some of these may not have been set */
	ch->flags &= ~(XPC_C_OPENREQUEST | XPC_C_OPENREPLY |
		       XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
		       XPC_C_CONNECTING | XPC_C_CONNECTED);

	xpc_send_chctl_closerequest(ch, irq_flags);

	if (channel_was_connected)
		ch->flags |= XPC_C_WASCONNECTED;

	/* drop ch->lock: the wakeups/kthread creation below may sleep */
	spin_unlock_irqrestore(&ch->lock, *irq_flags);

	/* wake all idle kthreads so they can exit */
	if (atomic_read(&ch->kthreads_idle) > 0) {
		wake_up_all(&ch->idle_wq);

	} else if ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
		   !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) {
		/* start a kthread that will do the xpDisconnecting callout */
		xpc_create_kthreads(ch, 1, 1);
	}

	/* wake those waiting to allocate an entry from the local msg queue */
	if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
		wake_up(&ch->msg_allocate_wq);

	spin_lock_irqsave(&ch->lock, *irq_flags);
}
771
772 void
773 xpc_disconnect_callout(struct xpc_channel *ch, enum xp_retval reason)
774 {
775         /*
776          * Let the channel's registerer know that the channel is being
777          * disconnected. We don't want to do this if the registerer was never
778          * informed of a connection being made.
779          */
780
781         if (ch->func != NULL) {
782                 dev_dbg(xpc_chan, "ch->func() called, reason=%d, partid=%d, "
783                         "channel=%d\n", reason, ch->partid, ch->number);
784
785                 ch->func(reason, ch->partid, ch->number, NULL, ch->key);
786
787                 dev_dbg(xpc_chan, "ch->func() returned, reason=%d, partid=%d, "
788                         "channel=%d\n", reason, ch->partid, ch->number);
789         }
790 }
791
/*
 * Wait for a message entry to become available for the specified channel,
 * but don't wait any longer than 1 jiffy.
 *
 * Returns ch->reason if the channel is (or becomes) disconnecting,
 * xpTimeout if the 1-jiffy timeout expired, or xpInterrupted if the sleep
 * was cut short by a wakeup/signal.
 */
enum xp_retval
xpc_allocate_msg_wait(struct xpc_channel *ch)
{
        enum xp_retval ret;

        /* bail out immediately if the channel is already being torn down */
        if (ch->flags & XPC_C_DISCONNECTING) {
                DBUG_ON(ch->reason == xpInterrupted);
                return ch->reason;
        }

        /* advertise ourselves as a waiter so wakers know to signal the wq */
        atomic_inc(&ch->n_on_msg_allocate_wq);
        /*
         * NOTE(review): interruptible_sleep_on_timeout() is inherently racy
         * (a wakeup between the flag check above and the sleep is lost) and
         * was deprecated in favor of wait_event_interruptible_timeout();
         * converting requires the explicit wait condition — confirm before
         * changing.
         */
        ret = interruptible_sleep_on_timeout(&ch->msg_allocate_wq, 1);
        atomic_dec(&ch->n_on_msg_allocate_wq);

        /* re-check after sleeping: a disconnect may have begun meanwhile */
        if (ch->flags & XPC_C_DISCONNECTING) {
                ret = ch->reason;
                DBUG_ON(ch->reason == xpInterrupted);
        } else if (ret == 0) {
                /* timeout expired with no wakeup */
                ret = xpTimeout;
        } else {
                /* woken (or signalled) before the timeout */
                ret = xpInterrupted;
        }

        return ret;
}
821
822 /*
823  * Send a message that contains the user's payload on the specified channel
824  * connected to the specified partition.
825  *
826  * NOTE that this routine can sleep waiting for a message entry to become
827  * available. To not sleep, pass in the XPC_NOWAIT flag.
828  *
829  * Once sent, this routine will not wait for the message to be received, nor
830  * will notification be given when it does happen.
831  *
832  * Arguments:
833  *
834  *      partid - ID of partition to which the channel is connected.
835  *      ch_number - channel # to send message on.
836  *      flags - see xp.h for valid flags.
837  *      payload - pointer to the payload which is to be sent.
838  *      payload_size - size of the payload in bytes.
839  */
840 enum xp_retval
841 xpc_initiate_send(short partid, int ch_number, u32 flags, void *payload,
842                   u16 payload_size)
843 {
844         struct xpc_partition *part = &xpc_partitions[partid];
845         enum xp_retval ret = xpUnknownReason;
846
847         dev_dbg(xpc_chan, "payload=0x%p, partid=%d, channel=%d\n", payload,
848                 partid, ch_number);
849
850         DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
851         DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
852         DBUG_ON(payload == NULL);
853
854         if (xpc_part_ref(part)) {
855                 ret = xpc_send_msg(&part->channels[ch_number], flags, payload,
856                                    payload_size, 0, NULL, NULL);
857                 xpc_part_deref(part);
858         }
859
860         return ret;
861 }
862
863 /*
864  * Send a message that contains the user's payload on the specified channel
865  * connected to the specified partition.
866  *
867  * NOTE that this routine can sleep waiting for a message entry to become
868  * available. To not sleep, pass in the XPC_NOWAIT flag.
869  *
870  * This routine will not wait for the message to be sent or received.
871  *
872  * Once the remote end of the channel has received the message, the function
873  * passed as an argument to xpc_initiate_send_notify() will be called. This
874  * allows the sender to free up or re-use any buffers referenced by the
875  * message, but does NOT mean the message has been processed at the remote
876  * end by a receiver.
877  *
878  * If this routine returns an error, the caller's function will NOT be called.
879  *
880  * Arguments:
881  *
882  *      partid - ID of partition to which the channel is connected.
883  *      ch_number - channel # to send message on.
884  *      flags - see xp.h for valid flags.
885  *      payload - pointer to the payload which is to be sent.
886  *      payload_size - size of the payload in bytes.
887  *      func - function to call with asynchronous notification of message
888  *                receipt. THIS FUNCTION MUST BE NON-BLOCKING.
889  *      key - user-defined key to be passed to the function when it's called.
890  */
891 enum xp_retval
892 xpc_initiate_send_notify(short partid, int ch_number, u32 flags, void *payload,
893                          u16 payload_size, xpc_notify_func func, void *key)
894 {
895         struct xpc_partition *part = &xpc_partitions[partid];
896         enum xp_retval ret = xpUnknownReason;
897
898         dev_dbg(xpc_chan, "payload=0x%p, partid=%d, channel=%d\n", payload,
899                 partid, ch_number);
900
901         DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
902         DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
903         DBUG_ON(payload == NULL);
904         DBUG_ON(func == NULL);
905
906         if (xpc_part_ref(part)) {
907                 ret = xpc_send_msg(&part->channels[ch_number], flags, payload,
908                                    payload_size, XPC_N_CALL, func, key);
909                 xpc_part_deref(part);
910         }
911         return ret;
912 }
913
914 /*
915  * Deliver a message to its intended recipient.
916  */
917 void
918 xpc_deliver_msg(struct xpc_channel *ch)
919 {
920         struct xpc_msg *msg;
921
922         msg = xpc_get_deliverable_msg(ch);
923         if (msg != NULL) {
924
925                 /*
926                  * This ref is taken to protect the payload itself from being
927                  * freed before the user is finished with it, which the user
928                  * indicates by calling xpc_initiate_received().
929                  */
930                 xpc_msgqueue_ref(ch);
931
932                 atomic_inc(&ch->kthreads_active);
933
934                 if (ch->func != NULL) {
935                         dev_dbg(xpc_chan, "ch->func() called, msg=0x%p, "
936                                 "msg_number=%ld, partid=%d, channel=%d\n",
937                                 msg, (signed long)msg->number, ch->partid,
938                                 ch->number);
939
940                         /* deliver the message to its intended recipient */
941                         ch->func(xpMsgReceived, ch->partid, ch->number,
942                                  &msg->payload, ch->key);
943
944                         dev_dbg(xpc_chan, "ch->func() returned, msg=0x%p, "
945                                 "msg_number=%ld, partid=%d, channel=%d\n",
946                                 msg, (signed long)msg->number, ch->partid,
947                                 ch->number);
948                 }
949
950                 atomic_dec(&ch->kthreads_active);
951         }
952 }
953
954 /*
955  * Acknowledge receipt of a delivered message.
956  *
957  * If a message has XPC_M_INTERRUPT set, send an interrupt to the partition
958  * that sent the message.
959  *
960  * This function, although called by users, does not call xpc_part_ref() to
961  * ensure that the partition infrastructure is in place. It relies on the
962  * fact that we called xpc_msgqueue_ref() in xpc_deliver_msg().
963  *
964  * Arguments:
965  *
966  *      partid - ID of partition to which the channel is connected.
967  *      ch_number - channel # message received on.
968  *      payload - pointer to the payload area allocated via
969  *                      xpc_initiate_send() or xpc_initiate_send_notify().
970  */
971 void
972 xpc_initiate_received(short partid, int ch_number, void *payload)
973 {
974         struct xpc_partition *part = &xpc_partitions[partid];
975         struct xpc_channel *ch;
976         struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
977
978         DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
979         DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
980
981         ch = &part->channels[ch_number];
982         xpc_received_msg(ch, msg);
983
984         /* the call to xpc_msgqueue_ref() was done by xpc_deliver_msg()  */
985         xpc_msgqueue_deref(ch);
986 }