   1/* -*- mode: c; c-basic-offset: 8; -*-
   2 *
   3 * vim: noexpandtab sw=8 ts=8 sts=0:
   4 *
   5 * Copyright (C) 2004 Oracle.  All rights reserved.
   6 *
   7 * This program is free software; you can redistribute it and/or
   8 * modify it under the terms of the GNU General Public
   9 * License as published by the Free Software Foundation; either
  10 * version 2 of the License, or (at your option) any later version.
  11 *
  12 * This program is distributed in the hope that it will be useful,
  13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  15 * General Public License for more details.
  16 *
  17 * You should have received a copy of the GNU General Public
  18 * License along with this program; if not, write to the
  19 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
   20 * Boston, MA 02111-1307, USA.
  21 *
  22 * ----
  23 *
   24 * Callers for this were originally written against a very simple synchronous
  25 * API.  This implementation reflects those simple callers.  Some day I'm sure
  26 * we'll need to move to a more robust posting/callback mechanism.
  27 *
   28 * Transmit calls pass in kernel virtual addresses and block while copying them into
   29 * the socket's tx buffers via a usual blocking sendmsg.  They'll block waiting
   30 * for a failed socket to time out.  TX callers can also pass in a pointer to an
  31 * 'int' which gets filled with an errno off the wire in response to the
  32 * message they send.
  33 *
  34 * Handlers for unsolicited messages are registered.  Each socket has a page
  35 * that incoming data is copied into.  First the header, then the data.
  36 * Handlers are called from only one thread with a reference to this per-socket
  37 * page.  This page is destroyed after the handler call, so it can't be
  38 * referenced beyond the call.  Handlers may block but are discouraged from
  39 * doing so.
  40 *
  41 * Any framing errors (bad magic, large payload lengths) close a connection.
  42 *
   43 * Our sock_container holds the state we associate with a socket.  Its current
  44 * framing state is held there as well as the refcounting we do around when it
  45 * is safe to tear down the socket.  The socket is only finally torn down from
  46 * the container when the container loses all of its references -- so as long
  47 * as you hold a ref on the container you can trust that the socket is valid
  48 * for use with kernel socket APIs.
  49 *
  50 * Connections are initiated between a pair of nodes when the node with the
  51 * higher node number gets a heartbeat callback which indicates that the lower
  52 * numbered node has started heartbeating.  The lower numbered node is passive
  53 * and only accepts the connection if the higher numbered node is heartbeating.
  54 */
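/*
 * Illustrative sketch, not part of this file: the simple synchronous
 * caller pattern described above.  EXAMPLE_MSG_TYPE and EXAMPLE_KEY are
 * hypothetical values a caller would define for its own protocol.
 */
#if 0
static int example_ping_node(u8 target_node)
{
	u32 payload = 42;	/* any caller-owned buffer works */
	int status = 0;		/* filled with an errno off the wire */
	int ret;

	/* blocks until the payload is copied out and the remote handler replies */
	ret = r2net_send_message(EXAMPLE_MSG_TYPE, EXAMPLE_KEY, &payload,
				 sizeof(payload), target_node, &status);
	if (ret < 0)
		return ret;	/* local or system error, e.g. -ENOTCONN */
	return status;		/* status returned by the remote handler */
}
#endif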
  55
  56#include <linux/kernel.h>
  57#include <linux/jiffies.h>
  58#include <linux/slab.h>
  59#include <linux/idr.h>
  60#include <linux/kref.h>
  61#include <linux/net.h>
  62#include <linux/export.h>
  63#include <linux/uaccess.h>
  64#include <net/tcp.h>
  65
  66
  67#include "heartbeat.h"
  68#include "tcp.h"
  69#include "nodemanager.h"
  70#define MLOG_MASK_PREFIX ML_TCP
  71#include "masklog.h"
  72
  73#include "tcp_internal.h"
  74
  75#define SC_NODEF_FMT "node %s (num %u) at %pI4:%u"
  76
  77/*
  78 * In the following two log macros, the whitespace after the ',' just
  79 * before ##args is intentional. Otherwise, gcc 2.95 will eat the
  80 * previous token if args expands to nothing.
  81 */
  82#define msglog(hdr, fmt, args...) do {					\
  83	typeof(hdr) __hdr = (hdr);					\
  84	mlog(ML_MSG, "[mag %u len %u typ %u stat %d sys_stat %d "	\
  85	     "key %08x num %u] " fmt,					\
  86	be16_to_cpu(__hdr->magic), be16_to_cpu(__hdr->data_len),	\
  87	     be16_to_cpu(__hdr->msg_type), be32_to_cpu(__hdr->status),	\
  88	     be32_to_cpu(__hdr->sys_status), be32_to_cpu(__hdr->key),	\
  89	     be32_to_cpu(__hdr->msg_num) ,  ##args);			\
  90} while (0)
  91
  92#define sclog(sc, fmt, args...) do {					\
  93	typeof(sc) __sc = (sc);						\
  94	mlog(ML_SOCKET, "[sc %p refs %d sock %p node %u page %p "	\
  95	     "pg_off %zu] " fmt, __sc,					\
  96	     atomic_read(&__sc->sc_kref.refcount), __sc->sc_sock,	\
  97	    __sc->sc_node->nd_num, __sc->sc_page, __sc->sc_page_off ,	\
  98	    ##args);							\
  99} while (0)
 100
 101static DEFINE_RWLOCK(r2net_handler_lock);
 102static struct rb_root r2net_handler_tree = RB_ROOT;
 103
 104static struct r2net_node r2net_nodes[R2NM_MAX_NODES];
 105
 106/* XXX someday we'll need better accounting */
 107static struct socket *r2net_listen_sock;
 108
 109/*
 110 * listen work is only queued by the listening socket callbacks on the
 111 * r2net_wq.  teardown detaches the callbacks before destroying the workqueue.
  112 * quorum work is queued as sock containers are shut down.  stop_listening
 113 * tears down all the node's sock containers, preventing future shutdowns
 114 * and queued quorum work, before canceling delayed quorum work and
 115 * destroying the work queue.
 116 */
 117static struct workqueue_struct *r2net_wq;
 118static struct work_struct r2net_listen_work;
 119
 120static struct r2hb_callback_func r2net_hb_up, r2net_hb_down;
 121#define R2NET_HB_PRI 0x1
 122
 123static struct r2net_handshake *r2net_hand;
 124static struct r2net_msg *r2net_keep_req, *r2net_keep_resp;
 125
 126static int r2net_sys_err_translations[R2NET_ERR_MAX] = {
 127		[R2NET_ERR_NONE]	= 0,
 128		[R2NET_ERR_NO_HNDLR]	= -ENOPROTOOPT,
 129		[R2NET_ERR_OVERFLOW]	= -EOVERFLOW,
 130		[R2NET_ERR_DIED]	= -EHOSTDOWN,};
 131
 132/* can't quite avoid *all* internal declarations :/ */
 133static void r2net_sc_connect_completed(struct work_struct *work);
 134static void r2net_rx_until_empty(struct work_struct *work);
 135static void r2net_shutdown_sc(struct work_struct *work);
 136static void r2net_listen_data_ready(struct sock *sk, int bytes);
 137static void r2net_sc_send_keep_req(struct work_struct *work);
 138static void r2net_idle_timer(unsigned long data);
 139static void r2net_sc_postpone_idle(struct r2net_sock_container *sc);
 140static void r2net_sc_reset_idle_timer(struct r2net_sock_container *sc);
 141
 142#ifdef CONFIG_DEBUG_FS
 143static void r2net_init_nst(struct r2net_send_tracking *nst, u32 msgtype,
 144			   u32 msgkey, struct task_struct *task, u8 node)
 145{
 146	INIT_LIST_HEAD(&nst->st_net_debug_item);
 147	nst->st_task = task;
 148	nst->st_msg_type = msgtype;
 149	nst->st_msg_key = msgkey;
 150	nst->st_node = node;
 151}
 152
 153static inline void r2net_set_nst_sock_time(struct r2net_send_tracking *nst)
 154{
 155	nst->st_sock_time = ktime_get();
 156}
 157
 158static inline void r2net_set_nst_send_time(struct r2net_send_tracking *nst)
 159{
 160	nst->st_send_time = ktime_get();
 161}
 162
 163static inline void r2net_set_nst_status_time(struct r2net_send_tracking *nst)
 164{
 165	nst->st_status_time = ktime_get();
 166}
 167
 168static inline void r2net_set_nst_sock_container(struct r2net_send_tracking *nst,
 169						struct r2net_sock_container *sc)
 170{
 171	nst->st_sc = sc;
 172}
 173
 174static inline void r2net_set_nst_msg_id(struct r2net_send_tracking *nst,
 175					u32 msg_id)
 176{
 177	nst->st_id = msg_id;
 178}
 179
 180static inline void r2net_set_sock_timer(struct r2net_sock_container *sc)
 181{
 182	sc->sc_tv_timer = ktime_get();
 183}
 184
 185static inline void r2net_set_data_ready_time(struct r2net_sock_container *sc)
 186{
 187	sc->sc_tv_data_ready = ktime_get();
 188}
 189
 190static inline void r2net_set_advance_start_time(struct r2net_sock_container *sc)
 191{
 192	sc->sc_tv_advance_start = ktime_get();
 193}
 194
 195static inline void r2net_set_advance_stop_time(struct r2net_sock_container *sc)
 196{
 197	sc->sc_tv_advance_stop = ktime_get();
 198}
 199
 200static inline void r2net_set_func_start_time(struct r2net_sock_container *sc)
 201{
 202	sc->sc_tv_func_start = ktime_get();
 203}
 204
 205static inline void r2net_set_func_stop_time(struct r2net_sock_container *sc)
 206{
 207	sc->sc_tv_func_stop = ktime_get();
 208}
 209
 210#else  /* CONFIG_DEBUG_FS */
 211# define r2net_init_nst(a, b, c, d, e)
 212# define r2net_set_nst_sock_time(a)
 213# define r2net_set_nst_send_time(a)
 214# define r2net_set_nst_status_time(a)
 215# define r2net_set_nst_sock_container(a, b)
 216# define r2net_set_nst_msg_id(a, b)
 217# define r2net_set_sock_timer(a)
 218# define r2net_set_data_ready_time(a)
 219# define r2net_set_advance_start_time(a)
 220# define r2net_set_advance_stop_time(a)
 221# define r2net_set_func_start_time(a)
 222# define r2net_set_func_stop_time(a)
 223#endif /* CONFIG_DEBUG_FS */
 224
 225#ifdef CONFIG_RAMSTER_FS_STATS
 226static ktime_t r2net_get_func_run_time(struct r2net_sock_container *sc)
 227{
 228	return ktime_sub(sc->sc_tv_func_stop, sc->sc_tv_func_start);
 229}
 230
 231static void r2net_update_send_stats(struct r2net_send_tracking *nst,
 232				    struct r2net_sock_container *sc)
 233{
 234	sc->sc_tv_status_total = ktime_add(sc->sc_tv_status_total,
 235					   ktime_sub(ktime_get(),
 236						     nst->st_status_time));
 237	sc->sc_tv_send_total = ktime_add(sc->sc_tv_send_total,
 238					 ktime_sub(nst->st_status_time,
 239						   nst->st_send_time));
 240	sc->sc_tv_acquiry_total = ktime_add(sc->sc_tv_acquiry_total,
 241					    ktime_sub(nst->st_send_time,
 242						      nst->st_sock_time));
 243	sc->sc_send_count++;
 244}
 245
 246static void r2net_update_recv_stats(struct r2net_sock_container *sc)
 247{
 248	sc->sc_tv_process_total = ktime_add(sc->sc_tv_process_total,
 249					    r2net_get_func_run_time(sc));
 250	sc->sc_recv_count++;
 251}
 252
 253#else
 254
 255# define r2net_update_send_stats(a, b)
 256
 257# define r2net_update_recv_stats(sc)
 258
 259#endif /* CONFIG_RAMSTER_FS_STATS */
 260
 261static inline int r2net_reconnect_delay(void)
 262{
 263	return r2nm_single_cluster->cl_reconnect_delay_ms;
 264}
 265
 266static inline int r2net_keepalive_delay(void)
 267{
 268	return r2nm_single_cluster->cl_keepalive_delay_ms;
 269}
 270
 271static inline int r2net_idle_timeout(void)
 272{
 273	return r2nm_single_cluster->cl_idle_timeout_ms;
 274}
 275
 276static inline int r2net_sys_err_to_errno(enum r2net_system_error err)
 277{
 278	int trans;
 279	BUG_ON(err >= R2NET_ERR_MAX);
 280	trans = r2net_sys_err_translations[err];
 281
 282	/* Just in case we mess up the translation table above */
 283	BUG_ON(err != R2NET_ERR_NONE && trans == 0);
 284	return trans;
 285}
 286
 287struct r2net_node *r2net_nn_from_num(u8 node_num)
 288{
 289	BUG_ON(node_num >= ARRAY_SIZE(r2net_nodes));
 290	return &r2net_nodes[node_num];
 291}
 292
 293static u8 r2net_num_from_nn(struct r2net_node *nn)
 294{
 295	BUG_ON(nn == NULL);
 296	return nn - r2net_nodes;
 297}
 298
 299/* ------------------------------------------------------------ */
 300
 301static int r2net_prep_nsw(struct r2net_node *nn, struct r2net_status_wait *nsw)
 302{
 303	int ret = 0;
 304
 305	do {
 306		if (!idr_pre_get(&nn->nn_status_idr, GFP_ATOMIC)) {
 307			ret = -EAGAIN;
 308			break;
 309		}
 310		spin_lock(&nn->nn_lock);
 311		ret = idr_get_new(&nn->nn_status_idr, nsw, &nsw->ns_id);
 312		if (ret == 0)
 313			list_add_tail(&nsw->ns_node_item,
 314				      &nn->nn_status_list);
 315		spin_unlock(&nn->nn_lock);
 316	} while (ret == -EAGAIN);
 317
 318	if (ret == 0)  {
 319		init_waitqueue_head(&nsw->ns_wq);
 320		nsw->ns_sys_status = R2NET_ERR_NONE;
 321		nsw->ns_status = 0;
 322	}
 323
 324	return ret;
 325}
 326
 327static void r2net_complete_nsw_locked(struct r2net_node *nn,
 328				      struct r2net_status_wait *nsw,
 329				      enum r2net_system_error sys_status,
 330				      s32 status)
 331{
 332	assert_spin_locked(&nn->nn_lock);
 333
 334	if (!list_empty(&nsw->ns_node_item)) {
 335		list_del_init(&nsw->ns_node_item);
 336		nsw->ns_sys_status = sys_status;
 337		nsw->ns_status = status;
 338		idr_remove(&nn->nn_status_idr, nsw->ns_id);
 339		wake_up(&nsw->ns_wq);
 340	}
 341}
 342
 343static void r2net_complete_nsw(struct r2net_node *nn,
 344			       struct r2net_status_wait *nsw,
 345			       u64 id, enum r2net_system_error sys_status,
 346			       s32 status)
 347{
 348	spin_lock(&nn->nn_lock);
 349	if (nsw == NULL) {
 350		if (id > INT_MAX)
 351			goto out;
 352
 353		nsw = idr_find(&nn->nn_status_idr, id);
 354		if (nsw == NULL)
 355			goto out;
 356	}
 357
 358	r2net_complete_nsw_locked(nn, nsw, sys_status, status);
 359
 360out:
 361	spin_unlock(&nn->nn_lock);
 362	return;
 363}
 364
 365static void r2net_complete_nodes_nsw(struct r2net_node *nn)
 366{
 367	struct r2net_status_wait *nsw, *tmp;
 368	unsigned int num_kills = 0;
 369
 370	assert_spin_locked(&nn->nn_lock);
 371
 372	list_for_each_entry_safe(nsw, tmp, &nn->nn_status_list, ns_node_item) {
 373		r2net_complete_nsw_locked(nn, nsw, R2NET_ERR_DIED, 0);
 374		num_kills++;
 375	}
 376
 377	mlog(0, "completed %d messages for node %u\n", num_kills,
 378	     r2net_num_from_nn(nn));
 379}
 380
 381static int r2net_nsw_completed(struct r2net_node *nn,
 382			       struct r2net_status_wait *nsw)
 383{
 384	int completed;
 385	spin_lock(&nn->nn_lock);
 386	completed = list_empty(&nsw->ns_node_item);
 387	spin_unlock(&nn->nn_lock);
 388	return completed;
 389}
 390
 391/* ------------------------------------------------------------ */
 392
 393static void sc_kref_release(struct kref *kref)
 394{
 395	struct r2net_sock_container *sc = container_of(kref,
 396					struct r2net_sock_container, sc_kref);
 397	BUG_ON(timer_pending(&sc->sc_idle_timeout));
 398
 399	sclog(sc, "releasing\n");
 400
 401	if (sc->sc_sock) {
 402		sock_release(sc->sc_sock);
 403		sc->sc_sock = NULL;
 404	}
 405
 406	r2nm_undepend_item(&sc->sc_node->nd_item);
 407	r2nm_node_put(sc->sc_node);
 408	sc->sc_node = NULL;
 409
 410	r2net_debug_del_sc(sc);
 411	kfree(sc);
 412}
 413
 414static void sc_put(struct r2net_sock_container *sc)
 415{
 416	sclog(sc, "put\n");
 417	kref_put(&sc->sc_kref, sc_kref_release);
 418}
 419static void sc_get(struct r2net_sock_container *sc)
 420{
 421	sclog(sc, "get\n");
 422	kref_get(&sc->sc_kref);
 423}
 424static struct r2net_sock_container *sc_alloc(struct r2nm_node *node)
 425{
 426	struct r2net_sock_container *sc, *ret = NULL;
 427	struct page *page = NULL;
 428	int status = 0;
 429
 430	page = alloc_page(GFP_NOFS);
 431	sc = kzalloc(sizeof(*sc), GFP_NOFS);
 432	if (sc == NULL || page == NULL)
 433		goto out;
 434
 435	kref_init(&sc->sc_kref);
 436	r2nm_node_get(node);
 437	sc->sc_node = node;
 438
 439	/* pin the node item of the remote node */
 440	status = r2nm_depend_item(&node->nd_item);
 441	if (status) {
 442		mlog_errno(status);
 443		r2nm_node_put(node);
 444		goto out;
 445	}
 446	INIT_WORK(&sc->sc_connect_work, r2net_sc_connect_completed);
 447	INIT_WORK(&sc->sc_rx_work, r2net_rx_until_empty);
 448	INIT_WORK(&sc->sc_shutdown_work, r2net_shutdown_sc);
 449	INIT_DELAYED_WORK(&sc->sc_keepalive_work, r2net_sc_send_keep_req);
 450
 451	init_timer(&sc->sc_idle_timeout);
 452	sc->sc_idle_timeout.function = r2net_idle_timer;
 453	sc->sc_idle_timeout.data = (unsigned long)sc;
 454
 455	sclog(sc, "alloced\n");
 456
 457	ret = sc;
 458	sc->sc_page = page;
 459	r2net_debug_add_sc(sc);
 460	sc = NULL;
 461	page = NULL;
 462
 463out:
 464	if (page)
 465		__free_page(page);
 466	kfree(sc);
 467
 468	return ret;
 469}
 470
 471/* ------------------------------------------------------------ */
 472
 473static void r2net_sc_queue_work(struct r2net_sock_container *sc,
 474				struct work_struct *work)
 475{
 476	sc_get(sc);
 477	if (!queue_work(r2net_wq, work))
 478		sc_put(sc);
 479}
 480static void r2net_sc_queue_delayed_work(struct r2net_sock_container *sc,
 481					struct delayed_work *work,
 482					int delay)
 483{
 484	sc_get(sc);
 485	if (!queue_delayed_work(r2net_wq, work, delay))
 486		sc_put(sc);
 487}
 488static void r2net_sc_cancel_delayed_work(struct r2net_sock_container *sc,
 489					 struct delayed_work *work)
 490{
 491	if (cancel_delayed_work(work))
 492		sc_put(sc);
 493}
 494
 495static atomic_t r2net_connected_peers = ATOMIC_INIT(0);
 496
 497int r2net_num_connected_peers(void)
 498{
 499	return atomic_read(&r2net_connected_peers);
 500}
 501
 502static void r2net_set_nn_state(struct r2net_node *nn,
 503			       struct r2net_sock_container *sc,
 504			       unsigned valid, int err)
 505{
 506	int was_valid = nn->nn_sc_valid;
 507	int was_err = nn->nn_persistent_error;
 508	struct r2net_sock_container *old_sc = nn->nn_sc;
 509
 510	assert_spin_locked(&nn->nn_lock);
 511
 512	if (old_sc && !sc)
 513		atomic_dec(&r2net_connected_peers);
 514	else if (!old_sc && sc)
 515		atomic_inc(&r2net_connected_peers);
 516
 517	/* the node num comparison and single connect/accept path should stop
  518	 * a non-null sc from being overwritten with another */
 519	BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc);
 520	mlog_bug_on_msg(err && valid, "err %d valid %u\n", err, valid);
 521	mlog_bug_on_msg(valid && !sc, "valid %u sc %p\n", valid, sc);
 522
 523	if (was_valid && !valid && err == 0)
 524		err = -ENOTCONN;
 525
 526	mlog(ML_CONN, "node %u sc: %p -> %p, valid %u -> %u, err %d -> %d\n",
 527	     r2net_num_from_nn(nn), nn->nn_sc, sc, nn->nn_sc_valid, valid,
 528	     nn->nn_persistent_error, err);
 529
 530	nn->nn_sc = sc;
 531	nn->nn_sc_valid = valid ? 1 : 0;
 532	nn->nn_persistent_error = err;
 533
 534	/* mirrors r2net_tx_can_proceed() */
 535	if (nn->nn_persistent_error || nn->nn_sc_valid)
 536		wake_up(&nn->nn_sc_wq);
 537
 538	if (!was_err && nn->nn_persistent_error) {
 539		queue_delayed_work(r2net_wq, &nn->nn_still_up,
 540				   msecs_to_jiffies(R2NET_QUORUM_DELAY_MS));
 541	}
 542
 543	if (was_valid && !valid) {
 544		printk(KERN_NOTICE "ramster: No longer connected to "
 545		       SC_NODEF_FMT "\n",
 546			old_sc->sc_node->nd_name, old_sc->sc_node->nd_num,
 547			&old_sc->sc_node->nd_ipv4_address,
 548			ntohs(old_sc->sc_node->nd_ipv4_port));
 549		r2net_complete_nodes_nsw(nn);
 550	}
 551
 552	if (!was_valid && valid) {
 553		cancel_delayed_work(&nn->nn_connect_expired);
 554		printk(KERN_NOTICE "ramster: %s " SC_NODEF_FMT "\n",
 555		       r2nm_this_node() > sc->sc_node->nd_num ?
 556		       "Connected to" : "Accepted connection from",
 557		       sc->sc_node->nd_name, sc->sc_node->nd_num,
 558			&sc->sc_node->nd_ipv4_address,
 559			ntohs(sc->sc_node->nd_ipv4_port));
 560	}
 561
 562	/* trigger the connecting worker func as long as we're not valid,
 563	 * it will back off if it shouldn't connect.  This can be called
 564	 * from node config teardown and so needs to be careful about
 565	 * the work queue actually being up. */
 566	if (!valid && r2net_wq) {
 567		unsigned long delay;
 568		/* delay if we're within a RECONNECT_DELAY of the
 569		 * last attempt */
 570		delay = (nn->nn_last_connect_attempt +
 571			 msecs_to_jiffies(r2net_reconnect_delay()))
 572			- jiffies;
 573		if (delay > msecs_to_jiffies(r2net_reconnect_delay()))
 574			delay = 0;
 575		mlog(ML_CONN, "queueing conn attempt in %lu jiffies\n", delay);
 576		queue_delayed_work(r2net_wq, &nn->nn_connect_work, delay);
 577
 578		/*
 579		 * Delay the expired work after idle timeout.
 580		 *
 581		 * We might have lots of failed connection attempts that run
 582		 * through here but we only cancel the connect_expired work when
 583		 * a connection attempt succeeds.  So only the first enqueue of
 584		 * the connect_expired work will do anything.  The rest will see
 585		 * that it's already queued and do nothing.
 586		 */
 587		delay += msecs_to_jiffies(r2net_idle_timeout());
 588		queue_delayed_work(r2net_wq, &nn->nn_connect_expired, delay);
 589	}
 590
 591	/* keep track of the nn's sc ref for the caller */
 592	if ((old_sc == NULL) && sc)
 593		sc_get(sc);
 594	if (old_sc && (old_sc != sc)) {
 595		r2net_sc_queue_work(old_sc, &old_sc->sc_shutdown_work);
 596		sc_put(old_sc);
 597	}
 598}
 599
 600/* see r2net_register_callbacks() */
 601static void r2net_data_ready(struct sock *sk, int bytes)
 602{
 603	void (*ready)(struct sock *sk, int bytes);
 604
 605	read_lock(&sk->sk_callback_lock);
 606	if (sk->sk_user_data) {
 607		struct r2net_sock_container *sc = sk->sk_user_data;
 608		sclog(sc, "data_ready hit\n");
 609		r2net_set_data_ready_time(sc);
 610		r2net_sc_queue_work(sc, &sc->sc_rx_work);
 611		ready = sc->sc_data_ready;
 612	} else {
 613		ready = sk->sk_data_ready;
 614	}
 615	read_unlock(&sk->sk_callback_lock);
 616
 617	ready(sk, bytes);
 618}
 619
 620/* see r2net_register_callbacks() */
 621static void r2net_state_change(struct sock *sk)
 622{
 623	void (*state_change)(struct sock *sk);
 624	struct r2net_sock_container *sc;
 625
 626	read_lock(&sk->sk_callback_lock);
 627	sc = sk->sk_user_data;
 628	if (sc == NULL) {
 629		state_change = sk->sk_state_change;
 630		goto out;
 631	}
 632
 633	sclog(sc, "state_change to %d\n", sk->sk_state);
 634
 635	state_change = sc->sc_state_change;
 636
 637	switch (sk->sk_state) {
 638
 639	/* ignore connecting sockets as they make progress */
 640	case TCP_SYN_SENT:
 641	case TCP_SYN_RECV:
 642		break;
 643	case TCP_ESTABLISHED:
 644		r2net_sc_queue_work(sc, &sc->sc_connect_work);
 645		break;
 646	default:
 647		printk(KERN_INFO "ramster: Connection to "
 648			SC_NODEF_FMT " shutdown, state %d\n",
 649			sc->sc_node->nd_name, sc->sc_node->nd_num,
 650			&sc->sc_node->nd_ipv4_address,
 651			ntohs(sc->sc_node->nd_ipv4_port), sk->sk_state);
 652		r2net_sc_queue_work(sc, &sc->sc_shutdown_work);
 653		break;
 654
 655	}
 656out:
 657	read_unlock(&sk->sk_callback_lock);
 658	state_change(sk);
 659}
 660
 661/*
 662 * we register callbacks so we can queue work on events before calling
 663 * the original callbacks.  our callbacks are careful to test user_data
  664 * to discover when they've raced with r2net_unregister_callbacks().
 665 */
 666static void r2net_register_callbacks(struct sock *sk,
 667				     struct r2net_sock_container *sc)
 668{
 669	write_lock_bh(&sk->sk_callback_lock);
 670
 671	/* accepted sockets inherit the old listen socket data ready */
 672	if (sk->sk_data_ready == r2net_listen_data_ready) {
 673		sk->sk_data_ready = sk->sk_user_data;
 674		sk->sk_user_data = NULL;
 675	}
 676
 677	BUG_ON(sk->sk_user_data != NULL);
 678	sk->sk_user_data = sc;
 679	sc_get(sc);
 680
 681	sc->sc_data_ready = sk->sk_data_ready;
 682	sc->sc_state_change = sk->sk_state_change;
 683	sk->sk_data_ready = r2net_data_ready;
 684	sk->sk_state_change = r2net_state_change;
 685
 686	mutex_init(&sc->sc_send_lock);
 687
 688	write_unlock_bh(&sk->sk_callback_lock);
 689}
 690
 691static int r2net_unregister_callbacks(struct sock *sk,
 692					struct r2net_sock_container *sc)
 693{
 694	int ret = 0;
 695
 696	write_lock_bh(&sk->sk_callback_lock);
 697	if (sk->sk_user_data == sc) {
 698		ret = 1;
 699		sk->sk_user_data = NULL;
 700		sk->sk_data_ready = sc->sc_data_ready;
 701		sk->sk_state_change = sc->sc_state_change;
 702	}
 703	write_unlock_bh(&sk->sk_callback_lock);
 704
 705	return ret;
 706}
 707
 708/*
 709 * this is a little helper that is called by callers who have seen a problem
  710 * with an sc and want to detach it from the nn if someone hasn't already beaten
 711 * them to it.  if an error is given then the shutdown will be persistent
 712 * and pending transmits will be canceled.
 713 */
 714static void r2net_ensure_shutdown(struct r2net_node *nn,
 715					struct r2net_sock_container *sc,
 716				   int err)
 717{
 718	spin_lock(&nn->nn_lock);
 719	if (nn->nn_sc == sc)
 720		r2net_set_nn_state(nn, NULL, 0, err);
 721	spin_unlock(&nn->nn_lock);
 722}
 723
 724/*
 725 * This work queue function performs the blocking parts of socket shutdown.  A
 726 * few paths lead here.  set_nn_state will trigger this callback if it sees an
 727 * sc detached from the nn.  state_change will also trigger this callback
 728 * directly when it sees errors.  In that case we need to call set_nn_state
 729 * ourselves as state_change couldn't get the nn_lock and call set_nn_state
 730 * itself.
 731 */
 732static void r2net_shutdown_sc(struct work_struct *work)
 733{
 734	struct r2net_sock_container *sc =
 735		container_of(work, struct r2net_sock_container,
 736			     sc_shutdown_work);
 737	struct r2net_node *nn = r2net_nn_from_num(sc->sc_node->nd_num);
 738
 739	sclog(sc, "shutting down\n");
 740
 741	/* drop the callbacks ref and call shutdown only once */
 742	if (r2net_unregister_callbacks(sc->sc_sock->sk, sc)) {
 743		/* we shouldn't flush as we're in the thread, the
 744		 * races with pending sc work structs are harmless */
 745		del_timer_sync(&sc->sc_idle_timeout);
 746		r2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work);
 747		sc_put(sc);
 748		kernel_sock_shutdown(sc->sc_sock, SHUT_RDWR);
 749	}
 750
 751	/* not fatal so failed connects before the other guy has our
 752	 * heartbeat can be retried */
 753	r2net_ensure_shutdown(nn, sc, 0);
 754	sc_put(sc);
 755}
 756
 757/* ------------------------------------------------------------ */
 758
 759static int r2net_handler_cmp(struct r2net_msg_handler *nmh, u32 msg_type,
 760			     u32 key)
 761{
 762	int ret = memcmp(&nmh->nh_key, &key, sizeof(key));
 763
 764	if (ret == 0)
 765		ret = memcmp(&nmh->nh_msg_type, &msg_type, sizeof(msg_type));
 766
 767	return ret;
 768}
 769
 770static struct r2net_msg_handler *
 771r2net_handler_tree_lookup(u32 msg_type, u32 key, struct rb_node ***ret_p,
 772				struct rb_node **ret_parent)
 773{
 774	struct rb_node **p = &r2net_handler_tree.rb_node;
 775	struct rb_node *parent = NULL;
 776	struct r2net_msg_handler *nmh, *ret = NULL;
 777	int cmp;
 778
 779	while (*p) {
 780		parent = *p;
 781		nmh = rb_entry(parent, struct r2net_msg_handler, nh_node);
 782		cmp = r2net_handler_cmp(nmh, msg_type, key);
 783
 784		if (cmp < 0)
 785			p = &(*p)->rb_left;
 786		else if (cmp > 0)
 787			p = &(*p)->rb_right;
 788		else {
 789			ret = nmh;
 790			break;
 791		}
 792	}
 793
 794	if (ret_p != NULL)
 795		*ret_p = p;
 796	if (ret_parent != NULL)
 797		*ret_parent = parent;
 798
 799	return ret;
 800}
 801
 802static void r2net_handler_kref_release(struct kref *kref)
 803{
 804	struct r2net_msg_handler *nmh;
 805	nmh = container_of(kref, struct r2net_msg_handler, nh_kref);
 806
 807	kfree(nmh);
 808}
 809
 810static void r2net_handler_put(struct r2net_msg_handler *nmh)
 811{
 812	kref_put(&nmh->nh_kref, r2net_handler_kref_release);
 813}
 814
 815/* max_len is protection for the handler func.  incoming messages won't
 816 * be given to the handler if their payload is longer than the max. */
 817int r2net_register_handler(u32 msg_type, u32 key, u32 max_len,
 818			   r2net_msg_handler_func *func, void *data,
 819			   r2net_post_msg_handler_func *post_func,
 820			   struct list_head *unreg_list)
 821{
 822	struct r2net_msg_handler *nmh = NULL;
 823	struct rb_node **p, *parent;
 824	int ret = 0;
 825
 826	if (max_len > R2NET_MAX_PAYLOAD_BYTES) {
 827		mlog(0, "max_len for message handler out of range: %u\n",
 828			max_len);
 829		ret = -EINVAL;
 830		goto out;
 831	}
 832
 833	if (!msg_type) {
 834		mlog(0, "no message type provided: %u, %p\n", msg_type, func);
 835		ret = -EINVAL;
 836		goto out;
 837
 838	}
 839	if (!func) {
 840		mlog(0, "no message handler provided: %u, %p\n",
 841		       msg_type, func);
 842		ret = -EINVAL;
 843		goto out;
 844	}
 845
 846	nmh = kzalloc(sizeof(struct r2net_msg_handler), GFP_NOFS);
 847	if (nmh == NULL) {
 848		ret = -ENOMEM;
 849		goto out;
 850	}
 851
 852	nmh->nh_func = func;
 853	nmh->nh_func_data = data;
 854	nmh->nh_post_func = post_func;
 855	nmh->nh_msg_type = msg_type;
 856	nmh->nh_max_len = max_len;
 857	nmh->nh_key = key;
 858	/* the tree and list get this ref.. they're both removed in
 859	 * unregister when this ref is dropped */
 860	kref_init(&nmh->nh_kref);
 861	INIT_LIST_HEAD(&nmh->nh_unregister_item);
 862
 863	write_lock(&r2net_handler_lock);
 864	if (r2net_handler_tree_lookup(msg_type, key, &p, &parent))
 865		ret = -EEXIST;
 866	else {
 867		rb_link_node(&nmh->nh_node, parent, p);
 868		rb_insert_color(&nmh->nh_node, &r2net_handler_tree);
 869		list_add_tail(&nmh->nh_unregister_item, unreg_list);
 870
 871		mlog(ML_TCP, "registered handler func %p type %u key %08x\n",
 872		     func, msg_type, key);
 873		/* we've had some trouble with handlers seemingly vanishing. */
 874		mlog_bug_on_msg(r2net_handler_tree_lookup(msg_type, key, &p,
 875							  &parent) == NULL,
 876				"couldn't find handler we *just* registered "
 877				"for type %u key %08x\n", msg_type, key);
 878	}
 879	write_unlock(&r2net_handler_lock);
 880	if (ret)
 881		goto out;
 882
 883out:
 884	if (ret)
 885		kfree(nmh);
 886
 887	return ret;
 888}
 889EXPORT_SYMBOL_GPL(r2net_register_handler);
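/*
 * Illustrative sketch, not part of this file: registering a handler with
 * the API above.  The handler signature mirrors the call made from
 * r2net_process_message(); the EXAMPLE_* constants are hypothetical.
 */
#if 0
static int example_handler(struct r2net_msg *msg, u32 len, void *data,
			   void **ret_data)
{
	/* the payload follows the header in the per-socket page; neither
	 * may be referenced once this handler returns */
	void *payload = msg + 1;

	pr_debug("got %u payload bytes at %p\n",
		 be16_to_cpu(msg->data_len), payload);
	return 0;	/* sent back to the caller as its status */
}

static LIST_HEAD(example_unreg_list);

static int example_register(void)
{
	/* torn down later with r2net_unregister_handler_list(&example_unreg_list) */
	return r2net_register_handler(EXAMPLE_MSG_TYPE, EXAMPLE_KEY,
				      EXAMPLE_MAX_LEN, example_handler,
				      NULL /* data */, NULL /* post_func */,
				      &example_unreg_list);
}
#endif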
 890
 891void r2net_unregister_handler_list(struct list_head *list)
 892{
 893	struct r2net_msg_handler *nmh, *n;
 894
 895	write_lock(&r2net_handler_lock);
 896	list_for_each_entry_safe(nmh, n, list, nh_unregister_item) {
 897		mlog(ML_TCP, "unregistering handler func %p type %u key %08x\n",
 898		     nmh->nh_func, nmh->nh_msg_type, nmh->nh_key);
 899		rb_erase(&nmh->nh_node, &r2net_handler_tree);
 900		list_del_init(&nmh->nh_unregister_item);
 901		kref_put(&nmh->nh_kref, r2net_handler_kref_release);
 902	}
 903	write_unlock(&r2net_handler_lock);
 904}
 905EXPORT_SYMBOL_GPL(r2net_unregister_handler_list);
 906
 907static struct r2net_msg_handler *r2net_handler_get(u32 msg_type, u32 key)
 908{
 909	struct r2net_msg_handler *nmh;
 910
 911	read_lock(&r2net_handler_lock);
 912	nmh = r2net_handler_tree_lookup(msg_type, key, NULL, NULL);
 913	if (nmh)
 914		kref_get(&nmh->nh_kref);
 915	read_unlock(&r2net_handler_lock);
 916
 917	return nmh;
 918}
 919
 920/* ------------------------------------------------------------ */
 921
 922static int r2net_recv_tcp_msg(struct socket *sock, void *data, size_t len)
 923{
 924	int ret;
 925	mm_segment_t oldfs;
 926	struct kvec vec = {
 927		.iov_len = len,
 928		.iov_base = data,
 929	};
 930	struct msghdr msg = {
 931		.msg_iovlen = 1,
 932		.msg_iov = (struct iovec *)&vec,
 933		.msg_flags = MSG_DONTWAIT,
 934	};
 935
 936	oldfs = get_fs();
 937	set_fs(get_ds());
 938	ret = sock_recvmsg(sock, &msg, len, msg.msg_flags);
 939	set_fs(oldfs);
 940
 941	return ret;
 942}
 943
 944static int r2net_send_tcp_msg(struct socket *sock, struct kvec *vec,
 945			      size_t veclen, size_t total)
 946{
 947	int ret;
 948	mm_segment_t oldfs;
 949	struct msghdr msg = {
 950		.msg_iov = (struct iovec *)vec,
 951		.msg_iovlen = veclen,
 952	};
 953
 954	if (sock == NULL) {
 955		ret = -EINVAL;
 956		goto out;
 957	}
 958
 959	oldfs = get_fs();
 960	set_fs(get_ds());
 961	ret = sock_sendmsg(sock, &msg, total);
 962	set_fs(oldfs);
 963	if (ret != total) {
 964		mlog(ML_ERROR, "sendmsg returned %d instead of %zu\n", ret,
 965		     total);
 966		if (ret >= 0)
 967			ret = -EPIPE; /* should be smarter, I bet */
 968		goto out;
 969	}
 970
 971	ret = 0;
 972out:
 973	if (ret < 0)
 974		mlog(0, "returning error: %d\n", ret);
 975	return ret;
 976}
 977
 978static void r2net_sendpage(struct r2net_sock_container *sc,
 979			   void *kmalloced_virt,
 980			   size_t size)
 981{
 982	struct r2net_node *nn = r2net_nn_from_num(sc->sc_node->nd_num);
 983	ssize_t ret;
 984
 985	while (1) {
 986		mutex_lock(&sc->sc_send_lock);
 987		ret = sc->sc_sock->ops->sendpage(sc->sc_sock,
 988					virt_to_page(kmalloced_virt),
 989					(long)kmalloced_virt & ~PAGE_MASK,
 990					size, MSG_DONTWAIT);
 991		mutex_unlock(&sc->sc_send_lock);
 992		if (ret == size)
 993			break;
 994		if (ret == (ssize_t)-EAGAIN) {
 995			mlog(0, "sendpage of size %zu to " SC_NODEF_FMT
 996			     " returned EAGAIN\n", size, sc->sc_node->nd_name,
 997				sc->sc_node->nd_num,
 998				&sc->sc_node->nd_ipv4_address,
 999				ntohs(sc->sc_node->nd_ipv4_port));
1000			cond_resched();
1001			continue;
1002		}
1003		mlog(ML_ERROR, "sendpage of size %zu to " SC_NODEF_FMT
1004		     " failed with %zd\n", size, sc->sc_node->nd_name,
1005			sc->sc_node->nd_num, &sc->sc_node->nd_ipv4_address,
1006			ntohs(sc->sc_node->nd_ipv4_port), ret);
1007		r2net_ensure_shutdown(nn, sc, 0);
1008		break;
1009	}
1010}
1011
1012static void r2net_init_msg(struct r2net_msg *msg, u16 data_len,
1013				u16 msg_type, u32 key)
1014{
1015	memset(msg, 0, sizeof(struct r2net_msg));
1016	msg->magic = cpu_to_be16(R2NET_MSG_MAGIC);
1017	msg->data_len = cpu_to_be16(data_len);
1018	msg->msg_type = cpu_to_be16(msg_type);
1019	msg->sys_status = cpu_to_be32(R2NET_ERR_NONE);
1020	msg->status = 0;
1021	msg->key = cpu_to_be32(key);
1022}
1023
1024static int r2net_tx_can_proceed(struct r2net_node *nn,
1025				struct r2net_sock_container **sc_ret,
1026				int *error)
1027{
1028	int ret = 0;
1029
1030	spin_lock(&nn->nn_lock);
1031	if (nn->nn_persistent_error) {
1032		ret = 1;
1033		*sc_ret = NULL;
1034		*error = nn->nn_persistent_error;
1035	} else if (nn->nn_sc_valid) {
1036		kref_get(&nn->nn_sc->sc_kref);
1037
1038		ret = 1;
1039		*sc_ret = nn->nn_sc;
1040		*error = 0;
1041	}
1042	spin_unlock(&nn->nn_lock);
1043
1044	return ret;
1045}
1046
 1047/* Get a map of all nodes to which this node is currently connected */
1048void r2net_fill_node_map(unsigned long *map, unsigned bytes)
1049{
1050	struct r2net_sock_container *sc;
1051	int node, ret;
1052
1053	BUG_ON(bytes < (BITS_TO_LONGS(R2NM_MAX_NODES) * sizeof(unsigned long)));
1054
1055	memset(map, 0, bytes);
1056	for (node = 0; node < R2NM_MAX_NODES; ++node) {
1057		r2net_tx_can_proceed(r2net_nn_from_num(node), &sc, &ret);
1058		if (!ret) {
1059			set_bit(node, map);
1060			sc_put(sc);
1061		}
1062	}
1063}
1064EXPORT_SYMBOL_GPL(r2net_fill_node_map);
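/*
 * Illustrative sketch, not part of this file: a caller walking the
 * connected-node bitmap filled in by r2net_fill_node_map() above.
 */
#if 0
static void example_walk_connected_nodes(void)
{
	unsigned long map[BITS_TO_LONGS(R2NM_MAX_NODES)];
	int node;

	r2net_fill_node_map(map, sizeof(map));
	for (node = 0; node < R2NM_MAX_NODES; node++)
		if (test_bit(node, map))
			pr_info("connected to node %d\n", node);
}
#endif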
1065
1066int r2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec,
1067			   size_t caller_veclen, u8 target_node, int *status)
1068{
1069	int ret = 0;
1070	struct r2net_msg *msg = NULL;
1071	size_t veclen, caller_bytes = 0;
1072	struct kvec *vec = NULL;
1073	struct r2net_sock_container *sc = NULL;
1074	struct r2net_node *nn = r2net_nn_from_num(target_node);
1075	struct r2net_status_wait nsw = {
1076		.ns_node_item = LIST_HEAD_INIT(nsw.ns_node_item),
1077	};
1078	struct r2net_send_tracking nst;
1079
1080	/* this may be a general bug fix */
1081	init_waitqueue_head(&nsw.ns_wq);
1082
1083	r2net_init_nst(&nst, msg_type, key, current, target_node);
1084
1085	if (r2net_wq == NULL) {
1086		mlog(0, "attempt to tx without r2netd running\n");
1087		ret = -ESRCH;
1088		goto out;
1089	}
1090
1091	if (caller_veclen == 0) {
1092		mlog(0, "bad kvec array length\n");
1093		ret = -EINVAL;
1094		goto out;
1095	}
1096
1097	caller_bytes = iov_length((struct iovec *)caller_vec, caller_veclen);
1098	if (caller_bytes > R2NET_MAX_PAYLOAD_BYTES) {
1099		mlog(0, "total payload len %zu too large\n", caller_bytes);
1100		ret = -EINVAL;
1101		goto out;
1102	}
1103
1104	if (target_node == r2nm_this_node()) {
1105		ret = -ELOOP;
1106		goto out;
1107	}
1108
1109	r2net_debug_add_nst(&nst);
1110
1111	r2net_set_nst_sock_time(&nst);
1112
1113	wait_event(nn->nn_sc_wq, r2net_tx_can_proceed(nn, &sc, &ret));
1114	if (ret)
1115		goto out;
1116
1117	r2net_set_nst_sock_container(&nst, sc);
1118
1119	veclen = caller_veclen + 1;
1120	vec = kmalloc(sizeof(struct kvec) * veclen, GFP_ATOMIC);
1121	if (vec == NULL) {
 1122		mlog(0, "failed to allocate %zu element kvec!\n", veclen);
1123		ret = -ENOMEM;
1124		goto out;
1125	}
1126
1127	msg = kmalloc(sizeof(struct r2net_msg), GFP_ATOMIC);
1128	if (!msg) {
1129		mlog(0, "failed to allocate a r2net_msg!\n");
1130		ret = -ENOMEM;
1131		goto out;
1132	}
1133
1134	r2net_init_msg(msg, caller_bytes, msg_type, key);
1135
1136	vec[0].iov_len = sizeof(struct r2net_msg);
1137	vec[0].iov_base = msg;
1138	memcpy(&vec[1], caller_vec, caller_veclen * sizeof(struct kvec));
1139
1140	ret = r2net_prep_nsw(nn, &nsw);
1141	if (ret)
1142		goto out;
1143
1144	msg->msg_num = cpu_to_be32(nsw.ns_id);
1145	r2net_set_nst_msg_id(&nst, nsw.ns_id);
1146
1147	r2net_set_nst_send_time(&nst);
1148
1149	/* finally, convert the message header to network byte-order
1150	 * and send */
1151	mutex_lock(&sc->sc_send_lock);
1152	ret = r2net_send_tcp_msg(sc->sc_sock, vec, veclen,
1153				 sizeof(struct r2net_msg) + caller_bytes);
1154	mutex_unlock(&sc->sc_send_lock);
1155	msglog(msg, "sending returned %d\n", ret);
1156	if (ret < 0) {
1157		mlog(0, "error returned from r2net_send_tcp_msg=%d\n", ret);
1158		goto out;
1159	}
1160
1161	/* wait on other node's handler */
1162	r2net_set_nst_status_time(&nst);
1163	wait_event(nsw.ns_wq, r2net_nsw_completed(nn, &nsw));
1164
1165	r2net_update_send_stats(&nst, sc);
1166
 1167	/* Note that we avoid overwriting the caller's status return
1168	 * variable if a system error was reported on the other
1169	 * side. Callers beware. */
1170	ret = r2net_sys_err_to_errno(nsw.ns_sys_status);
1171	if (status && !ret)
1172		*status = nsw.ns_status;
1173
1174	mlog(0, "woken, returning system status %d, user status %d\n",
1175	     ret, nsw.ns_status);
1176out:
1177	r2net_debug_del_nst(&nst); /* must be before dropping sc and node */
1178	if (sc)
1179		sc_put(sc);
1180	kfree(vec);
1181	kfree(msg);
1182	r2net_complete_nsw(nn, &nsw, 0, 0, 0);
1183	return ret;
1184}
1185EXPORT_SYMBOL_GPL(r2net_send_message_vec);
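/*
 * Illustrative sketch, not part of this file: sending one message whose
 * payload lives in two separate buffers via the kvec variant above.  The
 * combined length must stay within R2NET_MAX_PAYLOAD_BYTES; the EXAMPLE_*
 * constants are hypothetical.
 */
#if 0
static int example_send_two_buffers(u8 target_node, void *hdr, size_t hdr_len,
				    void *body, size_t body_len)
{
	int status = 0;
	struct kvec vec[2] = {
		{ .iov_base = hdr,  .iov_len = hdr_len },
		{ .iov_base = body, .iov_len = body_len },
	};

	return r2net_send_message_vec(EXAMPLE_MSG_TYPE, EXAMPLE_KEY, vec,
				      ARRAY_SIZE(vec), target_node, &status);
}
#endif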
1186
1187int r2net_send_message(u32 msg_type, u32 key, void *data, u32 len,
1188		       u8 target_node, int *status)
1189{
1190	struct kvec vec = {
1191		.iov_base = data,
1192		.iov_len = len,
1193	};
1194	return r2net_send_message_vec(msg_type, key, &vec, 1,
1195				      target_node, status);
1196}
1197EXPORT_SYMBOL_GPL(r2net_send_message);
1198
1199static int r2net_send_status_magic(struct socket *sock, struct r2net_msg *hdr,
1200				   enum r2net_system_error syserr, int err)
1201{
1202	struct kvec vec = {
1203		.iov_base = hdr,
1204		.iov_len = sizeof(struct r2net_msg),
1205	};
1206
1207	BUG_ON(syserr >= R2NET_ERR_MAX);
1208
1209	/* leave other fields intact from the incoming message, msg_num
1210	 * in particular */
1211	hdr->sys_status = cpu_to_be32(syserr);
1212	hdr->status = cpu_to_be32(err);
1213	/* twiddle the magic */
1214	hdr->magic = cpu_to_be16(R2NET_MSG_STATUS_MAGIC);
1215	hdr->data_len = 0;
1216
1217	msglog(hdr, "about to send status magic %d\n", err);
1218	/* hdr has been in host byteorder this whole time */
1219	return r2net_send_tcp_msg(sock, &vec, 1, sizeof(struct r2net_msg));
1220}
1221
1222/*
1223 * "data magic" is a long version of "status magic" where the message
1224 * payload actually contains data to be passed in reply to certain messages
1225 */
1226static int r2net_send_data_magic(struct r2net_sock_container *sc,
1227			  struct r2net_msg *hdr,
1228			  void *data, size_t data_len,
1229			  enum r2net_system_error syserr, int err)
1230{
1231	struct kvec vec[2];
1232	int ret;
1233
1234	vec[0].iov_base = hdr;
1235	vec[0].iov_len = sizeof(struct r2net_msg);
1236	vec[1].iov_base = data;
1237	vec[1].iov_len = data_len;
1238
1239	BUG_ON(syserr >= R2NET_ERR_MAX);
1240
1241	/* leave other fields intact from the incoming message, msg_num
1242	 * in particular */
1243	hdr->sys_status = cpu_to_be32(syserr);
1244	hdr->status = cpu_to_be32(err);
1245	hdr->magic = cpu_to_be16(R2NET_MSG_DATA_MAGIC);  /* twiddle magic */
1246	hdr->data_len = cpu_to_be16(data_len);
1247
1248	msglog(hdr, "about to send data magic %d\n", err);
1249	/* hdr has been in host byteorder this whole time */
1250	ret = r2net_send_tcp_msg(sc->sc_sock, vec, 2,
1251			sizeof(struct r2net_msg) + data_len);
1252	return ret;
1253}
1254
1255/*
1256 * called by a message handler to convert an otherwise normal reply
1257 * message into a "data magic" message
1258 */
1259void r2net_force_data_magic(struct r2net_msg *hdr, u16 msgtype, u32 msgkey)
1260{
1261	hdr->magic = cpu_to_be16(R2NET_MSG_DATA_MAGIC);
1262	hdr->msg_type = cpu_to_be16(msgtype);
1263	hdr->key = cpu_to_be32(msgkey);
1264}
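/*
 * Illustrative sketch, not part of this file: a request handler replying
 * with a payload via the "data magic" path described above.  The return
 * value becomes the byte count of *ret_data, and the buffer is transmitted
 * after the handler returns, so it cannot live on the handler's stack.
 * EXAMPLE_REPLY_TYPE and EXAMPLE_KEY are hypothetical.
 */
#if 0
static char example_reply[64];

static int example_data_handler(struct r2net_msg *msg, u32 len, void *data,
				void **ret_data)
{
	/* ... fill example_reply from the request payload at (msg + 1) ... */
	r2net_force_data_magic(msg, EXAMPLE_REPLY_TYPE, EXAMPLE_KEY);
	*ret_data = example_reply;
	return sizeof(example_reply);	/* payload length of the reply */
}
#endif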
1265
1266/* this returns -errno if the header was unknown or too large, etc.
 1267 * after this is called the buffer is reused for the next message */
1268static int r2net_process_message(struct r2net_sock_container *sc,
1269				 struct r2net_msg *hdr)
1270{
1271	struct r2net_node *nn = r2net_nn_from_num(sc->sc_node->nd_num);
1272	int ret = 0, handler_status;
1273	enum  r2net_system_error syserr;
1274	struct r2net_msg_handler *nmh = NULL;
1275	void *ret_data = NULL;
1276	int data_magic = 0;
1277
1278	msglog(hdr, "processing message\n");
1279
1280	r2net_sc_postpone_idle(sc);
1281
1282	switch (be16_to_cpu(hdr->magic)) {
1283
1284	case R2NET_MSG_STATUS_MAGIC:
1285		/* special type for returning message status */
1286		r2net_complete_nsw(nn, NULL, be32_to_cpu(hdr->msg_num),
1287						be32_to_cpu(hdr->sys_status),
1288						be32_to_cpu(hdr->status));
1289		goto out;
1290	case R2NET_MSG_KEEP_REQ_MAGIC:
1291		r2net_sendpage(sc, r2net_keep_resp, sizeof(*r2net_keep_resp));
1292		goto out;
1293	case R2NET_MSG_KEEP_RESP_MAGIC:
1294		goto out;
1295	case R2NET_MSG_MAGIC:
1296		break;
1297	case R2NET_MSG_DATA_MAGIC:
1298		/*
1299		 * unlike a normal status magic, a data magic DOES
1300		 * (MUST) have a handler, so the control flow is
1301		 * a little funky here as a result
1302		 */
1303		data_magic = 1;
1304		break;
1305	default:
1306		msglog(hdr, "bad magic\n");
1307		ret = -EINVAL;
1308		goto out;
1309		break;
1310	}
1311
1312	/* find a handler for it */
1313	handler_status = 0;
1314	nmh = r2net_handler_get(be16_to_cpu(hdr->msg_type),
1315				be32_to_cpu(hdr->key));
1316	if (!nmh) {
1317		mlog(ML_TCP, "couldn't find handler for type %u key %08x\n",
1318		     be16_to_cpu(hdr->msg_type), be32_to_cpu(hdr->key));
1319		syserr = R2NET_ERR_NO_HNDLR;
1320		goto out_respond;
1321	}
1322
1323	syserr = R2NET_ERR_NONE;
1324
1325	if (be16_to_cpu(hdr->data_len) > nmh->nh_max_len)
1326		syserr = R2NET_ERR_OVERFLOW;
1327
1328	if (syserr != R2NET_ERR_NONE)
1329		goto out_respond;
1330
1331	r2net_set_func_start_time(sc);
1332	sc->sc_msg_key = be32_to_cpu(hdr->key);
1333	sc->sc_msg_type = be16_to_cpu(hdr->msg_type);
1334	handler_status = (nmh->nh_func)(hdr, sizeof(struct r2net_msg) +
1335					     be16_to_cpu(hdr->data_len),
1336					nmh->nh_func_data, &ret_data);
1337	if (data_magic) {
1338		/*
1339		 * handler handled data sent in reply to request
1340		 * so complete the transaction
1341		 */
1342		r2net_complete_nsw(nn, NULL, be32_to_cpu(hdr->msg_num),
1343			be32_to_cpu(hdr->sys_status), handler_status);
1344		goto out;
1345	}
1346	/*
1347	 * handler changed magic to DATA_MAGIC to reply to request for data,
1348	 * implies ret_data points to data to return and handler_status
1349	 * is the number of bytes of data
1350	 */
1351	if (be16_to_cpu(hdr->magic) == R2NET_MSG_DATA_MAGIC) {
1352		ret = r2net_send_data_magic(sc, hdr,
1353						ret_data, handler_status,
1354						syserr, 0);
1355		hdr = NULL;
1356		mlog(0, "sending data reply %d, syserr %d returned %d\n",
1357			handler_status, syserr, ret);
1358		r2net_set_func_stop_time(sc);
1359
1360		r2net_update_recv_stats(sc);
1361		goto out;
1362	}
1363	r2net_set_func_stop_time(sc);
1364
1365	r2net_update_recv_stats(sc);
1366
1367out_respond:
1368	/* this destroys the hdr, so don't use it after this */
1369	mutex_lock(&sc->sc_send_lock);
1370	ret = r2net_send_status_magic(sc->sc_sock, hdr, syserr,
1371				      handler_status);
1372	mutex_unlock(&sc->sc_send_lock);
1373	hdr = NULL;
1374	mlog(0, "sending handler status %d, syserr %d returned %d\n",
1375	     handler_status, syserr, ret);
1376
1377	if (nmh) {
1378		BUG_ON(ret_data != NULL && nmh->nh_post_func == NULL);
1379		if (nmh->nh_post_func)
1380			(nmh->nh_post_func)(handler_status, nmh->nh_func_data,
1381					    ret_data);
1382	}
1383
1384out:
1385	if (nmh)
1386		r2net_handler_put(nmh);
1387	return ret;
1388}
1389
1390static int r2net_check_handshake(struct r2net_sock_container *sc)
1391{
1392	struct r2net_handshake *hand = page_address(sc->sc_page);
1393	struct r2net_node *nn = r2net_nn_from_num(sc->sc_node->nd_num);
1394
1395	if (hand->protocol_version != cpu_to_be64(R2NET_PROTOCOL_VERSION)) {
1396		printk(KERN_NOTICE "ramster: " SC_NODEF_FMT " Advertised net "
1397		       "protocol version %llu but %llu is required. "
1398		       "Disconnecting.\n", sc->sc_node->nd_name,
1399			sc->sc_node->nd_num, &sc->sc_node->nd_ipv4_address,
1400			ntohs(sc->sc_node->nd_ipv4_port),
1401		       (unsigned long long)be64_to_cpu(hand->protocol_version),
1402		       R2NET_PROTOCOL_VERSION);
1403
 1404		/* don't bother reconnecting if it's the wrong version. */
1405		r2net_ensure_shutdown(nn, sc, -ENOTCONN);
1406		return -1;
1407	}
1408
1409	/*
1410	 * Ensure timeouts are consistent with other nodes, otherwise
1411	 * we can end up with one node thinking that the other must be down,
1412	 * but isn't. This can ultimately cause corruption.
1413	 */
1414	if (be32_to_cpu(hand->r2net_idle_timeout_ms) !=
1415				r2net_idle_timeout()) {
1416		printk(KERN_NOTICE "ramster: " SC_NODEF_FMT " uses a network "
1417		       "idle timeout of %u ms, but we use %u ms locally. "
1418		       "Disconnecting.\n", sc->sc_node->nd_name,
1419			sc->sc_node->nd_num, &sc->sc_node->nd_ipv4_address,
1420			ntohs(sc->sc_node->nd_ipv4_port),
1421		       be32_to_cpu(hand->r2net_idle_timeout_ms),
1422		       r2net_idle_timeout());
1423		r2net_ensure_shutdown(nn, sc, -ENOTCONN);
1424		return -1;
1425	}
1426
1427	if (be32_to_cpu(hand->r2net_keepalive_delay_ms) !=
1428			r2net_keepalive_delay()) {
1429		printk(KERN_NOTICE "ramster: " SC_NODEF_FMT " uses a keepalive "
1430		       "delay of %u ms, but we use %u ms locally. "
1431		       "Disconnecting.\n", sc->sc_node->nd_name,
1432			sc->sc_node->nd_num, &sc->sc_node->nd_ipv4_address,
1433			ntohs(sc->sc_node->nd_ipv4_port),
1434		       be32_to_cpu(hand->r2net_keepalive_delay_ms),
1435		       r2net_keepalive_delay());
1436		r2net_ensure_shutdown(nn, sc, -ENOTCONN);
1437		return -1;
1438	}
1439
1440	if (be32_to_cpu(hand->r2hb_heartbeat_timeout_ms) !=
1441			R2HB_MAX_WRITE_TIMEOUT_MS) {
1442		printk(KERN_NOTICE "ramster: " SC_NODEF_FMT " uses a heartbeat "
1443		       "timeout of %u ms, but we use %u ms locally. "
1444		       "Disconnecting.\n", sc->sc_node->nd_name,
1445			sc->sc_node->nd_num, &sc->sc_node->nd_ipv4_address,
1446			ntohs(sc->sc_node->nd_ipv4_port),
1447		       be32_to_cpu(hand->r2hb_heartbeat_timeout_ms),
1448		       R2HB_MAX_WRITE_TIMEOUT_MS);
1449		r2net_ensure_shutdown(nn, sc, -ENOTCONN);
1450		return -1;
1451	}
1452
1453	sc->sc_handshake_ok = 1;
1454
1455	spin_lock(&nn->nn_lock);
1456	/* set valid and queue the idle timers only if it hasn't been
1457	 * shut down already */
1458	if (nn->nn_sc == sc) {
1459		r2net_sc_reset_idle_timer(sc);
1460		atomic_set(&nn->nn_timeout, 0);
1461		r2net_set_nn_state(nn, sc, 1, 0);
1462	}
1463	spin_unlock(&nn->nn_lock);
1464
1465	/* shift everything up as though it wasn't there */
1466	sc->sc_page_off -= sizeof(struct r2net_handshake);
1467	if (sc->sc_page_off)
1468		memmove(hand, hand + 1, sc->sc_page_off);
1469
1470	return 0;
1471}
1472
1473/* this demuxes the queued rx bytes into header or payload bits and calls
1474 * handlers as each full message is read off the socket.  it returns -error,
 1475 * 0 on eof, or > 0 for progress made. */
1476static int r2net_advance_rx(struct r2net_sock_container *sc)
1477{
1478	struct r2net_msg *hdr;
1479	int ret = 0;
1480	void *data;
1481	size_t datalen;
1482
1483	sclog(sc, "receiving\n");
1484	r2net_set_advance_start_time(sc);
1485
1486	if (unlikely(sc->sc_handshake_ok == 0)) {
1487		if (sc->sc_page_off < sizeof(struct r2net_handshake)) {
1488			data = page_address(sc->sc_page) + sc->sc_page_off;
1489			datalen = sizeof(struct r2net_handshake) -
1490							sc->sc_page_off;
1491			ret = r2net_recv_tcp_msg(sc->sc_sock, data, datalen);
1492			if (ret > 0)
1493				sc->sc_page_off += ret;
1494		}
1495
1496		if (sc->sc_page_off == sizeof(struct r2net_handshake)) {
1497			r2net_check_handshake(sc);
1498			if (unlikely(sc->sc_handshake_ok == 0))
1499				ret = -EPROTO;
1500		}
1501		goto out;
1502	}
1503
1504	/* do we need more header? */
1505	if (sc->sc_page_off < sizeof(struct r2net_msg)) {
1506		data = page_address(sc->sc_page) + sc->sc_page_off;
1507		datalen = sizeof(struct r2net_msg) - sc->sc_page_off;
1508		ret = r2net_recv_tcp_msg(sc->sc_sock, data, datalen);
1509		if (ret > 0) {
1510			sc->sc_page_off += ret;
1511			/* only swab incoming here.. we can
1512			 * only get here once as we cross from
1513			 * being under to over */
1514			if (sc->sc_page_off == sizeof(struct r2net_msg)) {
1515				hdr = page_address(sc->sc_page);
1516				if (be16_to_cpu(hdr->data_len) >
1517				    R2NET_MAX_PAYLOAD_BYTES)
1518					ret = -EOVERFLOW;
1519			}
1520		}
1521		if (ret <= 0)
1522			goto out;
1523	}
1524
1525	if (sc->sc_page_off < sizeof(struct r2net_msg)) {
1526		/* oof, still don't have a header */
1527		goto out;
1528	}
1529
1530	/* this was swabbed above when we first read it */
1531	hdr = page_address(sc->sc_page);
1532
1533	msglog(hdr, "at page_off %zu\n", sc->sc_page_off);
1534
1535	/* do we need more payload? */
1536	if (sc->sc_page_off - sizeof(struct r2net_msg) <
1537					be16_to_cpu(hdr->data_len)) {
1538		/* need more payload */
1539		data = page_address(sc->sc_page) + sc->sc_page_off;
1540		datalen = (sizeof(struct r2net_msg) +
1541				be16_to_cpu(hdr->data_len)) -
1542				sc->sc_page_off;
1543		ret = r2net_recv_tcp_msg(sc->sc_sock, data, datalen);
1544		if (ret > 0)
1545			sc->sc_page_off += ret;
1546		if (ret <= 0)
1547			goto out;
1548	}
1549
1550	if (sc->sc_page_off - sizeof(struct r2net_msg) ==
1551						be16_to_cpu(hdr->data_len)) {
1552		/* we can only get here once, the first time we read
1553		 * the payload.. so set ret to progress if the handler
1554		 * works out. after calling this the message is toast */
1555		ret = r2net_process_message(sc, hdr);
1556		if (ret == 0)
1557			ret = 1;
1558		sc->sc_page_off = 0;
1559	}
1560
1561out:
1562	sclog(sc, "ret = %d\n", ret);
1563	r2net_set_advance_stop_time(sc);
1564	return ret;
1565}
1566
 1567/* this work func is triggered by data ready.  it reads until it can read no
1568 * more.  it interprets 0, eof, as fatal.  if data_ready hits while we're doing
1569 * our work the work struct will be marked and we'll be called again. */
1570static void r2net_rx_until_empty(struct work_struct *work)
1571{
1572	struct r2net_sock_container *sc =
1573		container_of(work, struct r2net_sock_container, sc_rx_work);
1574	int ret;
1575
1576	do {
1577		ret = r2net_advance_rx(sc);
1578	} while (ret > 0);
1579
1580	if (ret <= 0 && ret != -EAGAIN) {
1581		struct r2net_node *nn = r2net_nn_from_num(sc->sc_node->nd_num);
1582		sclog(sc, "saw error %d, closing\n", ret);
1583		/* not permanent so read failed handshake can retry */
1584		r2net_ensure_shutdown(nn, sc, 0);
1585	}
1586
1587	sc_put(sc);
1588}
1589
1590static int r2net_set_nodelay(struct socket *sock)
1591{
1592	int ret, val = 1;
1593	mm_segment_t oldfs;
1594
1595	oldfs = get_fs();
1596	set_fs(KERNEL_DS);
1597
1598	/*
1599	 * Dear unsuspecting programmer,
1600	 *
1601	 * Don't use sock_setsockopt() for SOL_TCP.  It doesn't check its level
1602	 * argument and assumes SOL_SOCKET so, say, your TCP_NODELAY will
1603	 * silently turn into SO_DEBUG.
1604	 *
1605	 * Yours,
1606	 * Keeper of hilariously fragile interfaces.
1607	 */
1608	ret = sock->ops->setsockopt(sock, SOL_TCP, TCP_NODELAY,
1609				    (char __user *)&val, sizeof(val));
1610
1611	set_fs(oldfs);
1612	return ret;
1613}
1614
1615static void r2net_initialize_handshake(void)
1616{
1617	r2net_hand->r2hb_heartbeat_timeout_ms = cpu_to_be32(
1618		R2HB_MAX_WRITE_TIMEOUT_MS);
1619	r2net_hand->r2net_idle_timeout_ms = cpu_to_be32(r2net_idle_timeout());
1620	r2net_hand->r2net_keepalive_delay_ms = cpu_to_be32(
1621		r2net_keepalive_delay());
1622	r2net_hand->r2net_reconnect_delay_ms = cpu_to_be32(
1623		r2net_reconnect_delay());
1624}
1625
1626/* ------------------------------------------------------------ */
1627
1628/* called when a connect completes and after a sock is accepted.  the
1629 * rx path will see the response and mark the sc valid */
1630static void r2net_sc_connect_completed(struct work_struct *work)
1631{
1632	struct r2net_sock_container *sc =
1633			container_of(work, struct r2net_sock_container,
1634			     sc_connect_work);
1635
1636	mlog(ML_MSG, "sc sending handshake with ver %llu id %llx\n",
1637		(unsigned long long)R2NET_PROTOCOL_VERSION,
1638		(unsigned long long)be64_to_cpu(r2net_hand->connector_id));
1639
1640	r2net_initialize_handshake();
1641	r2net_sendpage(sc, r2net_hand, sizeof(*r2net_hand));
1642	sc_put(sc);
1643}
1644
1645/* this is called as a work_struct func. */
1646static void r2net_sc_send_keep_req(struct work_struct *work)
1647{
1648	struct r2net_sock_container *sc =
1649		container_of(work, struct r2net_sock_container,
1650			     sc_keepalive_work.work);
1651
1652	r2net_sendpage(sc, r2net_keep_req, sizeof(*r2net_keep_req));
1653	sc_put(sc);
1654}
1655
1656/* socket shutdown does a del_timer_sync against this as it tears down.
1657 * we can't start this timer until we've got to the point in sc buildup
1658 * where shutdown is going to be involved */
1659static void r2net_idle_timer(unsigned long data)
1660{
1661	struct r2net_sock_container *sc = (struct r2net_sock_container *)data;
1662#ifdef CONFIG_DEBUG_FS
1663	unsigned long msecs = ktime_to_ms(ktime_get()) -
1664		ktime_to_ms(sc->sc_tv_timer);
1665#else
1666	unsigned long msecs = r2net_idle_timeout();
1667#endif
1668
1669	printk(KERN_NOTICE "ramster: Connection to " SC_NODEF_FMT " has been "
1670	       "idle for %lu.%lu secs, shutting it down.\n",
1671		sc->sc_node->nd_name, sc->sc_node->nd_num,
1672		&sc->sc_node->nd_ipv4_address, ntohs(sc->sc_node->nd_ipv4_port),
1673	       msecs / 1000, msecs % 1000);
1674
1675	/*
1676	 * Initialize the nn_timeout so that the next connection attempt
1677	 * will continue in r2net_start_connect.
1678	 */
1679	/* Avoid spurious shutdowns... not sure if this is still necessary */
1680	pr_err("ramster_idle_timer, skipping shutdown work\n");
1681#if 0
1682	/* old code used to do these two lines */
1683	atomic_set(&nn->nn_timeout, 1);
1684	r2net_sc_queue_work(sc, &sc->sc_shutdown_work);
1685#endif
1686}
1687
1688static void r2net_sc_reset_idle_timer(struct r2net_sock_container *sc)
1689{
1690	r2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work);
1691	r2net_sc_queue_delayed_work(sc, &sc->sc_keepalive_work,
1692		      msecs_to_jiffies(r2net_keepalive_delay()));
1693	r2net_set_sock_timer(sc);
1694	mod_timer(&sc->sc_idle_timeout,
1695	       jiffies + msecs_to_jiffies(r2net_idle_timeout()));
1696}
1697
1698static void r2net_sc_postpone_idle(struct r2net_sock_container *sc)
1699{
1700	/* Only push out an existing timer */
1701	if (timer_pending(&sc->sc_idle_timeout))
1702		r2net_sc_reset_idle_timer(sc);
1703}
1704
1705/* this work func is kicked whenever a path sets the nn state which doesn't
1706 * have valid set.  This includes seeing hb come up, losing a connection,
1707 * having a connect attempt fail, etc. This centralizes the logic which decides
1708 * if a connect attempt should be made or if we should give up and all future
1709 * transmit attempts should fail */
1710static void r2net_start_connect(struct work_struct *work)
1711{
1712	struct r2net_node *nn =
1713		container_of(work, struct r2net_node, nn_connect_work.work);
1714	struct r2net_sock_container *sc = NULL;
1715	struct r2nm_node *node = NULL, *mynode = NULL;
1716	struct socket *sock = NULL;
1717	struct sockaddr_in myaddr = {0, }, remoteaddr = {0, };
1718	int ret = 0, stop;
1719	unsigned int timeout;
1720
1721	/* if we're greater we initiate tx, otherwise we accept */
1722	if (r2nm_this_node() <= r2net_num_from_nn(nn))
1723		goto out;
1724
1725	/* watch for racing with tearing a node down */
1726	node = r2nm_get_node_by_num(r2net_num_from_nn(nn));
1727	if (node == NULL) {
1728		ret = 0;
1729		goto out;
1730	}
1731
1732	mynode = r2nm_get_node_by_num(r2nm_this_node());
1733	if (mynode == NULL) {
1734		ret = 0;
1735		goto out;
1736	}
1737
1738	spin_lock(&nn->nn_lock);
1739	/*
1740	 * see if we already have one pending or have given up.
1741	 * For nn_timeout, it is set when we close the connection
1742	 * because of the idle time out. So it means that we have
1743	 * at least connected to that node successfully once,
1744	 * now try to connect to it again.
1745	 */
1746	timeout = atomic_read(&nn->nn_timeout);
1747	stop = (nn->nn_sc ||
1748		(nn->nn_persistent_error &&
1749		(nn->nn_persistent_error != -ENOTCONN || timeout == 0)));
1750	spin_unlock(&nn->nn_lock);
1751	if (stop)
1752		goto out;
1753
1754	nn->nn_last_connect_attempt = jiffies;
1755
1756	sc = sc_alloc(node);
1757	if (sc == NULL) {
1758		mlog(0, "couldn't allocate sc\n");
1759		ret = -ENOMEM;
1760		goto out;
1761	}
1762
1763	ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
1764	if (ret < 0) {
1765		mlog(0, "can't create socket: %d\n", ret);
1766		goto out;
1767	}
1768	sc->sc_sock = sock; /* freed by sc_kref_release */
1769
1770	sock->sk->sk_allocation = GFP_ATOMIC;
1771
1772	myaddr.sin_family = AF_INET;
1773	myaddr.sin_addr.s_addr = mynode->nd_ipv4_address;
1774	myaddr.sin_port = htons(0); /* any port */
1775
1776	ret = sock->ops->bind(sock, (struct sockaddr *)&myaddr,
1777			      sizeof(myaddr));
1778	if (ret) {
1779		mlog(ML_ERROR, "bind failed with %d at address %pI4\n",
1780		     ret, &mynode->nd_ipv4_address);
1781		goto out;
1782	}
1783
1784	ret = r2net_set_nodelay(sc->sc_sock);
1785	if (ret) {
1786		mlog(ML_ERROR, "setting TCP_NODELAY failed with %d\n", ret);
1787		goto out;
1788	}
1789
1790	r2net_register_callbacks(sc->sc_sock->sk, sc);
1791
1792	spin_lock(&nn->nn_lock);
1793	/* handshake completion will set nn->nn_sc_valid */
1794	r2net_set_nn_state(nn, sc, 0, 0);
1795	spin_unlock(&nn->nn_lock);
1796
1797	remoteaddr.sin_family = AF_INET;
1798	remoteaddr.sin_addr.s_addr = node->nd_ipv4_address;
1799	remoteaddr.sin_port = node->nd_ipv4_port;
1800
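	/* Non-blocking connect: -EINPROGRESS only means the TCP handshake is
	 * still in flight.  Completion is observed through the socket
	 * callbacks registered above, so it is treated as success here. */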
1801	ret = sc->sc_sock->ops->connect(sc->sc_sock,
1802					(struct sockaddr *)&remoteaddr,
1803					sizeof(remoteaddr),
1804					O_NONBLOCK);
1805	if (ret == -EINPROGRESS)
1806		ret = 0;
1807
1808out:
1809	if (ret) {
1810		printk(KERN_NOTICE "ramster: Connect attempt to " SC_NODEF_FMT
1811		       " failed with errno %d\n", node->nd_name,
1812		       node->nd_num, &node->nd_ipv4_address,
1813		       ntohs(node->nd_ipv4_port), ret);
1814		/* pass an err of 0 so that another connect attempt will be
1815		 * queued from set_nn_state */
1816		if (sc)
1817			r2net_ensure_shutdown(nn, sc, 0);
1818	}
1819	if (sc)
1820		sc_put(sc);
1821	if (node)
1822		r2nm_node_put(node);
1823	if (mynode)
1824		r2nm_node_put(mynode);
1825
1826	return;
1827}
1828
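/* Delayed work that fires if no valid connection has been established within
 * the idle timeout.  It records a persistent -ENOTCONN so callers see an
 * error instead of waiting indefinitely for a connection. */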
1829static void r2net_connect_expired(struct work_struct *work)
1830{
1831	struct r2net_node *nn =
1832		container_of(work, struct r2net_node, nn_connect_expired.work);
1833
1834	spin_lock(&nn->nn_lock);
1835	if (!nn->nn_sc_valid) {
1836		printk(KERN_NOTICE "ramster: No connection established with "
1837		       "node %u after %u.%03u seconds, giving up.\n",
1838		       r2net_num_from_nn(nn),
1839		       r2net_idle_timeout() / 1000,
1840		       r2net_idle_timeout() % 1000);
1841
1842		r2net_set_nn_state(nn, NULL, 0, -ENOTCONN);
1843	}
1844	spin_unlock(&nn->nn_lock);
1845}
1846
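/* The "still up" delayed work is a no-op here; it is presumably kept so the
 * work item can be queued and cancelled along with the other per-node
 * delayed work. */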
1847static void r2net_still_up(struct work_struct *work)
1848{
1849}
1850
1851/* ------------------------------------------------------------ */
1852
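/* Drop any connection to the given node and make sure none of its delayed
 * work (connect, connect-expired, still-up) is left queued or running. */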
1853void r2net_disconnect_node(struct r2nm_node *node)
1854{
1855	struct r2net_node *nn = r2net_nn_from_num(node->nd_num);
1856
1857	/* don't reconnect until it's heartbeating again */
1858	spin_lock(&nn->nn_lock);
1859	atomic_set(&nn->nn_timeout, 0);
1860	r2net_set_nn_state(nn, NULL, 0, -ENOTCONN);
1861	spin_unlock(&nn->nn_lock);
1862
1863	if (r2net_wq) {
1864		cancel_delayed_work(&nn->nn_connect_expired);
1865		cancel_delayed_work(&nn->nn_connect_work);
1866		cancel_delayed_work(&nn->nn_still_up);
1867		flush_workqueue(r2net_wq);
1868	}
1869}
1870
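/* Heartbeat callbacks: a node going down has its connection torn down; a node
 * coming up has any persistent error cleared so that a fresh connect attempt
 * can be scheduled immediately. */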
1871static void r2net_hb_node_down_cb(struct r2nm_node *node, int node_num,
1872				  void *data)
1873{
1874	if (!node)
1875		return;
1876
1877	if (node_num != r2nm_this_node())
1878		r2net_disconnect_node(node);
1879
1880	BUG_ON(atomic_read(&r2net_connected_peers) < 0);
1881}
1882
1883static void r2net_hb_node_up_cb(struct r2nm_node *node, int node_num,
1884				void *data)
1885{
1886	struct r2net_node *nn = r2net_nn_from_num(node_num);
1887
1888	BUG_ON(!node);
1889
1890	/* ensure an immediate connect attempt */
1891	nn->nn_last_connect_attempt = jiffies -
1892		(msecs_to_jiffies(r2net_reconnect_delay()) + 1);
1893
1894	if (node_num != r2nm_this_node()) {
1895		/* Believe it or not, accept and node heartbeating testing
1896		 * can succeed for this node before we get here, so
1897		 * only use set_nn_state to clear the persistent error
1898		 * if that hasn't already happened. */
1899		spin_lock(&nn->nn_lock);
1900		atomic_set(&nn->nn_timeout, 0);
1901		if (nn->nn_persistent_error)
1902			r2net_set_nn_state(nn, NULL, 0, 0);
1903		spin_unlock(&nn->nn_lock);
1904	}
1905}
1906
1907void r2net_unregister_hb_callbacks(void)
1908{
1909	r2hb_unregister_callback(NULL, &r2net_hb_up);
1910	r2hb_unregister_callback(NULL, &r2net_hb_down);
1911}
1912
1913int r2net_register_hb_callbacks(void)
1914{
1915	int ret;
1916
1917	r2hb_setup_callback(&r2net_hb_down, R2HB_NODE_DOWN_CB,
1918			    r2net_hb_node_down_cb, NULL, R2NET_HB_PRI);
1919	r2hb_setup_callback(&r2net_hb_up, R2HB_NODE_UP_CB,
1920			    r2net_hb_node_up_cb, NULL, R2NET_HB_PRI);
1921
1922	ret = r2hb_register_callback(NULL, &r2net_hb_up);
1923	if (ret == 0)
1924		ret = r2hb_register_callback(NULL, &r2net_hb_down);
1925
1926	if (ret)
1927		r2net_unregister_hb_callbacks();
1928
1929	return ret;
1930}
1931
1932/* ------------------------------------------------------------ */
1933
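/* Accept a single pending connection on the listening socket.  The peer is
 * only let in if it maps to a known, heartbeating node with a higher node
 * number than ours and no connection is already in place; otherwise the new
 * socket is released and an error is returned. */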
1934static int r2net_accept_one(struct socket *sock)
1935{
1936	int ret, slen;
1937	struct sockaddr_in sin;
1938	struct socket *new_sock = NULL;
1939	struct r2nm_node *node = NULL;
1940	struct r2nm_node *local_node = NULL;
1941	struct r2net_sock_container *sc = NULL;
1942	struct r2net_node *nn;
1943
1944	BUG_ON(sock == NULL);
1945	ret = sock_create_lite(sock->sk->sk_family, sock->sk->sk_type,
1946			       sock->sk->sk_protocol, &new_sock);
1947	if (ret)
1948		goto out;
1949
1950	new_sock->type = sock->type;
1951	new_sock->ops = sock->ops;
1952	ret = sock->ops->accept(sock, new_sock, O_NONBLOCK);
1953	if (ret < 0)
1954		goto out;
1955
1956	new_sock->sk->sk_allocation = GFP_ATOMIC;
1957
1958	ret = r2net_set_nodelay(new_sock);
1959	if (ret) {
1960		mlog(ML_ERROR, "setting TCP_NODELAY failed with %d\n", ret);
1961		goto out;
1962	}
1963
1964	slen = sizeof(sin);
1965	ret = new_sock->ops->getname(new_sock, (struct sockaddr *) &sin,
1966				       &slen, 1);
1967	if (ret < 0)
1968		goto out;
1969
1970	node = r2nm_get_node_by_ip(sin.sin_addr.s_addr);
1971	if (node == NULL) {
1972		printk(KERN_NOTICE "ramster: Attempt to connect from unknown "
1973		       "node at %pI4:%d\n", &sin.sin_addr.s_addr,
1974		       ntohs(sin.sin_port));
1975		ret = -EINVAL;
1976		goto out;
1977	}
1978
1979	if (r2nm_this_node() >= node->nd_num) {
1980		local_node = r2nm_get_node_by_num(r2nm_this_node());
1981		printk(KERN_NOTICE "ramster: Unexpected connect attempt seen "
1982		       "at node '%s' (%u, %pI4:%d) from node '%s' (%u, "
1983		       "%pI4:%d)\n", local_node->nd_name, local_node->nd_num,
1984		       &(local_node->nd_ipv4_address),
1985		       ntohs(local_node->nd_ipv4_port), node->nd_name,
1986		       node->nd_num, &sin.sin_addr.s_addr, ntohs(sin.sin_port));
1987		ret = -EINVAL;
1988		goto out;
1989	}
1990
1991	/* this happens all the time when the other node sees our heartbeat
1992	 * and tries to connect before we see their heartbeat */
1993	if (!r2hb_check_node_heartbeating_from_callback(node->nd_num)) {
1994		mlog(ML_CONN, "attempt to connect from node '%s' at "
1995		     "%pI4:%d but it isn't heartbeating\n",
1996		     node->nd_name, &sin.sin_addr.s_addr,
1997		     ntohs(sin.sin_port));
1998		ret = -EINVAL;
1999		goto out;
2000	}
2001
2002	nn = r2net_nn_from_num(node->nd_num);
2003
2004	spin_lock(&nn->nn_lock);
2005	if (nn->nn_sc)
2006		ret = -EBUSY;
2007	else
2008		ret = 0;
2009	spin_unlock(&nn->nn_lock);
2010	if (ret) {
2011		printk(KERN_NOTICE "ramster: Attempt to connect from node '%s' "
2012		       "at %pI4:%d but it already has an open connection\n",
2013		       node->nd_name, &sin.sin_addr.s_addr,
2014		       ntohs(sin.sin_port));
2015		goto out;
2016	}
2017
2018	sc = sc_alloc(node);
2019	if (sc == NULL) {
2020		ret = -ENOMEM;
2021		goto out;
2022	}
2023
2024	sc->sc_sock = new_sock;
2025	new_sock = NULL;
2026
2027	spin_lock(&nn->nn_lock);
2028	atomic_set(&nn->nn_timeout, 0);
2029	r2net_set_nn_state(nn, sc, 0, 0);
2030	spin_unlock(&nn->nn_lock);
2031
2032	r2net_register_callbacks(sc->sc_sock->sk, sc);
2033	r2net_sc_queue_work(sc, &sc->sc_rx_work);
2034
2035	r2net_initialize_handshake();
2036	r2net_sendpage(sc, r2net_hand, sizeof(*r2net_hand));
2037
2038out:
2039	if (new_sock)
2040		sock_release(new_sock);
2041	if (node)
2042		r2nm_node_put(node);
2043	if (local_node)
2044		r2nm_node_put(local_node);
2045	if (sc)
2046		sc_put(sc);
2047	return ret;
2048}
2049
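/* Drain the listen queue: keep accepting until r2net_accept_one() reports an
 * error (typically -EAGAIN once no more connections are pending). */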
2050static void r2net_accept_many(struct work_struct *work)
2051{
2052	struct socket *sock = r2net_listen_sock;
2053	while (r2net_accept_one(sock) == 0)
2054		cond_resched();
2055}
2056
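/* data_ready hook installed on the listening socket.  The original callback
 * is stashed in sk_user_data by r2net_open_listening_sock(); this wrapper
 * queues the accept work for the listening socket only and then chains to
 * the saved callback. */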
2057static void r2net_listen_data_ready(struct sock *sk, int bytes)
2058{
2059	void (*ready)(struct sock *sk, int bytes);
2060
2061	read_lock(&sk->sk_callback_lock);
2062	ready = sk->sk_user_data;
2063	if (ready == NULL) { /* check for teardown race */
2064		ready = sk->sk_data_ready;
2065		goto out;
2066	}
2067
2068	/* ->sk_data_ready is also called for a newly established child socket
2069	 * before it has been accepted and the acceptor has set up its
2070	 * data_ready.  We only want to queue listen work for our listening
2071	 * socket. */
2072	if (sk->sk_state == TCP_LISTEN) {
2073		mlog(ML_TCP, "bytes: %d\n", bytes);
2074		queue_work(r2net_wq, &r2net_listen_work);
2075	}
2076
2077out:
2078	read_unlock(&sk->sk_callback_lock);
2079	ready(sk, bytes);
2080}
2081
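/* Create the TCP listening socket, hook its data_ready callback, bind it to
 * the local node's address and port, and start listening with a backlog
 * of 64. */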
2082static int r2net_open_listening_sock(__be32 addr, __be16 port)
2083{
2084	struct socket *sock = NULL;
2085	int ret;
2086	struct sockaddr_in sin = {
2087		.sin_family = AF_INET,
2088		.sin_addr = { .s_addr = addr },
2089		.sin_port = port,
2090	};
2091
2092	ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
2093	if (ret < 0) {
2094		printk(KERN_ERR "ramster: Error %d while creating socket\n",
2095			ret);
2096		goto out;
2097	}
2098
2099	sock->sk->sk_allocation = GFP_ATOMIC;
2100
2101	write_lock_bh(&sock->sk->sk_callback_lock);
2102	sock->sk->sk_user_data = sock->sk->sk_data_ready;
2103	sock->sk->sk_data_ready = r2net_listen_data_ready;
2104	write_unlock_bh(&sock->sk->sk_callback_lock);
2105
2106	r2net_listen_sock = sock;
2107	INIT_WORK(&r2net_listen_work, r2net_accept_many);
2108
2109	sock->sk->sk_reuse = SK_CAN_REUSE;
2110	ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
2111	if (ret < 0) {
2112		printk(KERN_ERR "ramster: Error %d while binding socket at "
2113			"%pI4:%u\n", ret, &addr, ntohs(port));
2114		goto out;
2115	}
2116
2117	ret = sock->ops->listen(sock, 64);
2118	if (ret < 0)
2119		printk(KERN_ERR "ramster: Error %d while listening on %pI4:%u\n",
2120		       ret, &addr, ntohs(port));
2121
2122out:
2123	if (ret) {
2124		r2net_listen_sock = NULL;
2125		if (sock)
2126			sock_release(sock);
2127	}
2128	return ret;
2129}
2130
2131/*
2132 * called from node manager when we should bring up our network listening
2133 * socket.  node manager handles all the serialization to only call this
2134 * once and to match it with r2net_stop_listening().  note,
2135 * r2nm_this_node() doesn't work yet as we're being called while it
2136 * is being set up.
2137 */
2138int r2net_start_listening(struct r2nm_node *node)
2139{
2140	int ret = 0;
2141
2142	BUG_ON(r2net_wq != NULL);
2143	BUG_ON(r2net_listen_sock != NULL);
2144
2145	mlog(ML_KTHREAD, "starting r2net thread...\n");
2146	r2net_wq = create_singlethread_workqueue("r2net");
2147	if (r2net_wq == NULL) {
2148		mlog(ML_ERROR, "unable to launch r2net thread\n");
2149		return -ENOMEM; /* ? */
2150	}
2151
2152	ret = r2net_open_listening_sock(node->nd_ipv4_address,
2153					node->nd_ipv4_port);
2154	if (ret) {
2155		destroy_workqueue(r2net_wq);
2156		r2net_wq = NULL;
2157	}
2158
2159	return ret;
2160}
2161
2162/* again, r2nm_this_node() doesn't work here as we're involved in
2163 * tearing it down */
2164void r2net_stop_listening(struct r2nm_node *node)
2165{
2166	struct socket *sock = r2net_listen_sock;
2167	size_t i;
2168
2169	BUG_ON(r2net_wq == NULL);
2170	BUG_ON(r2net_listen_sock == NULL);
2171
2172	/* stop the listening socket from generating work */
2173	write_lock_bh(&sock->sk->sk_callback_lock);
2174	sock->sk->sk_data_ready = sock->sk->sk_user_data;
2175	sock->sk->sk_user_data = NULL;
2176	write_unlock_bh(&sock->sk->sk_callback_lock);
2177
2178	for (i = 0; i < ARRAY_SIZE(r2net_nodes); i++) {
2179		struct r2nm_node *node = r2nm_get_node_by_num(i);
2180		if (node) {
2181			r2net_disconnect_node(node);
2182			r2nm_node_put(node);
2183		}
2184	}
2185
2186	/* finish all work and tear down the work queue */
2187	mlog(ML_KTHREAD, "waiting for r2net thread to exit....\n");
2188	destroy_workqueue(r2net_wq);
2189	r2net_wq = NULL;
2190
2191	sock_release(r2net_listen_sock);
2192	r2net_listen_sock = NULL;
2193}
2194
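/* Manually mark a node as heartbeating and run the node-up callback.  The
 * dummy node passed to the callback is never dereferenced there beyond the
 * NULL check; only node_num is used. */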
2195void r2net_hb_node_up_manual(int node_num)
2196{
2197	struct r2nm_node dummy;
2198	if (r2nm_single_cluster == NULL) {
2199		pr_err("ramster: cluster not alive, node_up_manual ignored\n");
2200	} else {
2201		r2hb_manual_set_node_heartbeating(node_num);
2202		r2net_hb_node_up_cb(&dummy, node_num, NULL);
2203	}
2204}
2205
2206/* ------------------------------------------------------------ */
2207
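/* Module init: set up debugfs, allocate the handshake and keepalive messages,
 * and initialize per-node state (lock, delayed work, status idr) for every
 * possible node. */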
2208int r2net_init(void)
2209{
2210	unsigned long i;
2211
2212	if (r2net_debugfs_init())
2213		return -ENOMEM;
2214
2215	r2net_hand = kzalloc(sizeof(struct r2net_handshake), GFP_KERNEL);
2216	r2net_keep_req = kzalloc(sizeof(struct r2net_msg), GFP_KERNEL);
2217	r2net_keep_resp = kzalloc(sizeof(struct r2net_msg), GFP_KERNEL);
2218	if (!r2net_hand || !r2net_keep_req || !r2net_keep_resp) {
2219		kfree(r2net_hand);
2220		kfree(r2net_keep_req);
2221		kfree(r2net_keep_resp);
2222		return -ENOMEM;
2223	}
2224
2225	r2net_hand->protocol_version = cpu_to_be64(R2NET_PROTOCOL_VERSION);
2226	r2net_hand->connector_id = cpu_to_be64(1);
2227
2228	r2net_keep_req->magic = cpu_to_be16(R2NET_MSG_KEEP_REQ_MAGIC);
2229	r2net_keep_resp->magic = cpu_to_be16(R2NET_MSG_KEEP_RESP_MAGIC);
2230
2231	for (i = 0; i < ARRAY_SIZE(r2net_nodes); i++) {
2232		struct r2net_node *nn = r2net_nn_from_num(i);
2233
2234		atomic_set(&nn->nn_timeout, 0);
2235		spin_lock_init(&nn->nn_lock);
2236		INIT_DELAYED_WORK(&nn->nn_connect_work, r2net_start_connect);
2237		INIT_DELAYED_WORK(&nn->nn_connect_expired,
2238				  r2net_connect_expired);
2239		INIT_DELAYED_WORK(&nn->nn_still_up, r2net_still_up);
2240		/* until we see hb from a node we'll return -ENOTCONN */
2241		nn->nn_persistent_error = -ENOTCONN;
2242		init_waitqueue_head(&nn->nn_sc_wq);
2243		idr_init(&nn->nn_status_idr);
2244		INIT_LIST_HEAD(&nn->nn_status_list);
2245	}
2246
2247	return 0;
2248}
2249
2250void r2net_exit(void)
2251{
2252	kfree(r2net_hand);
2253	kfree(r2net_keep_req);
2254	kfree(r2net_keep_resp);
2255	r2net_debugfs_exit();
2256}