www.pilppa.org Git - linux-2.6-omap-h63xx.git/blob - drivers/misc/sgi-xp/xpc_channel.c
sgi-xp: isolate xpc_vars_part structure to sn2 only
[linux-2.6-omap-h63xx.git] / drivers / misc / sgi-xp / xpc_channel.c
1 /*
2  * This file is subject to the terms and conditions of the GNU General Public
3  * License.  See the file "COPYING" in the main directory of this archive
4  * for more details.
5  *
6  * Copyright (c) 2004-2008 Silicon Graphics, Inc.  All Rights Reserved.
7  */
8
9 /*
10  * Cross Partition Communication (XPC) channel support.
11  *
12  *      This is the part of XPC that manages the channels and
13  *      sends/receives messages across them to/from other partitions.
14  *
15  */
16
17 #include <linux/kernel.h>
18 #include <linux/init.h>
19 #include <linux/sched.h>
20 #include <linux/cache.h>
21 #include <linux/interrupt.h>
22 #include <linux/mutex.h>
23 #include <linux/completion.h>
24 #include <asm/sn/sn_sal.h>
25 #include "xpc.h"
26
27 /*
28  * Guarantee that the kzalloc'd memory is cacheline aligned.
29  */
30 void *
31 xpc_kzalloc_cacheline_aligned(size_t size, gfp_t flags, void **base)
32 {
        /*
         * Return an L1-cacheline-aligned pointer to zeroed memory of at
         * least 'size' bytes, or NULL on allocation failure.  '*base' is
         * set to the raw kzalloc() pointer which the CALLER must later
         * kfree(); the returned pointer may differ from *base when padding
         * was needed to reach alignment.
         */
33         /* see if kzalloc will give us cacheline aligned memory by default */
34         *base = kzalloc(size, flags);
35         if (*base == NULL)
36                 return NULL;
37
38         if ((u64)*base == L1_CACHE_ALIGN((u64)*base))
39                 return *base;
40
41         kfree(*base);
42
43         /* nope, we'll have to do it ourselves */
        /* over-allocate by a full cacheline so an aligned address fits */
44         *base = kzalloc(size + L1_CACHE_BYTES, flags);
45         if (*base == NULL)
46                 return NULL;
47
        /* round up to the next cacheline boundary within the padded area */
48         return (void *)L1_CACHE_ALIGN((u64)*base);
49 }
50
51 /*
52  * Allocate the local message queue and the notify queue.
53  */
54 static enum xp_retval
55 xpc_allocate_local_msgqueue(struct xpc_channel *ch)
56 {
57         unsigned long irq_flags;
58         int nentries;
59         size_t nbytes;
60
        /*
         * Under memory pressure, keep retrying with progressively fewer
         * message-queue entries rather than failing outright.
         */
61         for (nentries = ch->local_nentries; nentries > 0; nentries--) {
62
63                 nbytes = nentries * ch->msg_size;
64                 ch->local_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes,
65                                                                    GFP_KERNEL,
66                                                       &ch->local_msgqueue_base);
67                 if (ch->local_msgqueue == NULL)
68                         continue;
69
                /* one notify entry per message-queue entry */
70                 nbytes = nentries * sizeof(struct xpc_notify);
71                 ch->notify_queue = kzalloc(nbytes, GFP_KERNEL);
72                 if (ch->notify_queue == NULL) {
                        /* undo the msgqueue allocation and retry smaller */
73                         kfree(ch->local_msgqueue_base);
74                         ch->local_msgqueue = NULL;
75                         continue;
76                 }
77
                /* record any shrinkage of nentries under the channel lock */
78                 spin_lock_irqsave(&ch->lock, irq_flags);
79                 if (nentries < ch->local_nentries) {
80                         dev_dbg(xpc_chan, "nentries=%d local_nentries=%d, "
81                                 "partid=%d, channel=%d\n", nentries,
82                                 ch->local_nentries, ch->partid, ch->number);
83
84                         ch->local_nentries = nentries;
85                 }
86                 spin_unlock_irqrestore(&ch->lock, irq_flags);
87                 return xpSuccess;
88         }
89
90         dev_dbg(xpc_chan, "can't get memory for local message queue and notify "
91                 "queue, partid=%d, channel=%d\n", ch->partid, ch->number);
92         return xpNoMemory;
93 }
94
95 /*
96  * Allocate the cached remote message queue.
97  */
98 static enum xp_retval
99 xpc_allocate_remote_msgqueue(struct xpc_channel *ch)
100 {
101         unsigned long irq_flags;
102         int nentries;
103         size_t nbytes;
104
105         DBUG_ON(ch->remote_nentries <= 0);
106
        /*
         * Mirror of xpc_allocate_local_msgqueue(): retry with fewer
         * entries when memory is tight rather than failing outright.
         */
107         for (nentries = ch->remote_nentries; nentries > 0; nentries--) {
108
109                 nbytes = nentries * ch->msg_size;
110                 ch->remote_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes,
111                                                                     GFP_KERNEL,
112                                                      &ch->remote_msgqueue_base);
113                 if (ch->remote_msgqueue == NULL)
114                         continue;
115
                /* record any shrinkage of nentries under the channel lock */
116                 spin_lock_irqsave(&ch->lock, irq_flags);
117                 if (nentries < ch->remote_nentries) {
118                         dev_dbg(xpc_chan, "nentries=%d remote_nentries=%d, "
119                                 "partid=%d, channel=%d\n", nentries,
120                                 ch->remote_nentries, ch->partid, ch->number);
121
122                         ch->remote_nentries = nentries;
123                 }
124                 spin_unlock_irqrestore(&ch->lock, irq_flags);
125                 return xpSuccess;
126         }
127
128         dev_dbg(xpc_chan, "can't get memory for cached remote message queue, "
129                 "partid=%d, channel=%d\n", ch->partid, ch->number);
130         return xpNoMemory;
131 }
132
133 /*
134  * Allocate message queues and other stuff associated with a channel.
135  *
136  * Note: Assumes all of the channel sizes are filled in.
137  */
138 static enum xp_retval
139 xpc_allocate_msgqueues(struct xpc_channel *ch)
140 {
141         unsigned long irq_flags;
142         enum xp_retval ret;
143
144         DBUG_ON(ch->flags & XPC_C_SETUP);
145
146         ret = xpc_allocate_local_msgqueue(ch);
147         if (ret != xpSuccess)
148                 return ret;
149
150         ret = xpc_allocate_remote_msgqueue(ch);
151         if (ret != xpSuccess) {
152                 kfree(ch->local_msgqueue_base);
153                 ch->local_msgqueue = NULL;
154                 kfree(ch->notify_queue);
155                 ch->notify_queue = NULL;
156                 return ret;
157         }
158
159         spin_lock_irqsave(&ch->lock, irq_flags);
160         ch->flags |= XPC_C_SETUP;
161         spin_unlock_irqrestore(&ch->lock, irq_flags);
162
163         return xpSuccess;
164 }
165
166 /*
167  * Process a connect message from a remote partition.
168  *
169  * Note: xpc_process_connect() is expecting to be called with the
170  * spin_lock_irqsave held and will leave it locked upon return.
171  */
172 static void
173 xpc_process_connect(struct xpc_channel *ch, unsigned long *irq_flags)
174 {
175         enum xp_retval ret;
176
177         DBUG_ON(!spin_is_locked(&ch->lock));
178
        /* both our OPENREQUEST and the remote's must be seen to proceed */
179         if (!(ch->flags & XPC_C_OPENREQUEST) ||
180             !(ch->flags & XPC_C_ROPENREQUEST)) {
181                 /* nothing more to do for now */
182                 return;
183         }
184         DBUG_ON(!(ch->flags & XPC_C_CONNECTING));
185
186         if (!(ch->flags & XPC_C_SETUP)) {
                /* must drop the lock: allocation may sleep (GFP_KERNEL) */
187                 spin_unlock_irqrestore(&ch->lock, *irq_flags);
188                 ret = xpc_allocate_msgqueues(ch);
189                 spin_lock_irqsave(&ch->lock, *irq_flags);
190
191                 if (ret != xpSuccess)
192                         XPC_DISCONNECT_CHANNEL(ch, ret, irq_flags);
193
                /* state may have changed while the lock was dropped */
194                 if (ch->flags & (XPC_C_CONNECTED | XPC_C_DISCONNECTING))
195                         return;
196
197                 DBUG_ON(!(ch->flags & XPC_C_SETUP));
198                 DBUG_ON(ch->local_msgqueue == NULL);
199                 DBUG_ON(ch->remote_msgqueue == NULL);
200         }
201
202         if (!(ch->flags & XPC_C_OPENREPLY)) {
203                 ch->flags |= XPC_C_OPENREPLY;
204                 xpc_IPI_send_openreply(ch, irq_flags);
205         }
206
        /* can't declare the channel connected until the remote replies */
207         if (!(ch->flags & XPC_C_ROPENREPLY))
208                 return;
209
210         DBUG_ON(ch->remote_msgqueue_pa == 0);
211
212         ch->flags = (XPC_C_CONNECTED | XPC_C_SETUP);    /* clear all else */
213
214         dev_info(xpc_chan, "channel %d to partition %d connected\n",
215                  ch->number, ch->partid);
216
        /* drop the lock while spawning the channel's first kthread */
217         spin_unlock_irqrestore(&ch->lock, *irq_flags);
218         xpc_create_kthreads(ch, 1, 0);
219         spin_lock_irqsave(&ch->lock, *irq_flags);
220 }
221
222 /*
223  * Notify those who wanted to be notified upon delivery of their message.
224  */
225 static void
226 xpc_notify_senders(struct xpc_channel *ch, enum xp_retval reason, s64 put)
227 {
228         struct xpc_notify *notify;
229         u8 notify_type;
        /* start one before w_remote_GP.get; the ++ below pre-increments */
230         s64 get = ch->w_remote_GP.get - 1;
231
232         while (++get < put && atomic_read(&ch->n_to_notify) > 0) {
233
234                 notify = &ch->notify_queue[get % ch->local_nentries];
235
236                 /*
237                  * See if the notify entry indicates it was associated with
238                  * a message who's sender wants to be notified. It is possible
239                  * that it is, but someone else is doing or has done the
240                  * notification.
241                  */
                /* cmpxchg atomically claims the entry; losing the race is ok */
242                 notify_type = notify->type;
243                 if (notify_type == 0 ||
244                     cmpxchg(&notify->type, notify_type, 0) != notify_type) {
245                         continue;
246                 }
247
248                 DBUG_ON(notify_type != XPC_N_CALL);
249
250                 atomic_dec(&ch->n_to_notify);
251
252                 if (notify->func != NULL) {
253                         dev_dbg(xpc_chan, "notify->func() called, notify=0x%p, "
254                                 "msg_number=%ld, partid=%d, channel=%d\n",
255                                 (void *)notify, get, ch->partid, ch->number);
256
                        /* invoke the sender's callback with its saved key */
257                         notify->func(reason, ch->partid, ch->number,
258                                      notify->key);
259
260                         dev_dbg(xpc_chan, "notify->func() returned, "
261                                 "notify=0x%p, msg_number=%ld, partid=%d, "
262                                 "channel=%d\n", (void *)notify, get,
263                                 ch->partid, ch->number);
264                 }
265         }
266 }
267
268 /*
269  * Free up message queues and other stuff that were allocated for the specified
270  * channel.
271  *
272  * Note: ch->reason and ch->reason_line are left set for debugging purposes,
273  * they're cleared when XPC_C_DISCONNECTED is cleared.
274  */
275 static void
276 xpc_free_msgqueues(struct xpc_channel *ch)
277 {
278         DBUG_ON(!spin_is_locked(&ch->lock));
279         DBUG_ON(atomic_read(&ch->n_to_notify) != 0);
280
        /* reset the per-connection parameters back to their idle state */
281         ch->remote_msgqueue_pa = 0;
282         ch->func = NULL;
283         ch->key = NULL;
284         ch->msg_size = 0;
285         ch->local_nentries = 0;
286         ch->remote_nentries = 0;
287         ch->kthreads_assigned_limit = 0;
288         ch->kthreads_idle_limit = 0;
289
        /* rewind all Get/Put pointers (local, remote, and working copies) */
290         ch->local_GP->get = 0;
291         ch->local_GP->put = 0;
292         ch->remote_GP.get = 0;
293         ch->remote_GP.put = 0;
294         ch->w_local_GP.get = 0;
295         ch->w_local_GP.put = 0;
296         ch->w_remote_GP.get = 0;
297         ch->w_remote_GP.put = 0;
298         ch->next_msg_to_pull = 0;
299
        /* queues only exist if setup completed; free them exactly once */
300         if (ch->flags & XPC_C_SETUP) {
301                 ch->flags &= ~XPC_C_SETUP;
302
303                 dev_dbg(xpc_chan, "ch->flags=0x%x, partid=%d, channel=%d\n",
304                         ch->flags, ch->partid, ch->number);
305
                /* free via the *_base pointers; the aligned ones may differ */
306                 kfree(ch->local_msgqueue_base);
307                 ch->local_msgqueue = NULL;
308                 kfree(ch->remote_msgqueue_base);
309                 ch->remote_msgqueue = NULL;
310                 kfree(ch->notify_queue);
311                 ch->notify_queue = NULL;
312         }
313 }
314
315 /*
316  * spin_lock_irqsave() is expected to be held on entry.
317  */
318 static void
319 xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
320 {
321         struct xpc_partition *part = &xpc_partitions[ch->partid];
322         u32 channel_was_connected = (ch->flags & XPC_C_WASCONNECTED);
323
324         DBUG_ON(!spin_is_locked(&ch->lock));
325
326         if (!(ch->flags & XPC_C_DISCONNECTING))
327                 return;
328
329         DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
330
331         /* make sure all activity has settled down first */
332
333         if (atomic_read(&ch->kthreads_assigned) > 0 ||
334             atomic_read(&ch->references) > 0) {
335                 return;
336         }
337         DBUG_ON((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
338                 !(ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE));
339
340         if (part->act_state == XPC_P_DEACTIVATING) {
341                 /* can't proceed until the other side disengages from us */
342                 if (xpc_partition_engaged(1UL << ch->partid))
343                         return;
344
345         } else {
346
347                 /* as long as the other side is up do the full protocol */
348
                /* wait for the remote's CLOSEREQUEST before replying */
349                 if (!(ch->flags & XPC_C_RCLOSEREQUEST))
350                         return;
351
352                 if (!(ch->flags & XPC_C_CLOSEREPLY)) {
353                         ch->flags |= XPC_C_CLOSEREPLY;
354                         xpc_IPI_send_closereply(ch, irq_flags);
355                 }
356
                /* and for the remote's CLOSEREPLY to finish the handshake */
357                 if (!(ch->flags & XPC_C_RCLOSEREPLY))
358                         return;
359         }
360
361         /* wake those waiting for notify completion */
362         if (atomic_read(&ch->n_to_notify) > 0) {
363                 /* >>> we do callout while holding ch->lock */
364                 xpc_notify_senders(ch, ch->reason, ch->w_local_GP.put);
365         }
366
367         /* both sides are disconnected now */
368
369         if (ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE) {
                /* drop the lock around the user-level disconnect callout */
370                 spin_unlock_irqrestore(&ch->lock, *irq_flags);
371                 xpc_disconnect_callout(ch, xpDisconnected);
372                 spin_lock_irqsave(&ch->lock, *irq_flags);
373         }
374
375         /* it's now safe to free the channel's message queues */
376         xpc_free_msgqueues(ch);
377
378         /* mark disconnected, clear all other flags except XPC_C_WDISCONNECT */
379         ch->flags = (XPC_C_DISCONNECTED | (ch->flags & XPC_C_WDISCONNECT));
380
381         atomic_dec(&part->nchannels_active);
382
383         if (channel_was_connected) {
384                 dev_info(xpc_chan, "channel %d to partition %d disconnected, "
385                          "reason=%d\n", ch->number, ch->partid, ch->reason);
386         }
387
388         if (ch->flags & XPC_C_WDISCONNECT) {
389                 /* we won't lose the CPU since we're holding ch->lock */
390                 complete(&ch->wdisconnect_wait);
391         } else if (ch->delayed_IPI_flags) {
392                 if (part->act_state != XPC_P_DEACTIVATING) {
393                         /* time to take action on any delayed IPI flags */
394                         spin_lock(&part->IPI_lock);
395                         XPC_SET_IPI_FLAGS(part->local_IPI_amo, ch->number,
396                                           ch->delayed_IPI_flags);
397                         spin_unlock(&part->IPI_lock);
398                 }
399                 ch->delayed_IPI_flags = 0;
400         }
401 }
402
403 /*
404  * Process a change in the channel's remote connection state.
405  */
406 static void
407 xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number,
408                           u8 IPI_flags)
409 {
410         unsigned long irq_flags;
411         struct xpc_openclose_args *args =
412             &part->remote_openclose_args[ch_number];
413         struct xpc_channel *ch = &part->channels[ch_number];
414         enum xp_retval reason;
415
416         spin_lock_irqsave(&ch->lock, irq_flags);
417
        /* re-entered after a completed disconnect so that IPI flags packed
         * with the close (e.g. a follow-on OPENREQUEST) get processed */
418 again:
419
420         if ((ch->flags & XPC_C_DISCONNECTED) &&
421             (ch->flags & XPC_C_WDISCONNECT)) {
422                 /*
423                  * Delay processing IPI flags until thread waiting disconnect
424                  * has had a chance to see that the channel is disconnected.
425                  */
426                 ch->delayed_IPI_flags |= IPI_flags;
427                 spin_unlock_irqrestore(&ch->lock, irq_flags);
428                 return;
429         }
430
431         if (IPI_flags & XPC_IPI_CLOSEREQUEST) {
432
433                 dev_dbg(xpc_chan, "XPC_IPI_CLOSEREQUEST (reason=%d) received "
434                         "from partid=%d, channel=%d\n", args->reason,
435                         ch->partid, ch->number);
436
437                 /*
438                  * If RCLOSEREQUEST is set, we're probably waiting for
439                  * RCLOSEREPLY. We should find it and a ROPENREQUEST packed
440                  * with this RCLOSEREQUEST in the IPI_flags.
441                  */
442
443                 if (ch->flags & XPC_C_RCLOSEREQUEST) {
444                         DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING));
445                         DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
446                         DBUG_ON(!(ch->flags & XPC_C_CLOSEREPLY));
447                         DBUG_ON(ch->flags & XPC_C_RCLOSEREPLY);
448
449                         DBUG_ON(!(IPI_flags & XPC_IPI_CLOSEREPLY));
450                         IPI_flags &= ~XPC_IPI_CLOSEREPLY;
451                         ch->flags |= XPC_C_RCLOSEREPLY;
452
453                         /* both sides have finished disconnecting */
454                         xpc_process_disconnect(ch, &irq_flags);
455                         DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED));
                        /* reprocess remaining flags against the new state */
456                         goto again;
457                 }
458
459                 if (ch->flags & XPC_C_DISCONNECTED) {
460                         if (!(IPI_flags & XPC_IPI_OPENREQUEST)) {
                                /* a lone CLOSEREQUEST raced with a pending
                                 * OPENREQUEST; push it back to be redelivered
                                 * after the open has been handled */
461                                 if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo,
462                                                        ch_number) &
463                                      XPC_IPI_OPENREQUEST)) {
464
465                                         DBUG_ON(ch->delayed_IPI_flags != 0);
466                                         spin_lock(&part->IPI_lock);
467                                         XPC_SET_IPI_FLAGS(part->local_IPI_amo,
468                                                           ch_number,
469                                                           XPC_IPI_CLOSEREQUEST);
470                                         spin_unlock(&part->IPI_lock);
471                                 }
472                                 spin_unlock_irqrestore(&ch->lock, irq_flags);
473                                 return;
474                         }
475
                        /* OPENREQUEST packed with the close: revive channel */
476                         XPC_SET_REASON(ch, 0, 0);
477                         ch->flags &= ~XPC_C_DISCONNECTED;
478
479                         atomic_inc(&part->nchannels_active);
480                         ch->flags |= (XPC_C_CONNECTING | XPC_C_ROPENREQUEST);
481                 }
482
483                 IPI_flags &= ~(XPC_IPI_OPENREQUEST | XPC_IPI_OPENREPLY);
484
485                 /*
486                  * The meaningful CLOSEREQUEST connection state fields are:
487                  *      reason = reason connection is to be closed
488                  */
489
490                 ch->flags |= XPC_C_RCLOSEREQUEST;
491
492                 if (!(ch->flags & XPC_C_DISCONNECTING)) {
                        /* sanitize the remote-supplied reason code */
493                         reason = args->reason;
494                         if (reason <= xpSuccess || reason > xpUnknownReason)
495                                 reason = xpUnknownReason;
496                         else if (reason == xpUnregistering)
497                                 reason = xpOtherUnregistering;
498
499                         XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);
500
501                         DBUG_ON(IPI_flags & XPC_IPI_CLOSEREPLY);
502                         spin_unlock_irqrestore(&ch->lock, irq_flags);
503                         return;
504                 }
505
506                 xpc_process_disconnect(ch, &irq_flags);
507         }
508
509         if (IPI_flags & XPC_IPI_CLOSEREPLY) {
510
511                 dev_dbg(xpc_chan, "XPC_IPI_CLOSEREPLY received from partid=%d,"
512                         " channel=%d\n", ch->partid, ch->number);
513
514                 if (ch->flags & XPC_C_DISCONNECTED) {
515                         DBUG_ON(part->act_state != XPC_P_DEACTIVATING);
516                         spin_unlock_irqrestore(&ch->lock, irq_flags);
517                         return;
518                 }
519
520                 DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
521
522                 if (!(ch->flags & XPC_C_RCLOSEREQUEST)) {
                        /* CLOSEREPLY arrived before the matching
                         * CLOSEREQUEST; re-queue it for later */
523                         if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo, ch_number)
524                              & XPC_IPI_CLOSEREQUEST)) {
525
526                                 DBUG_ON(ch->delayed_IPI_flags != 0);
527                                 spin_lock(&part->IPI_lock);
528                                 XPC_SET_IPI_FLAGS(part->local_IPI_amo,
529                                                   ch_number,
530                                                   XPC_IPI_CLOSEREPLY);
531                                 spin_unlock(&part->IPI_lock);
532                         }
533                         spin_unlock_irqrestore(&ch->lock, irq_flags);
534                         return;
535                 }
536
537                 ch->flags |= XPC_C_RCLOSEREPLY;
538
539                 if (ch->flags & XPC_C_CLOSEREPLY) {
540                         /* both sides have finished disconnecting */
541                         xpc_process_disconnect(ch, &irq_flags);
542                 }
543         }
544
545         if (IPI_flags & XPC_IPI_OPENREQUEST) {
546
547                 dev_dbg(xpc_chan, "XPC_IPI_OPENREQUEST (msg_size=%d, "
548                         "local_nentries=%d) received from partid=%d, "
549                         "channel=%d\n", args->msg_size, args->local_nentries,
550                         ch->partid, ch->number);
551
552                 if (part->act_state == XPC_P_DEACTIVATING ||
553                     (ch->flags & XPC_C_ROPENREQUEST)) {
554                         spin_unlock_irqrestore(&ch->lock, irq_flags);
555                         return;
556                 }
557
                /* defer opens that race an in-progress disconnect */
558                 if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) {
559                         ch->delayed_IPI_flags |= XPC_IPI_OPENREQUEST;
560                         spin_unlock_irqrestore(&ch->lock, irq_flags);
561                         return;
562                 }
563                 DBUG_ON(!(ch->flags & (XPC_C_DISCONNECTED |
564                                        XPC_C_OPENREQUEST)));
565                 DBUG_ON(ch->flags & (XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
566                                      XPC_C_OPENREPLY | XPC_C_CONNECTED));
567
568                 /*
569                  * The meaningful OPENREQUEST connection state fields are:
570                  *      msg_size = size of channel's messages in bytes
571                  *      local_nentries = remote partition's local_nentries
572                  */
573                 if (args->msg_size == 0 || args->local_nentries == 0) {
574                         /* assume OPENREQUEST was delayed by mistake */
575                         spin_unlock_irqrestore(&ch->lock, irq_flags);
576                         return;
577                 }
578
579                 ch->flags |= (XPC_C_ROPENREQUEST | XPC_C_CONNECTING);
580                 ch->remote_nentries = args->local_nentries;
581
582                 if (ch->flags & XPC_C_OPENREQUEST) {
                        /* both sides must agree on the message size */
583                         if (args->msg_size != ch->msg_size) {
584                                 XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
585                                                        &irq_flags);
586                                 spin_unlock_irqrestore(&ch->lock, irq_flags);
587                                 return;
588                         }
589                 } else {
590                         ch->msg_size = args->msg_size;
591
592                         XPC_SET_REASON(ch, 0, 0);
593                         ch->flags &= ~XPC_C_DISCONNECTED;
594
595                         atomic_inc(&part->nchannels_active);
596                 }
597
598                 xpc_process_connect(ch, &irq_flags);
599         }
600
601         if (IPI_flags & XPC_IPI_OPENREPLY) {
602
603                 dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY (local_msgqueue_pa=0x%lx, "
604                         "local_nentries=%d, remote_nentries=%d) received from "
605                         "partid=%d, channel=%d\n", args->local_msgqueue_pa,
606                         args->local_nentries, args->remote_nentries,
607                         ch->partid, ch->number);
608
609                 if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) {
610                         spin_unlock_irqrestore(&ch->lock, irq_flags);
611                         return;
612                 }
                /* an OPENREPLY without our OPENREQUEST is a protocol error */
613                 if (!(ch->flags & XPC_C_OPENREQUEST)) {
614                         XPC_DISCONNECT_CHANNEL(ch, xpOpenCloseError,
615                                                &irq_flags);
616                         spin_unlock_irqrestore(&ch->lock, irq_flags);
617                         return;
618                 }
619
620                 DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST));
621                 DBUG_ON(ch->flags & XPC_C_CONNECTED);
622
623                 /*
624                  * The meaningful OPENREPLY connection state fields are:
625                  *      local_msgqueue_pa = physical address of remote
626                  *                          partition's local_msgqueue
627                  *      local_nentries = remote partition's local_nentries
628                  *      remote_nentries = remote partition's remote_nentries
629                  */
630                 DBUG_ON(args->local_msgqueue_pa == 0);
631                 DBUG_ON(args->local_nentries == 0);
632                 DBUG_ON(args->remote_nentries == 0);
633
634                 ch->flags |= XPC_C_ROPENREPLY;
635                 ch->remote_msgqueue_pa = args->local_msgqueue_pa;
636
                /* adopt the smaller of the two sides' queue sizes */
637                 if (args->local_nentries < ch->remote_nentries) {
638                         dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new "
639                                 "remote_nentries=%d, old remote_nentries=%d, "
640                                 "partid=%d, channel=%d\n",
641                                 args->local_nentries, ch->remote_nentries,
642                                 ch->partid, ch->number);
643
644                         ch->remote_nentries = args->local_nentries;
645                 }
646                 if (args->remote_nentries < ch->local_nentries) {
647                         dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new "
648                                 "local_nentries=%d, old local_nentries=%d, "
649                                 "partid=%d, channel=%d\n",
650                                 args->remote_nentries, ch->local_nentries,
651                                 ch->partid, ch->number);
652
653                         ch->local_nentries = args->remote_nentries;
654                 }
655
656                 xpc_process_connect(ch, &irq_flags);
657         }
658
659         spin_unlock_irqrestore(&ch->lock, irq_flags);
660 }
661
662 /*
663  * Attempt to establish a channel connection to a remote partition.
664  */
665 static enum xp_retval
666 xpc_connect_channel(struct xpc_channel *ch)
667 {
668         unsigned long irq_flags;
669         struct xpc_registration *registration = &xpc_registrations[ch->number];
670
        /* don't block on the registration mutex; caller can retry later */
671         if (mutex_trylock(&registration->mutex) == 0)
672                 return xpRetry;
673
674         if (!XPC_CHANNEL_REGISTERED(ch->number)) {
675                 mutex_unlock(&registration->mutex);
676                 return xpUnregistered;
677         }
678
679         spin_lock_irqsave(&ch->lock, irq_flags);
680
681         DBUG_ON(ch->flags & XPC_C_CONNECTED);
682         DBUG_ON(ch->flags & XPC_C_OPENREQUEST);
683
684         if (ch->flags & XPC_C_DISCONNECTING) {
685                 spin_unlock_irqrestore(&ch->lock, irq_flags);
686                 mutex_unlock(&registration->mutex);
687                 return ch->reason;
688         }
689
690         /* add info from the channel connect registration to the channel */
691
692         ch->kthreads_assigned_limit = registration->assigned_limit;
693         ch->kthreads_idle_limit = registration->idle_limit;
694         DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0);
695         DBUG_ON(atomic_read(&ch->kthreads_idle) != 0);
696         DBUG_ON(atomic_read(&ch->kthreads_active) != 0);
697
698         ch->func = registration->func;
699         DBUG_ON(registration->func == NULL);
700         ch->key = registration->key;
701
702         ch->local_nentries = registration->nentries;
703
704         if (ch->flags & XPC_C_ROPENREQUEST) {
                /* remote already asked to open; sizes must match */
705                 if (registration->msg_size != ch->msg_size) {
706                         /* the local and remote sides aren't the same */
707
708                         /*
709                          * Because XPC_DISCONNECT_CHANNEL() can block we're
710                          * forced to up the registration sema before we unlock
711                          * the channel lock. But that's okay here because we're
712                          * done with the part that required the registration
713                          * sema. XPC_DISCONNECT_CHANNEL() requires that the
714                          * channel lock be locked and will unlock and relock
715                          * the channel lock as needed.
716                          */
717                         mutex_unlock(&registration->mutex);
718                         XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
719                                                &irq_flags);
720                         spin_unlock_irqrestore(&ch->lock, irq_flags);
721                         return xpUnequalMsgSizes;
722                 }
723         } else {
724                 ch->msg_size = registration->msg_size;
725
726                 XPC_SET_REASON(ch, 0, 0);
727                 ch->flags &= ~XPC_C_DISCONNECTED;
728
729                 atomic_inc(&xpc_partitions[ch->partid].nchannels_active);
730         }
731
732         mutex_unlock(&registration->mutex);
733
734         /* initiate the connection */
735
736         ch->flags |= (XPC_C_OPENREQUEST | XPC_C_CONNECTING);
737         xpc_IPI_send_openrequest(ch, &irq_flags);
738
739         xpc_process_connect(ch, &irq_flags);
740
741         spin_unlock_irqrestore(&ch->lock, irq_flags);
742
743         return xpSuccess;
744 }
745
746 /*
747  * Clear some of the msg flags in the local message queue.
748  */
749 static inline void
750 xpc_clear_local_msgqueue_flags(struct xpc_channel *ch)
751 {
752         struct xpc_msg *msg;
753         s64 get;
754
755         get = ch->w_remote_GP.get;
756         do {
757                 msg = (struct xpc_msg *)((u64)ch->local_msgqueue +
758                                          (get % ch->local_nentries) *
759                                          ch->msg_size);
760                 msg->flags = 0;
761         } while (++get < ch->remote_GP.get);
762 }
763
764 /*
765  * Clear some of the msg flags in the remote message queue.
766  */
767 static inline void
768 xpc_clear_remote_msgqueue_flags(struct xpc_channel *ch)
769 {
770         struct xpc_msg *msg;
771         s64 put;
772
773         put = ch->w_remote_GP.put;
774         do {
775                 msg = (struct xpc_msg *)((u64)ch->remote_msgqueue +
776                                          (put % ch->remote_nentries) *
777                                          ch->msg_size);
778                 msg->flags = 0;
779         } while (++put < ch->remote_GP.put);
780 }
781
/*
 * Process message-related IPI activity for one channel of a partition:
 * account for messages of ours that the other side has now received, and
 * arrange delivery of any new messages the other side has sent to us.
 */
static void
xpc_process_msg_IPI(struct xpc_partition *part, int ch_number)
{
        struct xpc_channel *ch = &part->channels[ch_number];
        int nmsgs_sent;

        /* pull the latest GET/PUT values published by the remote side */
        ch->remote_GP = part->remote_GPs[ch_number];

        /* See what, if anything, has changed for each connected channel */

        xpc_msgqueue_ref(ch);

        if (ch->w_remote_GP.get == ch->remote_GP.get &&
            ch->w_remote_GP.put == ch->remote_GP.put) {
                /* nothing changed since GPs were last pulled */
                xpc_msgqueue_deref(ch);
                return;
        }

        if (!(ch->flags & XPC_C_CONNECTED)) {
                /* not connected (yet, or anymore); nothing to deliver */
                xpc_msgqueue_deref(ch);
                return;
        }

        /*
         * First check to see if messages recently sent by us have been
         * received by the other side. (The remote GET value will have
         * changed since we last looked at it.)
         */

        if (ch->w_remote_GP.get != ch->remote_GP.get) {

                /*
                 * We need to notify any senders that want to be notified
                 * that their sent messages have been received by their
                 * intended recipients. We need to do this before updating
                 * w_remote_GP.get so that we don't allocate the same message
                 * queue entries prematurely (see xpc_allocate_msg()).
                 */
                if (atomic_read(&ch->n_to_notify) > 0) {
                        /*
                         * Notify senders that messages sent have been
                         * received and delivered by the other side.
                         */
                        xpc_notify_senders(ch, xpMsgDelivered,
                                           ch->remote_GP.get);
                }

                /*
                 * Clear msg->flags in previously sent messages, so that
                 * they're ready for xpc_allocate_msg().
                 */
                xpc_clear_local_msgqueue_flags(ch);

                ch->w_remote_GP.get = ch->remote_GP.get;

                dev_dbg(xpc_chan, "w_remote_GP.get changed to %ld, partid=%d, "
                        "channel=%d\n", ch->w_remote_GP.get, ch->partid,
                        ch->number);

                /*
                 * If anyone was waiting for message queue entries to become
                 * available, wake them up.
                 */
                if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
                        wake_up(&ch->msg_allocate_wq);
        }

        /*
         * Now check for newly sent messages by the other side. (The remote
         * PUT value will have changed since we last looked at it.)
         */

        if (ch->w_remote_GP.put != ch->remote_GP.put) {
                /*
                 * Clear msg->flags in previously received messages, so that
                 * they're ready for xpc_get_deliverable_msg().
                 */
                xpc_clear_remote_msgqueue_flags(ch);

                ch->w_remote_GP.put = ch->remote_GP.put;

                dev_dbg(xpc_chan, "w_remote_GP.put changed to %ld, partid=%d, "
                        "channel=%d\n", ch->w_remote_GP.put, ch->partid,
                        ch->number);

                /* count of messages waiting to be copied and delivered */
                nmsgs_sent = ch->w_remote_GP.put - ch->w_local_GP.get;
                if (nmsgs_sent > 0) {
                        dev_dbg(xpc_chan, "msgs waiting to be copied and "
                                "delivered=%d, partid=%d, channel=%d\n",
                                nmsgs_sent, ch->partid, ch->number);

                        /* don't deliver until the connected callout was made */
                        if (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE)
                                xpc_activate_kthreads(ch, nmsgs_sent);
                }
        }

        xpc_msgqueue_deref(ch);
}
881
/*
 * Process all outstanding channel activity for the given partition:
 * open/close related IPI flags, channel connects/disconnects, and
 * message-related IPI flags for each of the partition's channels.
 */
void
xpc_process_channel_activity(struct xpc_partition *part)
{
        unsigned long irq_flags;
        u64 IPI_amo, IPI_flags;
        struct xpc_channel *ch;
        int ch_number;
        u32 ch_flags;

        /* fetch (and clear) the pending IPI flags for all channels at once */
        IPI_amo = xpc_get_IPI_flags(part);

        /*
         * Initiate channel connections for registered channels.
         *
         * For each connected channel that has pending messages activate idle
         * kthreads and/or create new kthreads as needed.
         */

        for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
                ch = &part->channels[ch_number];

                /*
                 * Process any open or close related IPI flags, and then deal
                 * with connecting or disconnecting the channel as required.
                 */

                IPI_flags = XPC_GET_IPI_FLAGS(IPI_amo, ch_number);

                if (XPC_ANY_OPENCLOSE_IPI_FLAGS_SET(IPI_flags))
                        xpc_process_openclose_IPI(part, ch_number, IPI_flags);

                ch_flags = ch->flags;   /* need an atomic snapshot of flags */

                if (ch_flags & XPC_C_DISCONNECTING) {
                        spin_lock_irqsave(&ch->lock, irq_flags);
                        xpc_process_disconnect(ch, &irq_flags);
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
                        continue;
                }

                /* no new connections while the partition is deactivating */
                if (part->act_state == XPC_P_DEACTIVATING)
                        continue;

                if (!(ch_flags & XPC_C_CONNECTED)) {
                        if (!(ch_flags & XPC_C_OPENREQUEST)) {
                                DBUG_ON(ch_flags & XPC_C_SETUP);
                                (void)xpc_connect_channel(ch);
                        } else {
                                spin_lock_irqsave(&ch->lock, irq_flags);
                                xpc_process_connect(ch, &irq_flags);
                                spin_unlock_irqrestore(&ch->lock, irq_flags);
                        }
                        continue;
                }

                /*
                 * Process any message related IPI flags, this may involve the
                 * activation of kthreads to deliver any pending messages sent
                 * from the other partition.
                 */

                if (XPC_ANY_MSG_IPI_FLAGS_SET(IPI_flags))
                        xpc_process_msg_IPI(part, ch_number);
        }
}
947
948 /*
949  * XPC's heartbeat code calls this function to inform XPC that a partition is
950  * going down.  XPC responds by tearing down the XPartition Communication
951  * infrastructure used for the just downed partition.
952  *
953  * XPC's heartbeat code will never call this function and xpc_partition_up()
954  * at the same time. Nor will it ever make multiple calls to either function
955  * at the same time.
956  */
957 void
958 xpc_partition_going_down(struct xpc_partition *part, enum xp_retval reason)
959 {
960         unsigned long irq_flags;
961         int ch_number;
962         struct xpc_channel *ch;
963
964         dev_dbg(xpc_chan, "deactivating partition %d, reason=%d\n",
965                 XPC_PARTID(part), reason);
966
967         if (!xpc_part_ref(part)) {
968                 /* infrastructure for this partition isn't currently set up */
969                 return;
970         }
971
972         /* disconnect channels associated with the partition going down */
973
974         for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
975                 ch = &part->channels[ch_number];
976
977                 xpc_msgqueue_ref(ch);
978                 spin_lock_irqsave(&ch->lock, irq_flags);
979
980                 XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);
981
982                 spin_unlock_irqrestore(&ch->lock, irq_flags);
983                 xpc_msgqueue_deref(ch);
984         }
985
986         xpc_wakeup_channel_mgr(part);
987
988         xpc_part_deref(part);
989 }
990
991 /*
992  * Called by XP at the time of channel connection registration to cause
993  * XPC to establish connections to all currently active partitions.
994  */
995 void
996 xpc_initiate_connect(int ch_number)
997 {
998         short partid;
999         struct xpc_partition *part;
1000         struct xpc_channel *ch;
1001
1002         DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS);
1003
1004         for (partid = 0; partid < xp_max_npartitions; partid++) {
1005                 part = &xpc_partitions[partid];
1006
1007                 if (xpc_part_ref(part)) {
1008                         ch = &part->channels[ch_number];
1009
1010                         /*
1011                          * Initiate the establishment of a connection on the
1012                          * newly registered channel to the remote partition.
1013                          */
1014                         xpc_wakeup_channel_mgr(part);
1015                         xpc_part_deref(part);
1016                 }
1017         }
1018 }
1019
1020 void
1021 xpc_connected_callout(struct xpc_channel *ch)
1022 {
1023         /* let the registerer know that a connection has been established */
1024
1025         if (ch->func != NULL) {
1026                 dev_dbg(xpc_chan, "ch->func() called, reason=xpConnected, "
1027                         "partid=%d, channel=%d\n", ch->partid, ch->number);
1028
1029                 ch->func(xpConnected, ch->partid, ch->number,
1030                          (void *)(u64)ch->local_nentries, ch->key);
1031
1032                 dev_dbg(xpc_chan, "ch->func() returned, reason=xpConnected, "
1033                         "partid=%d, channel=%d\n", ch->partid, ch->number);
1034         }
1035 }
1036
1037 /*
1038  * Called by XP at the time of channel connection unregistration to cause
1039  * XPC to teardown all current connections for the specified channel.
1040  *
1041  * Before returning xpc_initiate_disconnect() will wait until all connections
1042  * on the specified channel have been closed/torndown. So the caller can be
1043  * assured that they will not be receiving any more callouts from XPC to the
1044  * function they registered via xpc_connect().
1045  *
1046  * Arguments:
1047  *
1048  *      ch_number - channel # to unregister.
1049  */
1050 void
1051 xpc_initiate_disconnect(int ch_number)
1052 {
1053         unsigned long irq_flags;
1054         short partid;
1055         struct xpc_partition *part;
1056         struct xpc_channel *ch;
1057
1058         DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS);
1059
1060         /* initiate the channel disconnect for every active partition */
1061         for (partid = 0; partid < xp_max_npartitions; partid++) {
1062                 part = &xpc_partitions[partid];
1063
1064                 if (xpc_part_ref(part)) {
1065                         ch = &part->channels[ch_number];
1066                         xpc_msgqueue_ref(ch);
1067
1068                         spin_lock_irqsave(&ch->lock, irq_flags);
1069
1070                         if (!(ch->flags & XPC_C_DISCONNECTED)) {
1071                                 ch->flags |= XPC_C_WDISCONNECT;
1072
1073                                 XPC_DISCONNECT_CHANNEL(ch, xpUnregistering,
1074                                                        &irq_flags);
1075                         }
1076
1077                         spin_unlock_irqrestore(&ch->lock, irq_flags);
1078
1079                         xpc_msgqueue_deref(ch);
1080                         xpc_part_deref(part);
1081                 }
1082         }
1083
1084         xpc_disconnect_wait(ch_number);
1085 }
1086
1087 /*
1088  * To disconnect a channel, and reflect it back to all who may be waiting.
1089  *
1090  * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by
1091  * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by
1092  * xpc_disconnect_wait().
1093  *
1094  * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN.
1095  */
void
xpc_disconnect_channel(const int line, struct xpc_channel *ch,
                       enum xp_retval reason, unsigned long *irq_flags)
{
        /* remembered so we can set XPC_C_WASCONNECTED after clearing flags */
        u32 channel_was_connected = (ch->flags & XPC_C_CONNECTED);

        DBUG_ON(!spin_is_locked(&ch->lock));

        /* already disconnecting/disconnected; nothing more to do */
        if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED))
                return;

        DBUG_ON(!(ch->flags & (XPC_C_CONNECTING | XPC_C_CONNECTED)));

        dev_dbg(xpc_chan, "reason=%d, line=%d, partid=%d, channel=%d\n",
                reason, line, ch->partid, ch->number);

        /* record why (and from where) the disconnect was initiated */
        XPC_SET_REASON(ch, reason, line);

        ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING);
        /* some of these may not have been set */
        ch->flags &= ~(XPC_C_OPENREQUEST | XPC_C_OPENREPLY |
                       XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
                       XPC_C_CONNECTING | XPC_C_CONNECTED);

        /* tell the remote side we're closing */
        xpc_IPI_send_closerequest(ch, irq_flags);

        if (channel_was_connected)
                ch->flags |= XPC_C_WASCONNECTED;

        /* drop the channel lock while waking waiters; reacquired below */
        spin_unlock_irqrestore(&ch->lock, *irq_flags);

        /* wake all idle kthreads so they can exit */
        if (atomic_read(&ch->kthreads_idle) > 0) {
                wake_up_all(&ch->idle_wq);

        } else if ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
                   !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) {
                /* start a kthread that will do the xpDisconnecting callout */
                xpc_create_kthreads(ch, 1, 1);
        }

        /* wake those waiting to allocate an entry from the local msg queue */
        if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
                wake_up(&ch->msg_allocate_wq);

        /* per the contract, the channel lock is held again on return */
        spin_lock_irqsave(&ch->lock, *irq_flags);
}
1143
1144 void
1145 xpc_disconnect_callout(struct xpc_channel *ch, enum xp_retval reason)
1146 {
1147         /*
1148          * Let the channel's registerer know that the channel is being
1149          * disconnected. We don't want to do this if the registerer was never
1150          * informed of a connection being made.
1151          */
1152
1153         if (ch->func != NULL) {
1154                 dev_dbg(xpc_chan, "ch->func() called, reason=%d, partid=%d, "
1155                         "channel=%d\n", reason, ch->partid, ch->number);
1156
1157                 ch->func(reason, ch->partid, ch->number, NULL, ch->key);
1158
1159                 dev_dbg(xpc_chan, "ch->func() returned, reason=%d, partid=%d, "
1160                         "channel=%d\n", reason, ch->partid, ch->number);
1161         }
1162 }
1163
1164 /*
1165  * Wait for a message entry to become available for the specified channel,
1166  * but don't wait any longer than 1 jiffy.
1167  */
static enum xp_retval
xpc_allocate_msg_wait(struct xpc_channel *ch)
{
        enum xp_retval ret;

        if (ch->flags & XPC_C_DISCONNECTING) {
                /* channel is going away; reflect its disconnect reason */
                DBUG_ON(ch->reason == xpInterrupted);
                return ch->reason;
        }

        atomic_inc(&ch->n_on_msg_allocate_wq);
        /*
         * NOTE(review): interruptible_sleep_on_timeout() is inherently racy
         * (a wake_up() between the flag check above and the sleep can be
         * lost) and was later removed from the kernel in favor of
         * wait_event_interruptible_timeout() -- worth converting; confirm
         * against the surrounding wake_up() callers first.
         */
        ret = interruptible_sleep_on_timeout(&ch->msg_allocate_wq, 1);
        atomic_dec(&ch->n_on_msg_allocate_wq);

        if (ch->flags & XPC_C_DISCONNECTING) {
                ret = ch->reason;
                DBUG_ON(ch->reason == xpInterrupted);
        } else if (ret == 0) {
                /* the one-jiffy timeout expired without a wakeup */
                ret = xpTimeout;
        } else {
                /* woken early (e.g. by a signal) */
                ret = xpInterrupted;
        }

        return ret;
}
1193
1194 /*
1195  * Allocate an entry for a message from the message queue associated with the
1196  * specified channel.
1197  */
static enum xp_retval
xpc_allocate_msg(struct xpc_channel *ch, u32 flags,
                 struct xpc_msg **address_of_msg)
{
        struct xpc_msg *msg;
        enum xp_retval ret;
        s64 put;

        /* this reference will be dropped in xpc_send_msg() */
        xpc_msgqueue_ref(ch);

        if (ch->flags & XPC_C_DISCONNECTING) {
                xpc_msgqueue_deref(ch);
                return ch->reason;
        }
        if (!(ch->flags & XPC_C_CONNECTED)) {
                xpc_msgqueue_deref(ch);
                return xpNotConnected;
        }

        /*
         * Get the next available message entry from the local message queue.
         * If none are available, we'll make sure that we grab the latest
         * GP values.
         */
        ret = xpTimeout;

        while (1) {

                put = ch->w_local_GP.put;
                rmb();  /* guarantee that .put loads before .get */
                if (put - ch->w_remote_GP.get < ch->local_nentries) {

                        /* There are available message entries. We need to try
                         * to secure one for ourselves. We'll do this by trying
                         * to increment w_local_GP.put as long as someone else
                         * doesn't beat us to it. If they do, we'll have to
                         * try again.
                         */
                        if (cmpxchg(&ch->w_local_GP.put, put, put + 1) == put) {
                                /* we got the entry referenced by put */
                                break;
                        }
                        continue;       /* try again */
                }

                /*
                 * There aren't any available msg entries at this time.
                 *
                 * In waiting for a message entry to become available,
                 * we set a timeout in case the other side is not
                 * sending completion IPIs. This lets us fake an IPI
                 * that will cause the IPI handler to fetch the latest
                 * GP values as if an IPI was sent by the other side.
                 */
                if (ret == xpTimeout)
                        xpc_IPI_send_local_msgrequest(ch);

                /* caller asked not to block waiting for an entry */
                if (flags & XPC_NOWAIT) {
                        xpc_msgqueue_deref(ch);
                        return xpNoWait;
                }

                /* retry only on timeout or interruption; else bail out */
                ret = xpc_allocate_msg_wait(ch);
                if (ret != xpInterrupted && ret != xpTimeout) {
                        xpc_msgqueue_deref(ch);
                        return ret;
                }
        }

        /* get the message's address and initialize it */
        msg = (struct xpc_msg *)((u64)ch->local_msgqueue +
                                 (put % ch->local_nentries) * ch->msg_size);

        DBUG_ON(msg->flags != 0);
        msg->number = put;

        dev_dbg(xpc_chan, "w_local_GP.put changed to %ld; msg=0x%p, "
                "msg_number=%ld, partid=%d, channel=%d\n", put + 1,
                (void *)msg, msg->number, ch->partid, ch->number);

        *address_of_msg = msg;

        return xpSuccess;
}
1283
1284 /*
1285  * Allocate an entry for a message from the message queue associated with the
1286  * specified channel. NOTE that this routine can sleep waiting for a message
1287  * entry to become available. To not sleep, pass in the XPC_NOWAIT flag.
1288  *
1289  * Arguments:
1290  *
1291  *      partid - ID of partition to which the channel is connected.
1292  *      ch_number - channel #.
1293  *      flags - see xpc.h for valid flags.
1294  *      payload - address of the allocated payload area pointer (filled in on
1295  *                return) in which the user-defined message is constructed.
1296  */
1297 enum xp_retval
1298 xpc_initiate_allocate(short partid, int ch_number, u32 flags, void **payload)
1299 {
1300         struct xpc_partition *part = &xpc_partitions[partid];
1301         enum xp_retval ret = xpUnknownReason;
1302         struct xpc_msg *msg = NULL;
1303
1304         DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
1305         DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
1306
1307         *payload = NULL;
1308
1309         if (xpc_part_ref(part)) {
1310                 ret = xpc_allocate_msg(&part->channels[ch_number], flags, &msg);
1311                 xpc_part_deref(part);
1312
1313                 if (msg != NULL)
1314                         *payload = &msg->payload;
1315         }
1316
1317         return ret;
1318 }
1319
1320 /*
1321  * Now we actually send the messages that are ready to be sent by advancing
1322  * the local message queue's Put value and then send an IPI to the recipient
1323  * partition.
1324  */
static void
xpc_send_msgs(struct xpc_channel *ch, s64 initial_put)
{
        struct xpc_msg *msg;
        s64 put = initial_put + 1;
        int send_IPI = 0;

        while (1) {

                /* scan forward over contiguous XPC_M_READY messages */
                while (1) {
                        if (put == ch->w_local_GP.put)
                                break;

                        msg = (struct xpc_msg *)((u64)ch->local_msgqueue +
                                                 (put % ch->local_nentries) *
                                                 ch->msg_size);

                        if (!(msg->flags & XPC_M_READY))
                                break;

                        put++;
                }

                if (put == initial_put) {
                        /* nothing's changed */
                        break;
                }

                /* publish the new PUT value; only one sender may advance it */
                if (cmpxchg_rel(&ch->local_GP->put, initial_put, put) !=
                    initial_put) {
                        /* someone else beat us to it */
                        DBUG_ON(ch->local_GP->put < initial_put);
                        break;
                }

                /* we just set the new value of local_GP->put */

                dev_dbg(xpc_chan, "local_GP->put changed to %ld, partid=%d, "
                        "channel=%d\n", put, ch->partid, ch->number);

                send_IPI = 1;

                /*
                 * We need to ensure that the message referenced by
                 * local_GP->put is not XPC_M_READY or that local_GP->put
                 * equals w_local_GP.put, so we'll go have a look.
                 */
                initial_put = put;
        }

        /* one IPI covers everything advanced above */
        if (send_IPI)
                xpc_IPI_send_msgrequest(ch);
}
1378
1379 /*
1380  * Common code that does the actual sending of the message by advancing the
1381  * local message queue's Put value and sends an IPI to the partition the
1382  * message is being sent to.
1383  */
static enum xp_retval
xpc_send_msg(struct xpc_channel *ch, struct xpc_msg *msg, u8 notify_type,
             xpc_notify_func func, void *key)
{
        enum xp_retval ret = xpSuccess;
        /*
         * Self-initialization ("notify = notify") is presumably the old GCC
         * idiom to silence a spurious uninitialized-use warning; notify is
         * only assigned/used on the notify_type != 0 path below -- confirm.
         */
        struct xpc_notify *notify = notify;
        s64 put, msg_number = msg->number;

        DBUG_ON(notify_type == XPC_N_CALL && func == NULL);
        /* msg must be the entry its own number maps to in the local queue */
        DBUG_ON((((u64)msg - (u64)ch->local_msgqueue) / ch->msg_size) !=
                msg_number % ch->local_nentries);
        DBUG_ON(msg->flags & XPC_M_READY);

        if (ch->flags & XPC_C_DISCONNECTING) {
                /* drop the reference grabbed in xpc_allocate_msg() */
                xpc_msgqueue_deref(ch);
                return ch->reason;
        }

        if (notify_type != 0) {
                /*
                 * Tell the remote side to send an ACK interrupt when the
                 * message has been delivered.
                 */
                msg->flags |= XPC_M_INTERRUPT;

                atomic_inc(&ch->n_to_notify);

                notify = &ch->notify_queue[msg_number % ch->local_nentries];
                notify->func = func;
                notify->key = key;
                notify->type = notify_type;

                /* >>> is a mb() needed here? */

                if (ch->flags & XPC_C_DISCONNECTING) {
                        /*
                         * An error occurred between our last error check and
                         * this one. We will try to clear the type field from
                         * the notify entry. If we succeed then
                         * xpc_disconnect_channel() didn't already process
                         * the notify entry.
                         */
                        if (cmpxchg(&notify->type, notify_type, 0) ==
                            notify_type) {
                                atomic_dec(&ch->n_to_notify);
                                ret = ch->reason;
                        }

                        /* drop the reference grabbed in xpc_allocate_msg() */
                        xpc_msgqueue_deref(ch);
                        return ret;
                }
        }

        msg->flags |= XPC_M_READY;

        /*
         * The preceding store of msg->flags must occur before the following
         * load of ch->local_GP->put.
         */
        mb();

        /* see if the message is next in line to be sent, if so send it */

        put = ch->local_GP->put;
        if (put == msg_number)
                xpc_send_msgs(ch, put);

        /* drop the reference grabbed in xpc_allocate_msg() */
        xpc_msgqueue_deref(ch);
        return ret;
}
1457
1458 /*
1459  * Send a message previously allocated using xpc_initiate_allocate() on the
1460  * specified channel connected to the specified partition.
1461  *
1462  * This routine will not wait for the message to be received, nor will
1463  * notification be given when it does happen. Once this routine has returned
1464  * the message entry allocated via xpc_initiate_allocate() is no longer
1465  * accessable to the caller.
1466  *
1467  * This routine, although called by users, does not call xpc_part_ref() to
1468  * ensure that the partition infrastructure is in place. It relies on the
1469  * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg().
1470  *
1471  * Arguments:
1472  *
1473  *      partid - ID of partition to which the channel is connected.
1474  *      ch_number - channel # to send message on.
1475  *      payload - pointer to the payload area allocated via
1476  *                      xpc_initiate_allocate().
1477  */
1478 enum xp_retval
1479 xpc_initiate_send(short partid, int ch_number, void *payload)
1480 {
1481         struct xpc_partition *part = &xpc_partitions[partid];
1482         struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
1483         enum xp_retval ret;
1484
1485         dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *)msg,
1486                 partid, ch_number);
1487
1488         DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
1489         DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
1490         DBUG_ON(msg == NULL);
1491
1492         ret = xpc_send_msg(&part->channels[ch_number], msg, 0, NULL, NULL);
1493
1494         return ret;
1495 }
1496
1497 /*
1498  * Send a message previously allocated using xpc_initiate_allocate on the
1499  * specified channel connected to the specified partition.
1500  *
1501  * This routine will not wait for the message to be sent. Once this routine
1502  * has returned the message entry allocated via xpc_initiate_allocate() is no
1503  * longer accessable to the caller.
1504  *
1505  * Once the remote end of the channel has received the message, the function
1506  * passed as an argument to xpc_initiate_send_notify() will be called. This
1507  * allows the sender to free up or re-use any buffers referenced by the
1508  * message, but does NOT mean the message has been processed at the remote
1509  * end by a receiver.
1510  *
1511  * If this routine returns an error, the caller's function will NOT be called.
1512  *
1513  * This routine, although called by users, does not call xpc_part_ref() to
1514  * ensure that the partition infrastructure is in place. It relies on the
1515  * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg().
1516  *
1517  * Arguments:
1518  *
1519  *      partid - ID of partition to which the channel is connected.
1520  *      ch_number - channel # to send message on.
1521  *      payload - pointer to the payload area allocated via
1522  *                      xpc_initiate_allocate().
1523  *      func - function to call with asynchronous notification of message
1524  *                receipt. THIS FUNCTION MUST BE NON-BLOCKING.
1525  *      key - user-defined key to be passed to the function when it's called.
1526  */
1527 enum xp_retval
1528 xpc_initiate_send_notify(short partid, int ch_number, void *payload,
1529                          xpc_notify_func func, void *key)
1530 {
1531         struct xpc_partition *part = &xpc_partitions[partid];
1532         struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
1533         enum xp_retval ret;
1534
1535         dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *)msg,
1536                 partid, ch_number);
1537
1538         DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
1539         DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
1540         DBUG_ON(msg == NULL);
1541         DBUG_ON(func == NULL);
1542
1543         ret = xpc_send_msg(&part->channels[ch_number], msg, XPC_N_CALL,
1544                            func, key);
1545         return ret;
1546 }
1547
1548 /*
1549  * Deliver a message to its intended recipient.
1550  */
1551 void
1552 xpc_deliver_msg(struct xpc_channel *ch)
1553 {
1554         struct xpc_msg *msg;
1555
1556         msg = xpc_get_deliverable_msg(ch);
1557         if (msg != NULL) {
1558
1559                 /*
1560                  * This ref is taken to protect the payload itself from being
1561                  * freed before the user is finished with it, which the user
1562                  * indicates by calling xpc_initiate_received().
1563                  */
1564                 xpc_msgqueue_ref(ch);
1565
1566                 atomic_inc(&ch->kthreads_active);
1567
1568                 if (ch->func != NULL) {
1569                         dev_dbg(xpc_chan, "ch->func() called, msg=0x%p, "
1570                                 "msg_number=%ld, partid=%d, channel=%d\n",
1571                                 (void *)msg, msg->number, ch->partid,
1572                                 ch->number);
1573
1574                         /* deliver the message to its intended recipient */
1575                         ch->func(xpMsgReceived, ch->partid, ch->number,
1576                                  &msg->payload, ch->key);
1577
1578                         dev_dbg(xpc_chan, "ch->func() returned, msg=0x%p, "
1579                                 "msg_number=%ld, partid=%d, channel=%d\n",
1580                                 (void *)msg, msg->number, ch->partid,
1581                                 ch->number);
1582                 }
1583
1584                 atomic_dec(&ch->kthreads_active);
1585         }
1586 }
1587
1588 /*
1589  * Now we actually acknowledge the messages that have been delivered and ack'd
1590  * by advancing the cached remote message queue's Get value and if requested
1591  * send an IPI to the message sender's partition.
1592  */
static void
xpc_acknowledge_msgs(struct xpc_channel *ch, s64 initial_get, u8 msg_flags)
{
	struct xpc_msg *msg;
	s64 get = initial_get + 1;	/* first message not yet acknowledged */
	int send_IPI = 0;

	while (1) {

		/*
		 * Walk forward from 'get', accumulating the flags of every
		 * consecutive XPC_M_DONE message.  Stop at the first message
		 * not yet marked done, or when we catch up with the cached
		 * w_local_GP.get (no more delivered messages to examine).
		 */
		while (1) {
			if (get == ch->w_local_GP.get)
				break;

			/* locate the cached copy of message # 'get' in the
			 * circular remote message queue */
			msg = (struct xpc_msg *)((u64)ch->remote_msgqueue +
						 (get % ch->remote_nentries) *
						 ch->msg_size);

			if (!(msg->flags & XPC_M_DONE))
				break;

			msg_flags |= msg->flags;
			get++;
		}

		if (get == initial_get) {
			/* nothing's changed */
			break;
		}

		/*
		 * Atomically advance local_GP->get (with release semantics).
		 * If it no longer holds initial_get, a concurrent caller has
		 * already advanced it past our range, so there is nothing
		 * left for us to do.
		 */
		if (cmpxchg_rel(&ch->local_GP->get, initial_get, get) !=
		    initial_get) {
			/* someone else beat us to it */
			DBUG_ON(ch->local_GP->get <= initial_get);
			break;
		}

		/* we just set the new value of local_GP->get */

		dev_dbg(xpc_chan, "local_GP->get changed to %ld, partid=%d, "
			"channel=%d\n", get, ch->partid, ch->number);

		/* remember whether any acknowledged message requested an
		 * interrupt of the sending partition */
		send_IPI = (msg_flags & XPC_M_INTERRUPT);

		/*
		 * We need to ensure that the message referenced by
		 * local_GP->get is not XPC_M_DONE or that local_GP->get
		 * equals w_local_GP.get, so we'll go have a look.
		 */
		initial_get = get;
	}

	if (send_IPI)
		xpc_IPI_send_msgrequest(ch);
}
1647
1648 /*
1649  * Acknowledge receipt of a delivered message.
1650  *
1651  * If a message has XPC_M_INTERRUPT set, send an interrupt to the partition
1652  * that sent the message.
1653  *
1654  * This function, although called by users, does not call xpc_part_ref() to
1655  * ensure that the partition infrastructure is in place. It relies on the
1656  * fact that we called xpc_msgqueue_ref() in xpc_deliver_msg().
1657  *
1658  * Arguments:
1659  *
1660  *      partid - ID of partition to which the channel is connected.
1661  *      ch_number - channel # message received on.
1662  *      payload - pointer to the payload area allocated via
1663  *                      xpc_initiate_allocate().
1664  */
1665 void
1666 xpc_initiate_received(short partid, int ch_number, void *payload)
1667 {
1668         struct xpc_partition *part = &xpc_partitions[partid];
1669         struct xpc_channel *ch;
1670         struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
1671         s64 get, msg_number = msg->number;
1672
1673         DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
1674         DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
1675
1676         ch = &part->channels[ch_number];
1677
1678         dev_dbg(xpc_chan, "msg=0x%p, msg_number=%ld, partid=%d, channel=%d\n",
1679                 (void *)msg, msg_number, ch->partid, ch->number);
1680
1681         DBUG_ON((((u64)msg - (u64)ch->remote_msgqueue) / ch->msg_size) !=
1682                 msg_number % ch->remote_nentries);
1683         DBUG_ON(msg->flags & XPC_M_DONE);
1684
1685         msg->flags |= XPC_M_DONE;
1686
1687         /*
1688          * The preceding store of msg->flags must occur before the following
1689          * load of ch->local_GP->get.
1690          */
1691         mb();
1692
1693         /*
1694          * See if this message is next in line to be acknowledged as having
1695          * been delivered.
1696          */
1697         get = ch->local_GP->get;
1698         if (get == msg_number)
1699                 xpc_acknowledge_msgs(ch, get, msg->flags);
1700
1701         /* the call to xpc_msgqueue_ref() was done by xpc_deliver_msg()  */
1702         xpc_msgqueue_deref(ch);
1703 }