/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2006 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's responsibility to resolve
 * these into IP addresses or whatever it needs for inter-node
 * communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * I don't see any problem with the recv thread executing the locking
 * code on behalf of remote processes as the locking code is
 * short, efficient and never waits.
 */
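
/*
 * Rough picture of the flow described above (illustrative only):
 *
 *      sk_data_ready  -> read_sockets list  -> dlm_recvd -> con->rx_action()
 *      sk_write_space -> write_sockets list -> dlm_sendd -> send_to_sock()
 *      connect needed -> state_sockets list -> dlm_sendd -> connect_to_sock()
 */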

#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "config.h"

struct cbuf {
        unsigned int base;
        unsigned int len;
        unsigned int mask;
};

#define NODE_INCREMENT 32

static void cbuf_add(struct cbuf *cb, int n)
{
        cb->len += n;
}

static int cbuf_data(struct cbuf *cb)
{
        return ((cb->base + cb->len) & cb->mask);
}

static void cbuf_init(struct cbuf *cb, int size)
{
        cb->base = cb->len = 0;
        cb->mask = size - 1;
}

static void cbuf_eat(struct cbuf *cb, int n)
{
        cb->len  -= n;
        cb->base += n;
        cb->base &= cb->mask;
}

static bool cbuf_empty(struct cbuf *cb)
{
        return cb->len == 0;
}

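/*
 * Illustrative example (assumes a 4096-byte buffer; not called anywhere):
 * after cbuf_init(&cb, 4096), base = len = 0 and mask = 0xfff.  A call to
 * cbuf_add(&cb, 100) records 100 bytes received, so cbuf_data(&cb) returns
 * (0 + 100) & 0xfff = 100, the offset of the next free byte.  cbuf_eat(&cb, 60)
 * then consumes 60 bytes from the front, leaving base = 60 and len = 40.
 * Wrap-around falls out of the masking, which assumes size is a power of two.
 */
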
/* Maximum number of incoming messages to process before
   doing a cond_resched()
*/
#define MAX_RX_MSG_COUNT 25

struct connection {
        struct socket *sock;    /* NULL if not connected */
        uint32_t nodeid;        /* So we know who we are in the list */
        struct rw_semaphore sock_sem; /* Stop connect races */
        struct list_head read_list;   /* On this list when ready for reading */
        struct list_head write_list;  /* On this list when ready for writing */
        struct list_head state_list;  /* On this list when ready to connect */
        unsigned long flags;    /* CF_xxx bits: which pending lists we are on */
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_CONNECT_PENDING 3
#define CF_IS_OTHERCON 4
        struct list_head writequeue;  /* List of outgoing writequeue_entries */
        struct list_head listenlist;  /* List of allocated listening sockets */
        spinlock_t writequeue_lock;
        int (*rx_action) (struct connection *); /* What to do when active */
        struct page *rx_page;
        struct cbuf cb;
        int retries;
        atomic_t waiting_requests;
#define MAX_CONNECT_RETRIES 3
        struct connection *othercon;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

/* An entry waiting to be sent */
struct writequeue_entry {
        struct list_head list;
        struct page *page;
        int offset;
        int len;
        int end;
        int users;
        struct connection *con;
};

static struct sockaddr_storage dlm_local_addr;

/* Manage daemons */
static struct task_struct *recv_task;
static struct task_struct *send_task;

static wait_queue_t lowcomms_send_waitq_head;
static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq);
static wait_queue_t lowcomms_recv_waitq_head;
static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq);

/* An array of pointers to connections, indexed by NODEID */
static struct connection **connections;
static DECLARE_MUTEX(connections_lock);
static struct kmem_cache *con_cache;
static int conn_array_size;

/* List of sockets that have reads pending */
static LIST_HEAD(read_sockets);
static DEFINE_SPINLOCK(read_sockets_lock);

/* List of sockets which have writes pending */
static LIST_HEAD(write_sockets);
static DEFINE_SPINLOCK(write_sockets_lock);

/* List of sockets which have connects pending */
static LIST_HEAD(state_sockets);
static DEFINE_SPINLOCK(state_sockets_lock);

static struct connection *nodeid2con(int nodeid, gfp_t allocation)
{
        struct connection *con = NULL;

        down(&connections_lock);
        if (nodeid >= conn_array_size) {
                int new_size = nodeid + NODE_INCREMENT;
                struct connection **new_conns;

                new_conns = kzalloc(sizeof(struct connection *) *
                                    new_size, allocation);
                if (!new_conns)
                        goto finish;

                memcpy(new_conns, connections,
                       sizeof(struct connection *) * conn_array_size);
                conn_array_size = new_size;
                kfree(connections);
                connections = new_conns;
        }

        con = connections[nodeid];
        if (con == NULL && allocation) {
                con = kmem_cache_zalloc(con_cache, allocation);
                if (!con)
                        goto finish;

                con->nodeid = nodeid;
                init_rwsem(&con->sock_sem);
                INIT_LIST_HEAD(&con->writequeue);
                spin_lock_init(&con->writequeue_lock);

                connections[nodeid] = con;
        }

finish:
        up(&connections_lock);
        return con;
}
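
/*
 * Illustrative example (hypothetical nodeids): with NODE_INCREMENT 32 and
 * an empty table, nodeid2con(40, GFP_KERNEL) grows the array to
 * 40 + 32 = 72 slots, then allocates and caches a connection in slot 40.
 * A later nodeid2con(40, 0) lookup returns that same connection without
 * allocating, which is why the shutdown paths can pass 0 as "allocation".
 */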

/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk, int count_unused)
{
        struct connection *con = sock2con(sk);

        atomic_inc(&con->waiting_requests);
        if (test_and_set_bit(CF_READ_PENDING, &con->flags))
                return;

        spin_lock_bh(&read_sockets_lock);
        list_add_tail(&con->read_list, &read_sockets);
        spin_unlock_bh(&read_sockets_lock);

        wake_up_interruptible(&lowcomms_recv_waitq);
}

static void lowcomms_write_space(struct sock *sk)
{
        struct connection *con = sock2con(sk);

        if (test_and_set_bit(CF_WRITE_PENDING, &con->flags))
                return;

        spin_lock_bh(&write_sockets_lock);
        list_add_tail(&con->write_list, &write_sockets);
        spin_unlock_bh(&write_sockets_lock);

        wake_up_interruptible(&lowcomms_send_waitq);
}

static inline void lowcomms_connect_sock(struct connection *con)
{
        if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
                return;

        spin_lock_bh(&state_sockets_lock);
        list_add_tail(&con->state_list, &state_sockets);
        spin_unlock_bh(&state_sockets_lock);

        wake_up_interruptible(&lowcomms_send_waitq);
}

static void lowcomms_state_change(struct sock *sk)
{
        if (sk->sk_state == TCP_ESTABLISHED)
                lowcomms_write_space(sk);
}

/* Make a socket active */
static int add_sock(struct socket *sock, struct connection *con)
{
        con->sock = sock;

        /* Install a data_ready callback */
        con->sock->sk->sk_data_ready = lowcomms_data_ready;
        con->sock->sk->sk_write_space = lowcomms_write_space;
        con->sock->sk->sk_state_change = lowcomms_state_change;

        return 0;
}
261
262 /* Add the port number to an IP6 or 4 sockaddr and return the address
263    length */
264 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
265                           int *addr_len)
266 {
267         saddr->ss_family =  dlm_local_addr.ss_family;
268         if (saddr->ss_family == AF_INET) {
269                 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
270                 in4_addr->sin_port = cpu_to_be16(port);
271                 *addr_len = sizeof(struct sockaddr_in);
272         } else {
273                 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
274                 in6_addr->sin6_port = cpu_to_be16(port);
275                 *addr_len = sizeof(struct sockaddr_in6);
276         }
277 }
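
/*
 * Illustrative example: with an IPv4 local address, make_sockaddr(&addr,
 * dlm_config.ci_tcp_port, &len) stores the port in network byte order in
 * sin_port and sets len = sizeof(struct sockaddr_in).  (21064 is the
 * usual DLM TCP port, but the actual value comes from configfs.)
 */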

/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other)
{
        down_write(&con->sock_sem);

        if (con->sock) {
                sock_release(con->sock);
                con->sock = NULL;
        }
        if (con->othercon && and_other) {
                /* Will only re-enter once. */
                close_connection(con->othercon, false);
        }
        if (con->rx_page) {
                __free_page(con->rx_page);
                con->rx_page = NULL;
        }
        con->retries = 0;
        up_write(&con->sock_sem);
}

/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
        int ret = 0;
        struct msghdr msg;
        struct iovec iov[2];
        mm_segment_t fs;
        unsigned len;
        int r;
        int call_again_soon = 0;

        down_read(&con->sock_sem);

        if (con->sock == NULL)
                goto out;
        if (con->rx_page == NULL) {
                /*
                 * This doesn't need to be atomic, but I think it should
                 * improve performance if it is.
                 */
                con->rx_page = alloc_page(GFP_ATOMIC);
                if (con->rx_page == NULL)
                        goto out_resched;
                cbuf_init(&con->cb, PAGE_CACHE_SIZE);
        }

        msg.msg_control = NULL;
        msg.msg_controllen = 0;
        msg.msg_iovlen = 1;
        msg.msg_iov = iov;
        msg.msg_name = NULL;
        msg.msg_namelen = 0;
        msg.msg_flags = 0;

        /*
         * iov[0] is the bit of the circular buffer between the current end
         * point (cb.base + cb.len) and the end of the buffer.
         */
        iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
        iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
        iov[1].iov_len = 0;

        /*
         * iov[1] is the bit of the circular buffer between the start of the
         * buffer and the start of the currently used section (cb.base)
         */
        if (cbuf_data(&con->cb) >= con->cb.base) {
                iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
                iov[1].iov_len = con->cb.base;
                iov[1].iov_base = page_address(con->rx_page);
                msg.msg_iovlen = 2;
        }
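
        /*
         * Worked example (illustrative): suppose PAGE_CACHE_SIZE is 4096,
         * cb.base = 1000 and cb.len = 3000, so cbuf_data() = 4000.  Since
         * 4000 >= base, iov[0] covers bytes 4000..4095 (96 bytes at the end
         * of the page) and iov[1] covers bytes 0..999 (1000 bytes at the
         * start), i.e. exactly the 1096 bytes of the page not holding data.
         */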
        len = iov[0].iov_len + iov[1].iov_len;

        fs = get_fs();
        set_fs(get_ds());
        r = ret = sock_recvmsg(con->sock, &msg, len,
                               MSG_DONTWAIT | MSG_NOSIGNAL);
        set_fs(fs);

        if (ret <= 0)
                goto out_close;
        if (ret == len)
                call_again_soon = 1;
        cbuf_add(&con->cb, ret);
        ret = dlm_process_incoming_buffer(con->nodeid,
                                          page_address(con->rx_page),
                                          con->cb.base, con->cb.len,
                                          PAGE_CACHE_SIZE);
        if (ret == -EBADMSG) {
                printk(KERN_INFO "dlm: lowcomms: addr=%p, base=%u, len=%u, "
                       "iov_len=%u, iov_base[0]=%p, read=%d\n",
                       page_address(con->rx_page), con->cb.base, con->cb.len,
                       len, iov[0].iov_base, r);
        }
        if (ret < 0)
                goto out_close;
        cbuf_eat(&con->cb, ret);

        if (cbuf_empty(&con->cb) && !call_again_soon) {
                __free_page(con->rx_page);
                con->rx_page = NULL;
        }

out:
        if (call_again_soon)
                goto out_resched;
        up_read(&con->sock_sem);
        return 0;

out_resched:
        lowcomms_data_ready(con->sock->sk, 0);
        up_read(&con->sock_sem);
        cond_resched();
        return 0;

out_close:
        up_read(&con->sock_sem);
        if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
                close_connection(con, false);
                /* Reconnect when there is something to send */
        }

        return ret;
}

/* Listening socket is busy, accept a connection */
static int accept_from_sock(struct connection *con)
{
        int result;
        struct sockaddr_storage peeraddr;
        struct socket *newsock;
        int len;
        int nodeid;
        struct connection *newcon;

        memset(&peeraddr, 0, sizeof(peeraddr));
        result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
                                  IPPROTO_TCP, &newsock);
        if (result < 0)
                return -ENOMEM;

        down_read(&con->sock_sem);

        result = -ENOTCONN;
        if (con->sock == NULL)
                goto accept_err;

        newsock->type = con->sock->type;
        newsock->ops = con->sock->ops;

        result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
        if (result < 0)
                goto accept_err;

        /* Get the connected socket's peer */
        memset(&peeraddr, 0, sizeof(peeraddr));
        if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
                                  &len, 2)) {
                result = -ECONNABORTED;
                goto accept_err;
        }

        /* Get the new node's NODEID */
        make_sockaddr(&peeraddr, 0, &len);
        if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
                printk("dlm: connect from non cluster node\n");
                sock_release(newsock);
                up_read(&con->sock_sem);
                return -1;
        }

        log_print("got connection from %d", nodeid);

        /* Check to see if we already have a connection to this node. This
         * could happen if the two nodes initiate a connection at roughly
         * the same time and the connections cross on the wire.
         * TEMPORARY FIX:
         * In this case we store the incoming one in "othercon"
         */
        newcon = nodeid2con(nodeid, GFP_KERNEL);
        if (!newcon) {
                result = -ENOMEM;
                goto accept_err;
        }
        down_write(&newcon->sock_sem);
        if (newcon->sock) {
                struct connection *othercon = newcon->othercon;

                if (!othercon) {
                        othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL);
                        if (!othercon) {
                                printk("dlm: failed to allocate incoming socket\n");
                                up_write(&newcon->sock_sem);
                                result = -ENOMEM;
                                goto accept_err;
                        }
                        othercon->nodeid = nodeid;
                        othercon->rx_action = receive_from_sock;
                        init_rwsem(&othercon->sock_sem);
                        set_bit(CF_IS_OTHERCON, &othercon->flags);
                        newcon->othercon = othercon;
                }
                othercon->sock = newsock;
                newsock->sk->sk_user_data = othercon;
                add_sock(newsock, othercon);
        } else {
                newsock->sk->sk_user_data = newcon;
                newcon->rx_action = receive_from_sock;
                add_sock(newsock, newcon);
        }

        up_write(&newcon->sock_sem);

        /*
         * Add it to the active queue in case we got data
         * between processing the accept and adding the socket
         * to the read_sockets list
         */
        lowcomms_data_ready(newsock->sk, 0);
        up_read(&con->sock_sem);

        return 0;

accept_err:
        up_read(&con->sock_sem);
        sock_release(newsock);

        if (result != -EAGAIN)
                printk("dlm: error accepting connection from node: %d\n",
                       result);
        return result;
}

/* Connect a new socket to its peer */
static void connect_to_sock(struct connection *con)
{
        int result = -EHOSTUNREACH;
        struct sockaddr_storage saddr;
        int addr_len;
        struct socket *sock;

        if (con->nodeid == 0) {
                log_print("attempt to connect sock 0 foiled");
                return;
        }

        down_write(&con->sock_sem);
        if (con->retries++ > MAX_CONNECT_RETRIES)
                goto out;

        /* Some odd races can cause double-connects, ignore them */
        if (con->sock) {
                result = 0;
                goto out;
        }

        /* Create a socket to communicate with */
        result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
                                  IPPROTO_TCP, &sock);
        if (result < 0)
                goto out_err;

        memset(&saddr, 0, sizeof(saddr));
        if (dlm_nodeid_to_addr(con->nodeid, &saddr))
                goto out_err;

        sock->sk->sk_user_data = con;
        con->rx_action = receive_from_sock;

        make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

        add_sock(sock, con);

        log_print("connecting to %d", con->nodeid);
        result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
                                    O_NONBLOCK);
        if (result == -EINPROGRESS)
                result = 0;
        if (result == 0)
                goto out;

out_err:
        if (con->sock) {
                sock_release(con->sock);
                con->sock = NULL;
        }
        /*
         * Some errors are fatal and this list might need adjusting. For other
         * errors we try again until the max number of retries is reached.
         */
        if (result != -EHOSTUNREACH && result != -ENETUNREACH &&
            result != -ENETDOWN && result != -EINVAL &&
            result != -EPROTONOSUPPORT) {
                lowcomms_connect_sock(con);
                result = 0;
        }
out:
        up_write(&con->sock_sem);
        return;
}

static struct socket *create_listen_sock(struct connection *con,
                                         struct sockaddr_storage *saddr)
{
        struct socket *sock = NULL;
        mm_segment_t fs;
        int result = 0;
        int one = 1;
        int addr_len;

        if (dlm_local_addr.ss_family == AF_INET)
                addr_len = sizeof(struct sockaddr_in);
        else
                addr_len = sizeof(struct sockaddr_in6);

        /* Create a socket to communicate with */
        result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
                                  IPPROTO_TCP, &sock);
        if (result < 0) {
                printk("dlm: Can't create listening comms socket\n");
                goto create_out;
        }

        fs = get_fs();
        set_fs(get_ds());
        result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
                                 (char *)&one, sizeof(one));
        set_fs(fs);
        if (result < 0) {
                printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",
                       result);
        }
        sock->sk->sk_user_data = con;
        con->rx_action = accept_from_sock;
        con->sock = sock;

        /* Bind to our port */
        make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
        result = sock->ops->bind(sock, (struct sockaddr *)saddr, addr_len);
        if (result < 0) {
                printk("dlm: Can't bind to port %d\n", dlm_config.ci_tcp_port);
                sock_release(sock);
                sock = NULL;
                con->sock = NULL;
                goto create_out;
        }

        fs = get_fs();
        set_fs(get_ds());

        result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
                                 (char *)&one, sizeof(one));
        set_fs(fs);
        if (result < 0) {
                printk("dlm: Set keepalive failed: %d\n", result);
        }

        result = sock->ops->listen(sock, 5);
        if (result < 0) {
                printk("dlm: Can't listen on port %d\n",
                       dlm_config.ci_tcp_port);
                sock_release(sock);
                sock = NULL;
                goto create_out;
        }

create_out:
        return sock;
}

/* Listen on all interfaces */
static int listen_for_all(void)
{
        struct socket *sock = NULL;
        struct connection *con = nodeid2con(0, GFP_KERNEL);
        int result = -EINVAL;

        /* We don't support multi-homed hosts */
        set_bit(CF_IS_OTHERCON, &con->flags);

        sock = create_listen_sock(con, &dlm_local_addr);
        if (sock) {
                add_sock(sock, con);
                result = 0;
        } else {
                result = -EADDRINUSE;
        }

        return result;
}

static struct writequeue_entry *new_writequeue_entry(struct connection *con,
                                                     gfp_t allocation)
{
        struct writequeue_entry *entry;

        entry = kmalloc(sizeof(struct writequeue_entry), allocation);
        if (!entry)
                return NULL;

        entry->page = alloc_page(allocation);
        if (!entry->page) {
                kfree(entry);
                return NULL;
        }

        entry->offset = 0;
        entry->len = 0;
        entry->end = 0;
        entry->users = 0;
        entry->con = con;

        return entry;
}

void *dlm_lowcomms_get_buffer(int nodeid, int len,
                              gfp_t allocation, char **ppc)
{
        struct connection *con;
        struct writequeue_entry *e;
        int offset = 0;
        int users = 0;

        con = nodeid2con(nodeid, allocation);
        if (!con)
                return NULL;

        spin_lock(&con->writequeue_lock);
        e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
        if ((&e->list == &con->writequeue) ||
            (PAGE_CACHE_SIZE - e->end < len)) {
                e = NULL;
        } else {
                offset = e->end;
                e->end += len;
                users = e->users++;
        }
        spin_unlock(&con->writequeue_lock);

        if (e) {
        got_one:
                if (users == 0)
                        kmap(e->page);
                *ppc = page_address(e->page) + offset;
                return e;
        }

        e = new_writequeue_entry(con, allocation);
        if (e) {
                spin_lock(&con->writequeue_lock);
                offset = e->end;
                e->end += len;
                users = e->users++;
                list_add_tail(&e->list, &con->writequeue);
                spin_unlock(&con->writequeue_lock);
                goto got_one;
        }
        return NULL;
}

void dlm_lowcomms_commit_buffer(void *mh)
{
        struct writequeue_entry *e = (struct writequeue_entry *)mh;
        struct connection *con = e->con;
        int users;

        spin_lock(&con->writequeue_lock);
        users = --e->users;
        if (users)
                goto out;
        e->len = e->end - e->offset;
        kunmap(e->page);
        spin_unlock(&con->writequeue_lock);

        if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) {
                spin_lock_bh(&write_sockets_lock);
                list_add_tail(&con->write_list, &write_sockets);
                spin_unlock_bh(&write_sockets_lock);

                wake_up_interruptible(&lowcomms_send_waitq);
        }
        return;

out:
        spin_unlock(&con->writequeue_lock);
        return;
}
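
/*
 * Typical usage of the two functions above, as a sketch only (the real
 * caller lives in midcomms.c; "msg" and "len" here are hypothetical):
 *
 *      char *buf;
 *      void *mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_KERNEL, &buf);
 *      if (!mh)
 *              return -ENOMEM;
 *      memcpy(buf, msg, len);                  // fill in the message body
 *      dlm_lowcomms_commit_buffer(mh);         // hand it to dlm_sendd
 */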

static void free_entry(struct writequeue_entry *e)
{
        __free_page(e->page);
        kfree(e);
}

/* Send a message */
static void send_to_sock(struct connection *con)
{
        int ret = 0;
        ssize_t (*sendpage)(struct socket *, struct page *, int, size_t, int);
        const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
        struct writequeue_entry *e;
        int len, offset;

        down_read(&con->sock_sem);
        if (con->sock == NULL)
                goto out_connect;

        sendpage = con->sock->ops->sendpage;

        spin_lock(&con->writequeue_lock);
        for (;;) {
                e = list_entry(con->writequeue.next, struct writequeue_entry,
                               list);
                if ((struct list_head *) e == &con->writequeue)
                        break;

                len = e->len;
                offset = e->offset;
                BUG_ON(len == 0 && e->users == 0);
                spin_unlock(&con->writequeue_lock);

                ret = 0;
                if (len) {
                        ret = sendpage(con->sock, e->page, offset, len,
                                       msg_flags);
                        if (ret == -EAGAIN || ret == 0)
                                goto out;
                        if (ret <= 0)
                                goto send_error;
                } else {
                        /* Don't starve people filling buffers */
                        cond_resched();
                }

                spin_lock(&con->writequeue_lock);
                e->offset += ret;
                e->len -= ret;

                if (e->len == 0 && e->users == 0) {
                        list_del(&e->list);
                        kunmap(e->page);
                        free_entry(e);
                        continue;
                }
        }
        spin_unlock(&con->writequeue_lock);
out:
        up_read(&con->sock_sem);
        return;

send_error:
        up_read(&con->sock_sem);
        close_connection(con, false);
        lowcomms_connect_sock(con);
        return;

out_connect:
        up_read(&con->sock_sem);
        lowcomms_connect_sock(con);
        return;
}

static void clean_one_writequeue(struct connection *con)
{
        struct list_head *list;
        struct list_head *temp;

        spin_lock(&con->writequeue_lock);
        list_for_each_safe(list, temp, &con->writequeue) {
                struct writequeue_entry *e =
                        list_entry(list, struct writequeue_entry, list);
                list_del(&e->list);
                free_entry(e);
        }
        spin_unlock(&con->writequeue_lock);
}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
        struct connection *con;

        if (!connections)
                goto out;

        log_print("closing connection to node %d", nodeid);
        con = nodeid2con(nodeid, 0);
        if (con) {
                clean_one_writequeue(con);
                close_connection(con, true);
                atomic_set(&con->waiting_requests, 0);
        }
        return 0;

out:
        return -1;
}

/* Look for activity on active sockets */
static void process_sockets(void)
{
        struct list_head *list;
        struct list_head *temp;
        int count = 0;

        spin_lock_bh(&read_sockets_lock);
        list_for_each_safe(list, temp, &read_sockets) {
                struct connection *con =
                        list_entry(list, struct connection, read_list);
                list_del(&con->read_list);
                clear_bit(CF_READ_PENDING, &con->flags);

                spin_unlock_bh(&read_sockets_lock);

                /* This can reach zero if we are processing requests
                 * as they come in.
                 */
                if (atomic_read(&con->waiting_requests) == 0) {
                        spin_lock_bh(&read_sockets_lock);
                        continue;
                }

                do {
                        con->rx_action(con);

                        /* Don't starve out everyone else */
                        if (++count >= MAX_RX_MSG_COUNT) {
                                cond_resched();
                                count = 0;
                        }

                } while (!atomic_dec_and_test(&con->waiting_requests) &&
                         !kthread_should_stop());

                spin_lock_bh(&read_sockets_lock);
        }
        spin_unlock_bh(&read_sockets_lock);
}

/* Try to send any messages that are pending */
static void process_output_queue(void)
{
        struct list_head *list;
        struct list_head *temp;

        spin_lock_bh(&write_sockets_lock);
        list_for_each_safe(list, temp, &write_sockets) {
                struct connection *con =
                        list_entry(list, struct connection, write_list);
                clear_bit(CF_WRITE_PENDING, &con->flags);
                list_del(&con->write_list);

                spin_unlock_bh(&write_sockets_lock);
                send_to_sock(con);
                spin_lock_bh(&write_sockets_lock);
        }
        spin_unlock_bh(&write_sockets_lock);
}

static void process_state_queue(void)
{
        struct list_head *list;
        struct list_head *temp;

        spin_lock_bh(&state_sockets_lock);
        list_for_each_safe(list, temp, &state_sockets) {
                struct connection *con =
                        list_entry(list, struct connection, state_list);
                list_del(&con->state_list);
                clear_bit(CF_CONNECT_PENDING, &con->flags);
                spin_unlock_bh(&state_sockets_lock);

                connect_to_sock(con);
                spin_lock_bh(&state_sockets_lock);
        }
        spin_unlock_bh(&state_sockets_lock);
}

/* Discard all entries on the write queues */
static void clean_writequeues(void)
{
        int nodeid;

        for (nodeid = 1; nodeid < conn_array_size; nodeid++) {
                struct connection *con = nodeid2con(nodeid, 0);

                if (con)
                        clean_one_writequeue(con);
        }
}

static int read_list_empty(void)
{
        int status;

        spin_lock_bh(&read_sockets_lock);
        status = list_empty(&read_sockets);
        spin_unlock_bh(&read_sockets_lock);

        return status;
}

/* DLM Transport comms receive daemon */
static int dlm_recvd(void *data)
{
        init_waitqueue_entry(&lowcomms_recv_waitq_head, current);
        add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head);

        while (!kthread_should_stop()) {
                set_current_state(TASK_INTERRUPTIBLE);
                if (read_list_empty())
                        schedule();
                set_current_state(TASK_RUNNING);

                process_sockets();
        }

        return 0;
}

static int write_and_state_lists_empty(void)
{
        int status;

        spin_lock_bh(&write_sockets_lock);
        status = list_empty(&write_sockets);
        spin_unlock_bh(&write_sockets_lock);

        spin_lock_bh(&state_sockets_lock);
        if (!list_empty(&state_sockets))
                status = 0;
        spin_unlock_bh(&state_sockets_lock);

        return status;
}

/* DLM Transport send daemon */
static int dlm_sendd(void *data)
{
        init_waitqueue_entry(&lowcomms_send_waitq_head, current);
        add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);

        while (!kthread_should_stop()) {
                set_current_state(TASK_INTERRUPTIBLE);
                if (write_and_state_lists_empty())
                        schedule();
                set_current_state(TASK_RUNNING);

                process_state_queue();
                process_output_queue();
        }

        return 0;
}

static void daemons_stop(void)
{
        kthread_stop(recv_task);
        kthread_stop(send_task);
}

static int daemons_start(void)
{
        struct task_struct *p;
        int error;

        p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
        if (IS_ERR(p)) {
                error = PTR_ERR(p);
                log_print("can't start dlm_recvd %d", error);
                return error;
        }
        recv_task = p;

        p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
        if (IS_ERR(p)) {
                error = PTR_ERR(p);
                log_print("can't start dlm_sendd %d", error);
                kthread_stop(recv_task);
                return error;
        }
        send_task = p;

        return 0;
}

void dlm_lowcomms_stop(void)
{
        int i;

        /* Set all the flags to prevent any socket activity. */
        for (i = 0; i < conn_array_size; i++) {
                if (connections[i])
                        connections[i]->flags |= 0xFF;
        }

        daemons_stop();
        clean_writequeues();

        for (i = 0; i < conn_array_size; i++) {
                if (connections[i]) {
                        close_connection(connections[i], true);
                        if (connections[i]->othercon)
                                kmem_cache_free(con_cache,
                                                connections[i]->othercon);
                        kmem_cache_free(con_cache, connections[i]);
                }
        }

        kfree(connections);
        connections = NULL;

        kmem_cache_destroy(con_cache);
}

/* This is quite likely to sleep... */
int dlm_lowcomms_start(void)
{
        int error = 0;

        error = -ENOMEM;
        connections = kzalloc(sizeof(struct connection *) *
                              NODE_INCREMENT, GFP_KERNEL);
        if (!connections)
                goto out;

        conn_array_size = NODE_INCREMENT;

        if (dlm_our_addr(&dlm_local_addr, 0)) {
                log_print("no local IP address has been set");
                goto fail_free_conn;
        }
        if (!dlm_our_addr(&dlm_local_addr, 1)) {
                log_print("This dlm comms module does not support "
                          "multi-homed clustering");
                goto fail_free_conn;
        }

        con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
                                      __alignof__(struct connection), 0,
                                      NULL, NULL);
        if (!con_cache)
                goto fail_free_conn;

        /* Start listening */
        error = listen_for_all();
        if (error)
                goto fail_unlisten;

        error = daemons_start();
        if (error)
                goto fail_unlisten;

        return 0;

fail_unlisten:
        close_connection(connections[0], false);
        kmem_cache_free(con_cache, connections[0]);
        kmem_cache_destroy(con_cache);

fail_free_conn:
        kfree(connections);

out:
        return error;
}

/*
 * Overrides for Emacs so that we follow Linus's tabbing style.
 * Emacs will notice this stuff at the end of the file and automatically
 * adjust the settings for this buffer only.  This must remain at the end
 * of the file.
 * ---------------------------------------------------------------------------
 * Local variables:
 * c-file-style: "linux"
 * End:
 */