ret =  o2nm_cluster_attr_write(page, count, &val);
 
        if (ret > 0) {
-               if (val <= cluster->cl_keepalive_delay_ms) {
+               if (cluster->cl_idle_timeout_ms != val
+                       && o2net_num_connected_peers()) {
+                       mlog(ML_NOTICE,
+                            "o2net: cannot change idle timeout after "
+                            "the first peer has agreed to it."
+                            "  %d connected peers\n",
+                            o2net_num_connected_peers());
+                       ret = -EINVAL;
+               } else if (val <= cluster->cl_keepalive_delay_ms) {
                        mlog(ML_NOTICE, "o2net: idle timeout must be larger "
                             "than keepalive delay\n");
-                       return -EINVAL;
+                       ret = -EINVAL;
+               } else {
+                       cluster->cl_idle_timeout_ms = val;
                }
-               cluster->cl_idle_timeout_ms = val;
        }
 
        return ret;
        ret =  o2nm_cluster_attr_write(page, count, &val);
 
        if (ret > 0) {
-               if (val >= cluster->cl_idle_timeout_ms) {
+               if (cluster->cl_keepalive_delay_ms != val
+                   && o2net_num_connected_peers()) {
+                       mlog(ML_NOTICE,
+                            "o2net: cannot change keepalive delay after"
+                            " the first peer has agreed to it."
+                            "  %d connected peers\n",
+                            o2net_num_connected_peers());
+                       ret = -EINVAL;
+               } else if (val >= cluster->cl_idle_timeout_ms) {
                        mlog(ML_NOTICE, "o2net: keepalive delay must be "
                             "smaller than idle timeout\n");
-                       return -EINVAL;
+                       ret = -EINVAL;
+               } else {
+                       cluster->cl_keepalive_delay_ms = val;
                }
-               cluster->cl_keepalive_delay_ms = val;
        }
 
        return ret;
 
                sc_put(sc);
 }
 
+static atomic_t o2net_connected_peers = ATOMIC_INIT(0);
+
+int o2net_num_connected_peers(void)
+{
+       return atomic_read(&o2net_connected_peers);
+}
+
 static void o2net_set_nn_state(struct o2net_node *nn,
                               struct o2net_sock_container *sc,
                               unsigned valid, int err)
 
        assert_spin_locked(&nn->nn_lock);
 
+       if (old_sc && !sc)
+               atomic_dec(&o2net_connected_peers);
+       else if (!old_sc && sc)
+               atomic_inc(&o2net_connected_peers);
+
        /* the node num comparison and single connect/accept path should stop
         * an non-null sc from being overwritten with another */
        BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc);
                return -1;
        }
 
+       /*
+        * Ensure timeouts are consistent with other nodes, otherwise
+        * we can end up with one node thinking that the other must be down,
+        * but isn't. This can ultimately cause corruption.
+        */
+       if (be32_to_cpu(hand->o2net_idle_timeout_ms) !=
+                               o2net_idle_timeout(sc->sc_node)) {
+               mlog(ML_NOTICE, SC_NODEF_FMT " uses a network idle timeout of "
+                    "%u ms, but we use %u ms locally.  disconnecting\n",
+                    SC_NODEF_ARGS(sc),
+                    be32_to_cpu(hand->o2net_idle_timeout_ms),
+                    o2net_idle_timeout(sc->sc_node));
+               o2net_ensure_shutdown(nn, sc, -ENOTCONN);
+               return -1;
+       }
+
+       if (be32_to_cpu(hand->o2net_keepalive_delay_ms) !=
+                       o2net_keepalive_delay(sc->sc_node)) {
+               mlog(ML_NOTICE, SC_NODEF_FMT " uses a keepalive delay of "
+                    "%u ms, but we use %u ms locally.  disconnecting\n",
+                    SC_NODEF_ARGS(sc),
+                    be32_to_cpu(hand->o2net_keepalive_delay_ms),
+                    o2net_keepalive_delay(sc->sc_node));
+               o2net_ensure_shutdown(nn, sc, -ENOTCONN);
+               return -1;
+       }
+
+       if (be32_to_cpu(hand->o2hb_heartbeat_timeout_ms) !=
+                       O2HB_MAX_WRITE_TIMEOUT_MS) {
+               mlog(ML_NOTICE, SC_NODEF_FMT " uses a heartbeat timeout of "
+                    "%u ms, but we use %u ms locally.  disconnecting\n",
+                    SC_NODEF_ARGS(sc),
+                    be32_to_cpu(hand->o2hb_heartbeat_timeout_ms),
+                    O2HB_MAX_WRITE_TIMEOUT_MS);
+               o2net_ensure_shutdown(nn, sc, -ENOTCONN);
+               return -1;
+       }
+
        sc->sc_handshake_ok = 1;
 
        spin_lock(&nn->nn_lock);
        sclog(sc, "receiving\n");
        do_gettimeofday(&sc->sc_tv_advance_start);
 
+       if (unlikely(sc->sc_handshake_ok == 0)) {
+               if(sc->sc_page_off < sizeof(struct o2net_handshake)) {
+                       data = page_address(sc->sc_page) + sc->sc_page_off;
+                       datalen = sizeof(struct o2net_handshake) - sc->sc_page_off;
+                       ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
+                       if (ret > 0)
+                               sc->sc_page_off += ret;
+               }
+
+               if (sc->sc_page_off == sizeof(struct o2net_handshake)) {
+                       o2net_check_handshake(sc);
+                       if (unlikely(sc->sc_handshake_ok == 0))
+                               ret = -EPROTO;
+               }
+               goto out;
+       }
+
        /* do we need more header? */
        if (sc->sc_page_off < sizeof(struct o2net_msg)) {
                data = page_address(sc->sc_page) + sc->sc_page_off;
                ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
                if (ret > 0) {
                        sc->sc_page_off += ret;
-
-                       /* this working relies on the handshake being
-                        * smaller than the normal message header */
-                       if (sc->sc_page_off >= sizeof(struct o2net_handshake)&&
-                           !sc->sc_handshake_ok && o2net_check_handshake(sc)) {
-                               ret = -EPROTO;
-                               goto out;
-                       }
-
                        /* only swab incoming here.. we can
                         * only get here once as we cross from
                         * being under to over */
        return ret;
 }
 
+static void o2net_initialize_handshake(void)
+{
+       o2net_hand->o2hb_heartbeat_timeout_ms = cpu_to_be32(
+               O2HB_MAX_WRITE_TIMEOUT_MS);
+       o2net_hand->o2net_idle_timeout_ms = cpu_to_be32(
+               o2net_idle_timeout(NULL));
+       o2net_hand->o2net_keepalive_delay_ms = cpu_to_be32(
+               o2net_keepalive_delay(NULL));
+       o2net_hand->o2net_reconnect_delay_ms = cpu_to_be32(
+               o2net_reconnect_delay(NULL));
+}
+
 /* ------------------------------------------------------------ */
 
 /* called when a connect completes and after a sock is accepted.  the
               (unsigned long long)O2NET_PROTOCOL_VERSION,
              (unsigned long long)be64_to_cpu(o2net_hand->connector_id));
 
+       o2net_initialize_handshake();
        o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
        sc_put(sc);
 }
 
        if (node_num != o2nm_this_node())
                o2net_disconnect_node(node);
+
+       BUG_ON(atomic_read(&o2net_connected_peers) < 0);
 }
 
 static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
        o2net_register_callbacks(sc->sc_sock->sk, sc);
        o2net_sc_queue_work(sc, &sc->sc_rx_work);
 
+       o2net_initialize_handshake();
        o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
 
 out: