Linux Audio

Check our new training course

Loading...
   1/*
   2   drbd_receiver.c
   3
   4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   5
   6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   9
  10   drbd is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation; either version 2, or (at your option)
  13   any later version.
  14
  15   drbd is distributed in the hope that it will be useful,
  16   but WITHOUT ANY WARRANTY; without even the implied warranty of
  17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  18   GNU General Public License for more details.
  19
  20   You should have received a copy of the GNU General Public License
  21   along with drbd; see the file COPYING.  If not, write to
  22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  23 */
  24
  25
  26#include <linux/module.h>
  27
  28#include <asm/uaccess.h>
  29#include <net/sock.h>
  30
  31#include <linux/drbd.h>
  32#include <linux/fs.h>
  33#include <linux/file.h>
  34#include <linux/in.h>
  35#include <linux/mm.h>
  36#include <linux/memcontrol.h>
  37#include <linux/mm_inline.h>
  38#include <linux/slab.h>
  39#include <linux/pkt_sched.h>
  40#define __KERNEL_SYSCALLS__
  41#include <linux/unistd.h>
  42#include <linux/vmalloc.h>
  43#include <linux/random.h>
  44#include <linux/string.h>
  45#include <linux/scatterlist.h>
  46#include "drbd_int.h"
  47#include "drbd_req.h"
  48
  49#include "drbd_vli.h"
  50
/* Result reported by drbd_may_finish_epoch(). */
enum finish_epoch {
	FE_STILL_LIVE,	/* initial result value: no epoch was finished */
	FE_DESTROYED,	/* a finished epoch was unlinked and kfree()d */
	FE_RECYCLED,	/* the finished epoch was the current one; it was reset */
};
  56
  57static int drbd_do_handshake(struct drbd_conf *mdev);
  58static int drbd_do_auth(struct drbd_conf *mdev);
  59
  60static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
  61static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
  62
  63
/* gfp flags passed to alloc_page() in drbd_pp_first_pages_or_try_alloc();
 * see the comment there for why nothing stronger is used. */
#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
  65
/*
 * Some helper functions to deal with singly linked page lists,
 * where page->private serves as our "next" pointer.
 */
  70
  71/* If at least n pages are linked at head, get n pages off.
  72 * Otherwise, don't modify head, and return NULL.
  73 * Locking is the responsibility of the caller.
  74 */
  75static struct page *page_chain_del(struct page **head, int n)
  76{
  77	struct page *page;
  78	struct page *tmp;
  79
  80	BUG_ON(!n);
  81	BUG_ON(!head);
  82
  83	page = *head;
  84
  85	if (!page)
  86		return NULL;
  87
  88	while (page) {
  89		tmp = page_chain_next(page);
  90		if (--n == 0)
  91			break; /* found sufficient pages */
  92		if (tmp == NULL)
  93			/* insufficient pages, don't use any of them. */
  94			return NULL;
  95		page = tmp;
  96	}
  97
  98	/* add end of list marker for the returned list */
  99	set_page_private(page, 0);
 100	/* actual return value, and adjustment of head */
 101	page = *head;
 102	*head = tmp;
 103	return page;
 104}
 105
 106/* may be used outside of locks to find the tail of a (usually short)
 107 * "private" page chain, before adding it back to a global chain head
 108 * with page_chain_add() under a spinlock. */
 109static struct page *page_chain_tail(struct page *page, int *len)
 110{
 111	struct page *tmp;
 112	int i = 1;
 113	while ((tmp = page_chain_next(page)))
 114		++i, page = tmp;
 115	if (len)
 116		*len = i;
 117	return page;
 118}
 119
 120static int page_chain_free(struct page *page)
 121{
 122	struct page *tmp;
 123	int i = 0;
 124	page_chain_for_each_safe(page, tmp) {
 125		put_page(page);
 126		++i;
 127	}
 128	return i;
 129}
 130
 131static void page_chain_add(struct page **head,
 132		struct page *chain_first, struct page *chain_last)
 133{
 134#if 1
 135	struct page *tmp;
 136	tmp = page_chain_tail(chain_first, NULL);
 137	BUG_ON(tmp != chain_last);
 138#endif
 139
 140	/* add chain to head */
 141	set_page_private(chain_last, (unsigned long)*head);
 142	*head = chain_first;
 143}
 144
 145static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
 146{
 147	struct page *page = NULL;
 148	struct page *tmp = NULL;
 149	int i = 0;
 150
 151	/* Yes, testing drbd_pp_vacant outside the lock is racy.
 152	 * So what. It saves a spin_lock. */
 153	if (drbd_pp_vacant >= number) {
 154		spin_lock(&drbd_pp_lock);
 155		page = page_chain_del(&drbd_pp_pool, number);
 156		if (page)
 157			drbd_pp_vacant -= number;
 158		spin_unlock(&drbd_pp_lock);
 159		if (page)
 160			return page;
 161	}
 162
 163	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
 164	 * "criss-cross" setup, that might cause write-out on some other DRBD,
 165	 * which in turn might block on the other node at this very place.  */
 166	for (i = 0; i < number; i++) {
 167		tmp = alloc_page(GFP_TRY);
 168		if (!tmp)
 169			break;
 170		set_page_private(tmp, (unsigned long)page);
 171		page = tmp;
 172	}
 173
 174	if (i == number)
 175		return page;
 176
 177	/* Not enough pages immediately available this time.
 178	 * No need to jump around here, drbd_pp_alloc will retry this
 179	 * function "soon". */
 180	if (page) {
 181		tmp = page_chain_tail(page, NULL);
 182		spin_lock(&drbd_pp_lock);
 183		page_chain_add(&drbd_pp_pool, page, tmp);
 184		drbd_pp_vacant += i;
 185		spin_unlock(&drbd_pp_lock);
 186	}
 187	return NULL;
 188}
 189
 190static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
 191{
 192	struct drbd_epoch_entry *e;
 193	struct list_head *le, *tle;
 194
 195	/* The EEs are always appended to the end of the list. Since
 196	   they are sent in order over the wire, they have to finish
 197	   in order. As soon as we see the first not finished we can
 198	   stop to examine the list... */
 199
 200	list_for_each_safe(le, tle, &mdev->net_ee) {
 201		e = list_entry(le, struct drbd_epoch_entry, w.list);
 202		if (drbd_ee_has_active_page(e))
 203			break;
 204		list_move(le, to_be_freed);
 205	}
 206}
 207
 208static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
 209{
 210	LIST_HEAD(reclaimed);
 211	struct drbd_epoch_entry *e, *t;
 212
 213	spin_lock_irq(&mdev->req_lock);
 214	reclaim_net_ee(mdev, &reclaimed);
 215	spin_unlock_irq(&mdev->req_lock);
 216
 217	list_for_each_entry_safe(e, t, &reclaimed, w.list)
 218		drbd_free_net_ee(mdev, e);
 219}
 220
 221/**
 222 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
 223 * @mdev:	DRBD device.
 224 * @number:	number of pages requested
 225 * @retry:	whether to retry, if not enough pages are available right now
 226 *
 227 * Tries to allocate number pages, first from our own page pool, then from
 228 * the kernel, unless this allocation would exceed the max_buffers setting.
 229 * Possibly retry until DRBD frees sufficient pages somewhere else.
 230 *
 231 * Returns a page chain linked via page->private.
 232 */
 233static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
 234{
 235	struct page *page = NULL;
 236	DEFINE_WAIT(wait);
 237
 238	/* Yes, we may run up to @number over max_buffers. If we
 239	 * follow it strictly, the admin will get it wrong anyways. */
 240	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
 241		page = drbd_pp_first_pages_or_try_alloc(mdev, number);
 242
 243	while (page == NULL) {
 244		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
 245
 246		drbd_kick_lo_and_reclaim_net(mdev);
 247
 248		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
 249			page = drbd_pp_first_pages_or_try_alloc(mdev, number);
 250			if (page)
 251				break;
 252		}
 253
 254		if (!retry)
 255			break;
 256
 257		if (signal_pending(current)) {
 258			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
 259			break;
 260		}
 261
 262		schedule();
 263	}
 264	finish_wait(&drbd_pp_wait, &wait);
 265
 266	if (page)
 267		atomic_add(number, &mdev->pp_in_use);
 268	return page;
 269}
 270
 271/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 272 * Is also used from inside an other spin_lock_irq(&mdev->req_lock);
 273 * Either links the page chain back to the global pool,
 274 * or returns all pages to the system. */
 275static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
 276{
 277	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
 278	int i;
 279
 280	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
 281		i = page_chain_free(page);
 282	else {
 283		struct page *tmp;
 284		tmp = page_chain_tail(page, &i);
 285		spin_lock(&drbd_pp_lock);
 286		page_chain_add(&drbd_pp_pool, page, tmp);
 287		drbd_pp_vacant += i;
 288		spin_unlock(&drbd_pp_lock);
 289	}
 290	i = atomic_sub_return(i, a);
 291	if (i < 0)
 292		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
 293			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
 294	wake_up(&drbd_pp_wait);
 295}
 296
 297/*
 298You need to hold the req_lock:
 299 _drbd_wait_ee_list_empty()
 300
 301You must not have the req_lock:
 302 drbd_free_ee()
 303 drbd_alloc_ee()
 304 drbd_init_ee()
 305 drbd_release_ee()
 306 drbd_ee_fix_bhs()
 307 drbd_process_done_ee()
 308 drbd_clear_done_ee()
 309 drbd_wait_ee_list_empty()
 310*/
 311
 312struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
 313				     u64 id,
 314				     sector_t sector,
 315				     unsigned int data_size,
 316				     gfp_t gfp_mask) __must_hold(local)
 317{
 318	struct drbd_epoch_entry *e;
 319	struct page *page;
 320	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
 321
 322	if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
 323		return NULL;
 324
 325	e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
 326	if (!e) {
 327		if (!(gfp_mask & __GFP_NOWARN))
 328			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
 329		return NULL;
 330	}
 331
 332	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
 333	if (!page)
 334		goto fail;
 335
 336	INIT_HLIST_NODE(&e->collision);
 337	e->epoch = NULL;
 338	e->mdev = mdev;
 339	e->pages = page;
 340	atomic_set(&e->pending_bios, 0);
 341	e->size = data_size;
 342	e->flags = 0;
 343	e->sector = sector;
 344	e->block_id = id;
 345
 346	return e;
 347
 348 fail:
 349	mempool_free(e, drbd_ee_mempool);
 350	return NULL;
 351}
 352
 353void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
 354{
 355	if (e->flags & EE_HAS_DIGEST)
 356		kfree(e->digest);
 357	drbd_pp_free(mdev, e->pages, is_net);
 358	D_ASSERT(atomic_read(&e->pending_bios) == 0);
 359	D_ASSERT(hlist_unhashed(&e->collision));
 360	mempool_free(e, drbd_ee_mempool);
 361}
 362
 363int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
 364{
 365	LIST_HEAD(work_list);
 366	struct drbd_epoch_entry *e, *t;
 367	int count = 0;
 368	int is_net = list == &mdev->net_ee;
 369
 370	spin_lock_irq(&mdev->req_lock);
 371	list_splice_init(list, &work_list);
 372	spin_unlock_irq(&mdev->req_lock);
 373
 374	list_for_each_entry_safe(e, t, &work_list, w.list) {
 375		drbd_free_some_ee(mdev, e, is_net);
 376		count++;
 377	}
 378	return count;
 379}
 380
 381
 382/*
 383 * This function is called from _asender only_
 384 * but see also comments in _req_mod(,barrier_acked)
 385 * and receive_Barrier.
 386 *
 387 * Move entries from net_ee to done_ee, if ready.
 388 * Grab done_ee, call all callbacks, free the entries.
 389 * The callbacks typically send out ACKs.
 390 */
 391static int drbd_process_done_ee(struct drbd_conf *mdev)
 392{
 393	LIST_HEAD(work_list);
 394	LIST_HEAD(reclaimed);
 395	struct drbd_epoch_entry *e, *t;
 396	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
 397
 398	spin_lock_irq(&mdev->req_lock);
 399	reclaim_net_ee(mdev, &reclaimed);
 400	list_splice_init(&mdev->done_ee, &work_list);
 401	spin_unlock_irq(&mdev->req_lock);
 402
 403	list_for_each_entry_safe(e, t, &reclaimed, w.list)
 404		drbd_free_net_ee(mdev, e);
 405
 406	/* possible callbacks here:
 407	 * e_end_block, and e_end_resync_block, e_send_discard_ack.
 408	 * all ignore the last argument.
 409	 */
 410	list_for_each_entry_safe(e, t, &work_list, w.list) {
 411		/* list_del not necessary, next/prev members not touched */
 412		ok = e->w.cb(mdev, &e->w, !ok) && ok;
 413		drbd_free_ee(mdev, e);
 414	}
 415	wake_up(&mdev->ee_wait);
 416
 417	return ok;
 418}
 419
 420void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
 421{
 422	DEFINE_WAIT(wait);
 423
 424	/* avoids spin_lock/unlock
 425	 * and calling prepare_to_wait in the fast path */
 426	while (!list_empty(head)) {
 427		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
 428		spin_unlock_irq(&mdev->req_lock);
 429		io_schedule();
 430		finish_wait(&mdev->ee_wait, &wait);
 431		spin_lock_irq(&mdev->req_lock);
 432	}
 433}
 434
/*
 * drbd_wait_ee_list_empty() - sleep until @head is empty.
 *
 * Locked wrapper around _drbd_wait_ee_list_empty(): takes req_lock
 * itself, so the caller must not hold it.
 */
void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
	spin_lock_irq(&mdev->req_lock);
	_drbd_wait_ee_list_empty(mdev, head);
	spin_unlock_irq(&mdev->req_lock);
}
 441
 442/* see also kernel_accept; which is only present since 2.6.18.
 443 * also we want to log which part of it failed, exactly */
 444static int drbd_accept(struct drbd_conf *mdev, const char **what,
 445		struct socket *sock, struct socket **newsock)
 446{
 447	struct sock *sk = sock->sk;
 448	int err = 0;
 449
 450	*what = "listen";
 451	err = sock->ops->listen(sock, 5);
 452	if (err < 0)
 453		goto out;
 454
 455	*what = "sock_create_lite";
 456	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
 457			       newsock);
 458	if (err < 0)
 459		goto out;
 460
 461	*what = "accept";
 462	err = sock->ops->accept(sock, *newsock, 0);
 463	if (err < 0) {
 464		sock_release(*newsock);
 465		*newsock = NULL;
 466		goto out;
 467	}
 468	(*newsock)->ops  = sock->ops;
 469	__module_get((*newsock)->ops->owner);
 470
 471out:
 472	return err;
 473}
 474
 475static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
 476		    void *buf, size_t size, int flags)
 477{
 478	mm_segment_t oldfs;
 479	struct kvec iov = {
 480		.iov_base = buf,
 481		.iov_len = size,
 482	};
 483	struct msghdr msg = {
 484		.msg_iovlen = 1,
 485		.msg_iov = (struct iovec *)&iov,
 486		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
 487	};
 488	int rv;
 489
 490	oldfs = get_fs();
 491	set_fs(KERNEL_DS);
 492	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
 493	set_fs(oldfs);
 494
 495	return rv;
 496}
 497
 498static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
 499{
 500	mm_segment_t oldfs;
 501	struct kvec iov = {
 502		.iov_base = buf,
 503		.iov_len = size,
 504	};
 505	struct msghdr msg = {
 506		.msg_iovlen = 1,
 507		.msg_iov = (struct iovec *)&iov,
 508		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
 509	};
 510	int rv;
 511
 512	oldfs = get_fs();
 513	set_fs(KERNEL_DS);
 514
 515	for (;;) {
 516		rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
 517		if (rv == size)
 518			break;
 519
 520		/* Note:
 521		 * ECONNRESET	other side closed the connection
 522		 * ERESTARTSYS	(on  sock) we got a signal
 523		 */
 524
 525		if (rv < 0) {
 526			if (rv == -ECONNRESET)
 527				dev_info(DEV, "sock was reset by peer\n");
 528			else if (rv != -ERESTARTSYS)
 529				dev_err(DEV, "sock_recvmsg returned %d\n", rv);
 530			break;
 531		} else if (rv == 0) {
 532			dev_info(DEV, "sock was shut down by peer\n");
 533			break;
 534		} else	{
 535			/* signal came in, or peer/link went down,
 536			 * after we read a partial message
 537			 */
 538			/* D_ASSERT(signal_pending(current)); */
 539			break;
 540		}
 541	};
 542
 543	set_fs(oldfs);
 544
 545	if (rv != size)
 546		drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
 547
 548	return rv;
 549}
 550
 551/* quoting tcp(7):
 552 *   On individual connections, the socket buffer size must be set prior to the
 553 *   listen(2) or connect(2) calls in order to have it take effect.
 554 * This is our wrapper to do so.
 555 */
 556static void drbd_setbufsize(struct socket *sock, unsigned int snd,
 557		unsigned int rcv)
 558{
 559	/* open coded SO_SNDBUF, SO_RCVBUF */
 560	if (snd) {
 561		sock->sk->sk_sndbuf = snd;
 562		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
 563	}
 564	if (rcv) {
 565		sock->sk->sk_rcvbuf = rcv;
 566		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
 567	}
 568}
 569
 570static struct socket *drbd_try_connect(struct drbd_conf *mdev)
 571{
 572	const char *what;
 573	struct socket *sock;
 574	struct sockaddr_in6 src_in6;
 575	int err;
 576	int disconnect_on_error = 1;
 577
 578	if (!get_net_conf(mdev))
 579		return NULL;
 580
 581	what = "sock_create_kern";
 582	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
 583		SOCK_STREAM, IPPROTO_TCP, &sock);
 584	if (err < 0) {
 585		sock = NULL;
 586		goto out;
 587	}
 588
 589	sock->sk->sk_rcvtimeo =
 590	sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
 591	drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
 592			mdev->net_conf->rcvbuf_size);
 593
 594       /* explicitly bind to the configured IP as source IP
 595	*  for the outgoing connections.
 596	*  This is needed for multihomed hosts and to be
 597	*  able to use lo: interfaces for drbd.
 598	* Make sure to use 0 as port number, so linux selects
 599	*  a free one dynamically.
 600	*/
 601	memcpy(&src_in6, mdev->net_conf->my_addr,
 602	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
 603	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
 604		src_in6.sin6_port = 0;
 605	else
 606		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
 607
 608	what = "bind before connect";
 609	err = sock->ops->bind(sock,
 610			      (struct sockaddr *) &src_in6,
 611			      mdev->net_conf->my_addr_len);
 612	if (err < 0)
 613		goto out;
 614
 615	/* connect may fail, peer not yet available.
 616	 * stay C_WF_CONNECTION, don't go Disconnecting! */
 617	disconnect_on_error = 0;
 618	what = "connect";
 619	err = sock->ops->connect(sock,
 620				 (struct sockaddr *)mdev->net_conf->peer_addr,
 621				 mdev->net_conf->peer_addr_len, 0);
 622
 623out:
 624	if (err < 0) {
 625		if (sock) {
 626			sock_release(sock);
 627			sock = NULL;
 628		}
 629		switch (-err) {
 630			/* timeout, busy, signal pending */
 631		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
 632		case EINTR: case ERESTARTSYS:
 633			/* peer not (yet) available, network problem */
 634		case ECONNREFUSED: case ENETUNREACH:
 635		case EHOSTDOWN:    case EHOSTUNREACH:
 636			disconnect_on_error = 0;
 637			break;
 638		default:
 639			dev_err(DEV, "%s failed, err = %d\n", what, err);
 640		}
 641		if (disconnect_on_error)
 642			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
 643	}
 644	put_net_conf(mdev);
 645	return sock;
 646}
 647
 648static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
 649{
 650	int timeo, err;
 651	struct socket *s_estab = NULL, *s_listen;
 652	const char *what;
 653
 654	if (!get_net_conf(mdev))
 655		return NULL;
 656
 657	what = "sock_create_kern";
 658	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
 659		SOCK_STREAM, IPPROTO_TCP, &s_listen);
 660	if (err) {
 661		s_listen = NULL;
 662		goto out;
 663	}
 664
 665	timeo = mdev->net_conf->try_connect_int * HZ;
 666	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
 667
 668	s_listen->sk->sk_reuse    = SK_CAN_REUSE; /* SO_REUSEADDR */
 669	s_listen->sk->sk_rcvtimeo = timeo;
 670	s_listen->sk->sk_sndtimeo = timeo;
 671	drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
 672			mdev->net_conf->rcvbuf_size);
 673
 674	what = "bind before listen";
 675	err = s_listen->ops->bind(s_listen,
 676			      (struct sockaddr *) mdev->net_conf->my_addr,
 677			      mdev->net_conf->my_addr_len);
 678	if (err < 0)
 679		goto out;
 680
 681	err = drbd_accept(mdev, &what, s_listen, &s_estab);
 682
 683out:
 684	if (s_listen)
 685		sock_release(s_listen);
 686	if (err < 0) {
 687		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
 688			dev_err(DEV, "%s failed, err = %d\n", what, err);
 689			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
 690		}
 691	}
 692	put_net_conf(mdev);
 693
 694	return s_estab;
 695}
 696
 697static int drbd_send_fp(struct drbd_conf *mdev,
 698	struct socket *sock, enum drbd_packets cmd)
 699{
 700	struct p_header80 *h = &mdev->data.sbuf.header.h80;
 701
 702	return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
 703}
 704
 705static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
 706{
 707	struct p_header80 *h = &mdev->data.rbuf.header.h80;
 708	int rr;
 709
 710	rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
 711
 712	if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
 713		return be16_to_cpu(h->command);
 714
 715	return 0xffff;
 716}
 717
 718/**
 719 * drbd_socket_okay() - Free the socket if its connection is not okay
 720 * @mdev:	DRBD device.
 721 * @sock:	pointer to the pointer to the socket.
 722 */
 723static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
 724{
 725	int rr;
 726	char tb[4];
 727
 728	if (!*sock)
 729		return false;
 730
 731	rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
 732
 733	if (rr > 0 || rr == -EAGAIN) {
 734		return true;
 735	} else {
 736		sock_release(*sock);
 737		*sock = NULL;
 738		return false;
 739	}
 740}
 741
 742/*
 743 * return values:
 744 *   1 yes, we have a valid connection
 745 *   0 oops, did not work out, please try again
 746 *  -1 peer talks different language,
 747 *     no point in trying again, please go standalone.
 748 *  -2 We do not have a network config...
 749 */
 750static int drbd_connect(struct drbd_conf *mdev)
 751{
 752	struct socket *s, *sock, *msock;
 753	int try, h, ok;
 754	enum drbd_state_rv rv;
 755
 756	D_ASSERT(!mdev->data.socket);
 757
 758	if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
 759		return -2;
 760
 761	clear_bit(DISCARD_CONCURRENT, &mdev->flags);
 762
 763	sock  = NULL;
 764	msock = NULL;
 765
 766	do {
 767		for (try = 0;;) {
 768			/* 3 tries, this should take less than a second! */
 769			s = drbd_try_connect(mdev);
 770			if (s || ++try >= 3)
 771				break;
 772			/* give the other side time to call bind() & listen() */
 773			schedule_timeout_interruptible(HZ / 10);
 774		}
 775
 776		if (s) {
 777			if (!sock) {
 778				drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
 779				sock = s;
 780				s = NULL;
 781			} else if (!msock) {
 782				drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
 783				msock = s;
 784				s = NULL;
 785			} else {
 786				dev_err(DEV, "Logic error in drbd_connect()\n");
 787				goto out_release_sockets;
 788			}
 789		}
 790
 791		if (sock && msock) {
 792			schedule_timeout_interruptible(mdev->net_conf->ping_timeo*HZ/10);
 793			ok = drbd_socket_okay(mdev, &sock);
 794			ok = drbd_socket_okay(mdev, &msock) && ok;
 795			if (ok)
 796				break;
 797		}
 798
 799retry:
 800		s = drbd_wait_for_connect(mdev);
 801		if (s) {
 802			try = drbd_recv_fp(mdev, s);
 803			drbd_socket_okay(mdev, &sock);
 804			drbd_socket_okay(mdev, &msock);
 805			switch (try) {
 806			case P_HAND_SHAKE_S:
 807				if (sock) {
 808					dev_warn(DEV, "initial packet S crossed\n");
 809					sock_release(sock);
 810				}
 811				sock = s;
 812				break;
 813			case P_HAND_SHAKE_M:
 814				if (msock) {
 815					dev_warn(DEV, "initial packet M crossed\n");
 816					sock_release(msock);
 817				}
 818				msock = s;
 819				set_bit(DISCARD_CONCURRENT, &mdev->flags);
 820				break;
 821			default:
 822				dev_warn(DEV, "Error receiving initial packet\n");
 823				sock_release(s);
 824				if (random32() & 1)
 825					goto retry;
 826			}
 827		}
 828
 829		if (mdev->state.conn <= C_DISCONNECTING)
 830			goto out_release_sockets;
 831		if (signal_pending(current)) {
 832			flush_signals(current);
 833			smp_rmb();
 834			if (get_t_state(&mdev->receiver) == Exiting)
 835				goto out_release_sockets;
 836		}
 837
 838		if (sock && msock) {
 839			ok = drbd_socket_okay(mdev, &sock);
 840			ok = drbd_socket_okay(mdev, &msock) && ok;
 841			if (ok)
 842				break;
 843		}
 844	} while (1);
 845
 846	msock->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
 847	sock->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
 848
 849	sock->sk->sk_allocation = GFP_NOIO;
 850	msock->sk->sk_allocation = GFP_NOIO;
 851
 852	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
 853	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
 854
 855	/* NOT YET ...
 856	 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
 857	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
 858	 * first set it to the P_HAND_SHAKE timeout,
 859	 * which we set to 4x the configured ping_timeout. */
 860	sock->sk->sk_sndtimeo =
 861	sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
 862
 863	msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
 864	msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
 865
 866	/* we don't want delays.
 867	 * we use TCP_CORK where appropriate, though */
 868	drbd_tcp_nodelay(sock);
 869	drbd_tcp_nodelay(msock);
 870
 871	mdev->data.socket = sock;
 872	mdev->meta.socket = msock;
 873	mdev->last_received = jiffies;
 874
 875	D_ASSERT(mdev->asender.task == NULL);
 876
 877	h = drbd_do_handshake(mdev);
 878	if (h <= 0)
 879		return h;
 880
 881	if (mdev->cram_hmac_tfm) {
 882		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
 883		switch (drbd_do_auth(mdev)) {
 884		case -1:
 885			dev_err(DEV, "Authentication of peer failed\n");
 886			return -1;
 887		case 0:
 888			dev_err(DEV, "Authentication of peer failed, trying again.\n");
 889			return 0;
 890		}
 891	}
 892
 893	sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
 894	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
 895
 896	atomic_set(&mdev->packet_seq, 0);
 897	mdev->peer_seq = 0;
 898
 899	if (drbd_send_protocol(mdev) == -1)
 900		return -1;
 901	set_bit(STATE_SENT, &mdev->flags);
 902	drbd_send_sync_param(mdev, &mdev->sync_conf);
 903	drbd_send_sizes(mdev, 0, 0);
 904	drbd_send_uuids(mdev);
 905	drbd_send_current_state(mdev);
 906	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
 907	clear_bit(RESIZE_PENDING, &mdev->flags);
 908
 909	spin_lock_irq(&mdev->req_lock);
 910	rv = _drbd_set_state(_NS(mdev, conn, C_WF_REPORT_PARAMS), CS_VERBOSE, NULL);
 911	if (mdev->state.conn != C_WF_REPORT_PARAMS)
 912		clear_bit(STATE_SENT, &mdev->flags);
 913	spin_unlock_irq(&mdev->req_lock);
 914
 915	if (rv < SS_SUCCESS)
 916		return 0;
 917
 918	drbd_thread_start(&mdev->asender);
 919	mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
 920
 921	return 1;
 922
 923out_release_sockets:
 924	if (sock)
 925		sock_release(sock);
 926	if (msock)
 927		sock_release(msock);
 928	return -1;
 929}
 930
 931static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
 932{
 933	union p_header *h = &mdev->data.rbuf.header;
 934	int r;
 935
 936	r = drbd_recv(mdev, h, sizeof(*h));
 937	if (unlikely(r != sizeof(*h))) {
 938		if (!signal_pending(current))
 939			dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);
 940		return false;
 941	}
 942
 943	if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
 944		*cmd = be16_to_cpu(h->h80.command);
 945		*packet_size = be16_to_cpu(h->h80.length);
 946	} else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
 947		*cmd = be16_to_cpu(h->h95.command);
 948		*packet_size = be32_to_cpu(h->h95.length);
 949	} else {
 950		dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
 951		    be32_to_cpu(h->h80.magic),
 952		    be16_to_cpu(h->h80.command),
 953		    be16_to_cpu(h->h80.length));
 954		return false;
 955	}
 956	mdev->last_received = jiffies;
 957
 958	return true;
 959}
 960
 961static void drbd_flush(struct drbd_conf *mdev)
 962{
 963	int rv;
 964
 965	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
 966		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
 967					NULL);
 968		if (rv) {
 969			dev_info(DEV, "local disk flush failed with status %d\n", rv);
 970			/* would rather check on EOPNOTSUPP, but that is not reliable.
 971			 * don't try again for ANY return value != 0
 972			 * if (rv == -EOPNOTSUPP) */
 973			drbd_bump_write_ordering(mdev, WO_drain_io);
 974		}
 975		put_ldev(mdev);
 976	}
 977}
 978
 979/**
 980 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 981 * @mdev:	DRBD device.
 982 * @epoch:	Epoch object.
 983 * @ev:		Epoch event.
 984 */
 985static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
 986					       struct drbd_epoch *epoch,
 987					       enum epoch_event ev)
 988{
 989	int epoch_size;
 990	struct drbd_epoch *next_epoch;
 991	enum finish_epoch rv = FE_STILL_LIVE;
 992
 993	spin_lock(&mdev->epoch_lock);
 994	do {
 995		next_epoch = NULL;
 996
 997		epoch_size = atomic_read(&epoch->epoch_size);
 998
 999		switch (ev & ~EV_CLEANUP) {
1000		case EV_PUT:
1001			atomic_dec(&epoch->active);
1002			break;
1003		case EV_GOT_BARRIER_NR:
1004			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1005			break;
1006		case EV_BECAME_LAST:
1007			/* nothing to do*/
1008			break;
1009		}
1010
1011		if (epoch_size != 0 &&
1012		    atomic_read(&epoch->active) == 0 &&
1013		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1014			if (!(ev & EV_CLEANUP)) {
1015				spin_unlock(&mdev->epoch_lock);
1016				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1017				spin_lock(&mdev->epoch_lock);
1018			}
1019			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1020				dec_unacked(mdev);
1021
1022			if (mdev->current_epoch != epoch) {
1023				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1024				list_del(&epoch->list);
1025				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1026				mdev->epochs--;
1027				kfree(epoch);
1028
1029				if (rv == FE_STILL_LIVE)
1030					rv = FE_DESTROYED;
1031			} else {
1032				epoch->flags = 0;
1033				atomic_set(&epoch->epoch_size, 0);
1034				/* atomic_set(&epoch->active, 0); is already zero */
1035				if (rv == FE_STILL_LIVE)
1036					rv = FE_RECYCLED;
1037				wake_up(&mdev->ee_wait);
1038			}
1039		}
1040
1041		if (!next_epoch)
1042			break;
1043
1044		epoch = next_epoch;
1045	} while (1);
1046
1047	spin_unlock(&mdev->epoch_lock);
1048
1049	return rv;
1050}
1051
1052/**
1053 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1054 * @mdev:	DRBD device.
1055 * @wo:		Write ordering method to try.
1056 */
1057void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1058{
1059	enum write_ordering_e pwo;
1060	static char *write_ordering_str[] = {
1061		[WO_none] = "none",
1062		[WO_drain_io] = "drain",
1063		[WO_bdev_flush] = "flush",
1064	};
1065
1066	pwo = mdev->write_ordering;
1067	wo = min(pwo, wo);
1068	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1069		wo = WO_drain_io;
1070	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1071		wo = WO_none;
1072	mdev->write_ordering = wo;
1073	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1074		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1075}
1076
1077/**
1078 * drbd_submit_ee()
1079 * @mdev:	DRBD device.
1080 * @e:		epoch entry
1081 * @rw:		flag field, see bio->bi_rw
1082 *
1083 * May spread the pages to multiple bios,
1084 * depending on bio_add_page restrictions.
1085 *
1086 * Returns 0 if all bios have been submitted,
1087 * -ENOMEM if we could not allocate enough bios,
1088 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1089 *  single page to an empty bio (which should never happen and likely indicates
1090 *  that the lower level IO stack is in some way broken). This has been observed
1091 *  on certain Xen deployments.
1092 */
1093/* TODO allocate from our own bio_set. */
1094int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1095		const unsigned rw, const int fault_type)
1096{
1097	struct bio *bios = NULL;
1098	struct bio *bio;
1099	struct page *page = e->pages;
1100	sector_t sector = e->sector;
1101	unsigned ds = e->size;
1102	unsigned n_bios = 0;
1103	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1104	int err = -ENOMEM;
1105
1106	/* In most cases, we will only need one bio.  But in case the lower
1107	 * level restrictions happen to be different at this offset on this
1108	 * side than those of the sending peer, we may need to submit the
1109	 * request in more than one bio.
1110	 *
1111	 * Plain bio_alloc is good enough here, this is no DRBD internally
1112	 * generated bio, but a bio allocated on behalf of the peer.
1113	 */
1114next_bio:
1115	bio = bio_alloc(GFP_NOIO, nr_pages);
1116	if (!bio) {
1117		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1118		goto fail;
1119	}
1120	/* > e->sector, unless this is the first bio */
1121	bio->bi_sector = sector;
1122	bio->bi_bdev = mdev->ldev->backing_bdev;
1123	bio->bi_rw = rw;
1124	bio->bi_private = e;
1125	bio->bi_end_io = drbd_endio_sec;
1126
1127	bio->bi_next = bios;
1128	bios = bio;
1129	++n_bios;
1130
1131	page_chain_for_each(page) {
1132		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1133		if (!bio_add_page(bio, page, len, 0)) {
1134			/* A single page must always be possible!
1135			 * But in case it fails anyways,
1136			 * we deal with it, and complain (below). */
1137			if (bio->bi_vcnt == 0) {
1138				dev_err(DEV,
1139					"bio_add_page failed for len=%u, "
1140					"bi_vcnt=0 (bi_sector=%llu)\n",
1141					len, (unsigned long long)bio->bi_sector);
1142				err = -ENOSPC;
1143				goto fail;
1144			}
1145			goto next_bio;
1146		}
1147		ds -= len;
1148		sector += len >> 9;
1149		--nr_pages;
1150	}
1151	D_ASSERT(page == NULL);
1152	D_ASSERT(ds == 0);
1153
1154	atomic_set(&e->pending_bios, n_bios);
1155	do {
1156		bio = bios;
1157		bios = bios->bi_next;
1158		bio->bi_next = NULL;
1159
1160		drbd_generic_make_request(mdev, fault_type, bio);
1161	} while (bios);
1162	return 0;
1163
1164fail:
1165	while (bios) {
1166		bio = bios;
1167		bios = bios->bi_next;
1168		bio_put(bio);
1169	}
1170	return err;
1171}
1172
1173static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1174{
1175	int rv;
1176	struct p_barrier *p = &mdev->data.rbuf.barrier;
1177	struct drbd_epoch *epoch;
1178
1179	inc_unacked(mdev);
1180
1181	mdev->current_epoch->barrier_nr = p->barrier;
1182	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1183
1184	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1185	 * the activity log, which means it would not be resynced in case the
1186	 * R_PRIMARY crashes now.
1187	 * Therefore we must send the barrier_ack after the barrier request was
1188	 * completed. */
1189	switch (mdev->write_ordering) {
1190	case WO_none:
1191		if (rv == FE_RECYCLED)
1192			return true;
1193
1194		/* receiver context, in the writeout path of the other node.
1195		 * avoid potential distributed deadlock */
1196		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1197		if (epoch)
1198			break;
1199		else
1200			dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1201			/* Fall through */
1202
1203	case WO_bdev_flush:
1204	case WO_drain_io:
1205		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1206		drbd_flush(mdev);
1207
1208		if (atomic_read(&mdev->current_epoch->epoch_size)) {
1209			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1210			if (epoch)
1211				break;
1212		}
1213
1214		epoch = mdev->current_epoch;
1215		wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1216
1217		D_ASSERT(atomic_read(&epoch->active) == 0);
1218		D_ASSERT(epoch->flags == 0);
1219
1220		return true;
1221	default:
1222		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1223		return false;
1224	}
1225
1226	epoch->flags = 0;
1227	atomic_set(&epoch->epoch_size, 0);
1228	atomic_set(&epoch->active, 0);
1229
1230	spin_lock(&mdev->epoch_lock);
1231	if (atomic_read(&mdev->current_epoch->epoch_size)) {
1232		list_add(&epoch->list, &mdev->current_epoch->list);
1233		mdev->current_epoch = epoch;
1234		mdev->epochs++;
1235	} else {
1236		/* The current_epoch got recycled while we allocated this one... */
1237		kfree(epoch);
1238	}
1239	spin_unlock(&mdev->epoch_lock);
1240
1241	return true;
1242}
1243
1244/* used from receive_RSDataReply (recv_resync_read)
1245 * and from receive_Data */
1246static struct drbd_epoch_entry *
1247read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1248{
1249	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1250	struct drbd_epoch_entry *e;
1251	struct page *page;
1252	int dgs, ds, rr;
1253	void *dig_in = mdev->int_dig_in;
1254	void *dig_vv = mdev->int_dig_vv;
1255	unsigned long *data;
1256
1257	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1258		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1259
1260	if (dgs) {
1261		rr = drbd_recv(mdev, dig_in, dgs);
1262		if (rr != dgs) {
1263			if (!signal_pending(current))
1264				dev_warn(DEV,
1265					"short read receiving data digest: read %d expected %d\n",
1266					rr, dgs);
1267			return NULL;
1268		}
1269	}
1270
1271	data_size -= dgs;
1272
1273	ERR_IF(data_size == 0) return NULL;
1274	ERR_IF(data_size &  0x1ff) return NULL;
1275	ERR_IF(data_size >  DRBD_MAX_BIO_SIZE) return NULL;
1276
1277	/* even though we trust out peer,
1278	 * we sometimes have to double check. */
1279	if (sector + (data_size>>9) > capacity) {
1280		dev_err(DEV, "request from peer beyond end of local disk: "
1281			"capacity: %llus < sector: %llus + size: %u\n",
1282			(unsigned long long)capacity,
1283			(unsigned long long)sector, data_size);
1284		return NULL;
1285	}
1286
1287	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1288	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1289	 * which in turn might block on the other node at this very place.  */
1290	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1291	if (!e)
1292		return NULL;
1293
1294	ds = data_size;
1295	page = e->pages;
1296	page_chain_for_each(page) {
1297		unsigned len = min_t(int, ds, PAGE_SIZE);
1298		data = kmap(page);
1299		rr = drbd_recv(mdev, data, len);
1300		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1301			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1302			data[0] = data[0] ^ (unsigned long)-1;
1303		}
1304		kunmap(page);
1305		if (rr != len) {
1306			drbd_free_ee(mdev, e);
1307			if (!signal_pending(current))
1308				dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1309				rr, len);
1310			return NULL;
1311		}
1312		ds -= rr;
1313	}
1314
1315	if (dgs) {
1316		drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1317		if (memcmp(dig_in, dig_vv, dgs)) {
1318			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1319				(unsigned long long)sector, data_size);
1320			drbd_bcast_ee(mdev, "digest failed",
1321					dgs, dig_in, dig_vv, e);
1322			drbd_free_ee(mdev, e);
1323			return NULL;
1324		}
1325	}
1326	mdev->recv_cnt += data_size>>9;
1327	return e;
1328}
1329
1330/* drbd_drain_block() just takes a data block
1331 * out of the socket input buffer, and discards it.
1332 */
1333static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1334{
1335	struct page *page;
1336	int rr, rv = 1;
1337	void *data;
1338
1339	if (!data_size)
1340		return true;
1341
1342	page = drbd_pp_alloc(mdev, 1, 1);
1343
1344	data = kmap(page);
1345	while (data_size) {
1346		rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1347		if (rr != min_t(int, data_size, PAGE_SIZE)) {
1348			rv = 0;
1349			if (!signal_pending(current))
1350				dev_warn(DEV,
1351					"short read receiving data: read %d expected %d\n",
1352					rr, min_t(int, data_size, PAGE_SIZE));
1353			break;
1354		}
1355		data_size -= rr;
1356	}
1357	kunmap(page);
1358	drbd_pp_free(mdev, page, 0);
1359	return rv;
1360}
1361
1362static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1363			   sector_t sector, int data_size)
1364{
1365	struct bio_vec *bvec;
1366	struct bio *bio;
1367	int dgs, rr, i, expect;
1368	void *dig_in = mdev->int_dig_in;
1369	void *dig_vv = mdev->int_dig_vv;
1370
1371	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1372		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1373
1374	if (dgs) {
1375		rr = drbd_recv(mdev, dig_in, dgs);
1376		if (rr != dgs) {
1377			if (!signal_pending(current))
1378				dev_warn(DEV,
1379					"short read receiving data reply digest: read %d expected %d\n",
1380					rr, dgs);
1381			return 0;
1382		}
1383	}
1384
1385	data_size -= dgs;
1386
1387	/* optimistically update recv_cnt.  if receiving fails below,
1388	 * we disconnect anyways, and counters will be reset. */
1389	mdev->recv_cnt += data_size>>9;
1390
1391	bio = req->master_bio;
1392	D_ASSERT(sector == bio->bi_sector);
1393
1394	bio_for_each_segment(bvec, bio, i) {
1395		expect = min_t(int, data_size, bvec->bv_len);
1396		rr = drbd_recv(mdev,
1397			     kmap(bvec->bv_page)+bvec->bv_offset,
1398			     expect);
1399		kunmap(bvec->bv_page);
1400		if (rr != expect) {
1401			if (!signal_pending(current))
1402				dev_warn(DEV, "short read receiving data reply: "
1403					"read %d expected %d\n",
1404					rr, expect);
1405			return 0;
1406		}
1407		data_size -= rr;
1408	}
1409
1410	if (dgs) {
1411		drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1412		if (memcmp(dig_in, dig_vv, dgs)) {
1413			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1414			return 0;
1415		}
1416	}
1417
1418	D_ASSERT(data_size == 0);
1419	return 1;
1420}
1421
1422/* e_end_resync_block() is called via
1423 * drbd_process_done_ee() by asender only */
1424static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1425{
1426	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1427	sector_t sector = e->sector;
1428	int ok;
1429
1430	D_ASSERT(hlist_unhashed(&e->collision));
1431
1432	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1433		drbd_set_in_sync(mdev, sector, e->size);
1434		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1435	} else {
1436		/* Record failure to sync */
1437		drbd_rs_failed_io(mdev, sector, e->size);
1438
1439		ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1440	}
1441	dec_unacked(mdev);
1442
1443	return ok;
1444}
1445
1446static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1447{
1448	struct drbd_epoch_entry *e;
1449
1450	e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1451	if (!e)
1452		goto fail;
1453
1454	dec_rs_pending(mdev);
1455
1456	inc_unacked(mdev);
1457	/* corresponding dec_unacked() in e_end_resync_block()
1458	 * respective _drbd_clear_done_ee */
1459
1460	e->w.cb = e_end_resync_block;
1461
1462	spin_lock_irq(&mdev->req_lock);
1463	list_add(&e->w.list, &mdev->sync_ee);
1464	spin_unlock_irq(&mdev->req_lock);
1465
1466	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1467	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1468		return true;
1469
1470	/* don't care for the reason here */
1471	dev_err(DEV, "submit failed, triggering re-connect\n");
1472	spin_lock_irq(&mdev->req_lock);
1473	list_del(&e->w.list);
1474	spin_unlock_irq(&mdev->req_lock);
1475
1476	drbd_free_ee(mdev, e);
1477fail:
1478	put_ldev(mdev);
1479	return false;
1480}
1481
1482static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1483{
1484	struct drbd_request *req;
1485	sector_t sector;
1486	int ok;
1487	struct p_data *p = &mdev->data.rbuf.data;
1488
1489	sector = be64_to_cpu(p->sector);
1490
1491	spin_lock_irq(&mdev->req_lock);
1492	req = _ar_id_to_req(mdev, p->block_id, sector);
1493	spin_unlock_irq(&mdev->req_lock);
1494	if (unlikely(!req)) {
1495		dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1496		return false;
1497	}
1498
1499	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1500	 * special casing it there for the various failure cases.
1501	 * still no race with drbd_fail_pending_reads */
1502	ok = recv_dless_read(mdev, req, sector, data_size);
1503
1504	if (ok)
1505		req_mod(req, data_received);
1506	/* else: nothing. handled from drbd_disconnect...
1507	 * I don't think we may complete this just yet
1508	 * in case we are "on-disconnect: freeze" */
1509
1510	return ok;
1511}
1512
1513static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1514{
1515	sector_t sector;
1516	int ok;
1517	struct p_data *p = &mdev->data.rbuf.data;
1518
1519	sector = be64_to_cpu(p->sector);
1520	D_ASSERT(p->block_id == ID_SYNCER);
1521
1522	if (get_ldev(mdev)) {
1523		/* data is submitted to disk within recv_resync_read.
1524		 * corresponding put_ldev done below on error,
1525		 * or in drbd_endio_write_sec. */
1526		ok = recv_resync_read(mdev, sector, data_size);
1527	} else {
1528		if (__ratelimit(&drbd_ratelimit_state))
1529			dev_err(DEV, "Can not write resync data to local disk.\n");
1530
1531		ok = drbd_drain_block(mdev, data_size);
1532
1533		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1534	}
1535
1536	atomic_add(data_size >> 9, &mdev->rs_sect_in);
1537
1538	return ok;
1539}
1540
1541/* e_end_block() is called via drbd_process_done_ee().
1542 * this means this function only runs in the asender thread
1543 */
1544static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1545{
1546	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1547	sector_t sector = e->sector;
1548	int ok = 1, pcmd;
1549
1550	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1551		if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1552			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1553				mdev->state.conn <= C_PAUSED_SYNC_T &&
1554				e->flags & EE_MAY_SET_IN_SYNC) ?
1555				P_RS_WRITE_ACK : P_WRITE_ACK;
1556			ok &= drbd_send_ack(mdev, pcmd, e);
1557			if (pcmd == P_RS_WRITE_ACK)
1558				drbd_set_in_sync(mdev, sector, e->size);
1559		} else {
1560			ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1561			/* we expect it to be marked out of sync anyways...
1562			 * maybe assert this?  */
1563		}
1564		dec_unacked(mdev);
1565	}
1566	/* we delete from the conflict detection hash _after_ we sent out the
1567	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1568	if (mdev->net_conf->two_primaries) {
1569		spin_lock_irq(&mdev->req_lock);
1570		D_ASSERT(!hlist_unhashed(&e->collision));
1571		hlist_del_init(&e->collision);
1572		spin_unlock_irq(&mdev->req_lock);
1573	} else {
1574		D_ASSERT(hlist_unhashed(&e->collision));
1575	}
1576
1577	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1578
1579	return ok;
1580}
1581
1582static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1583{
1584	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1585	int ok = 1;
1586
1587	D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1588	ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1589
1590	spin_lock_irq(&mdev->req_lock);
1591	D_ASSERT(!hlist_unhashed(&e->collision));
1592	hlist_del_init(&e->collision);
1593	spin_unlock_irq(&mdev->req_lock);
1594
1595	dec_unacked(mdev);
1596
1597	return ok;
1598}
1599
1600static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_epoch_entry *data_e)
1601{
1602
1603	struct drbd_epoch_entry *rs_e;
1604	bool rv = 0;
1605
1606	spin_lock_irq(&mdev->req_lock);
1607	list_for_each_entry(rs_e, &mdev->sync_ee, w.list) {
1608		if (overlaps(data_e->sector, data_e->size, rs_e->sector, rs_e->size)) {
1609			rv = 1;
1610			break;
1611		}
1612	}
1613	spin_unlock_irq(&mdev->req_lock);
1614
1615	return rv;
1616}
1617
1618/* Called from receive_Data.
1619 * Synchronize packets on sock with packets on msock.
1620 *
1621 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1622 * packet traveling on msock, they are still processed in the order they have
1623 * been sent.
1624 *
1625 * Note: we don't care for Ack packets overtaking P_DATA packets.
1626 *
1627 * In case packet_seq is larger than mdev->peer_seq number, there are
1628 * outstanding packets on the msock. We wait for them to arrive.
1629 * In case we are the logically next packet, we update mdev->peer_seq
1630 * ourselves. Correctly handles 32bit wrap around.
1631 *
1632 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1633 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1634 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1635 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1636 *
1637 * returns 0 if we may process the packet,
1638 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1639static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1640{
1641	DEFINE_WAIT(wait);
1642	unsigned int p_seq;
1643	long timeout;
1644	int ret = 0;
1645	spin_lock(&mdev->peer_seq_lock);
1646	for (;;) {
1647		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1648		if (seq_le(packet_seq, mdev->peer_seq+1))
1649			break;
1650		if (signal_pending(current)) {
1651			ret = -ERESTARTSYS;
1652			break;
1653		}
1654		p_seq = mdev->peer_seq;
1655		spin_unlock(&mdev->peer_seq_lock);
1656		timeout = schedule_timeout(30*HZ);
1657		spin_lock(&mdev->peer_seq_lock);
1658		if (timeout == 0 && p_seq == mdev->peer_seq) {
1659			ret = -ETIMEDOUT;
1660			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1661			break;
1662		}
1663	}
1664	finish_wait(&mdev->seq_wait, &wait);
1665	if (mdev->peer_seq+1 == packet_seq)
1666		mdev->peer_seq++;
1667	spin_unlock(&mdev->peer_seq_lock);
1668	return ret;
1669}
1670
1671/* see also bio_flags_to_wire()
1672 * DRBD_REQ_*, because we need to semantically map the flags to data packet
1673 * flags and back. We may replicate to other kernel versions. */
1674static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1675{
1676	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1677		(dpf & DP_FUA ? REQ_FUA : 0) |
1678		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1679		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
1680}
1681
1682/* mirrored write */
1683static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1684{
1685	sector_t sector;
1686	struct drbd_epoch_entry *e;
1687	struct p_data *p = &mdev->data.rbuf.data;
1688	int rw = WRITE;
1689	u32 dp_flags;
1690
1691	if (!get_ldev(mdev)) {
1692		spin_lock(&mdev->peer_seq_lock);
1693		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1694			mdev->peer_seq++;
1695		spin_unlock(&mdev->peer_seq_lock);
1696
1697		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1698		atomic_inc(&mdev->current_epoch->epoch_size);
1699		return drbd_drain_block(mdev, data_size);
1700	}
1701
1702	/* get_ldev(mdev) successful.
1703	 * Corresponding put_ldev done either below (on various errors),
1704	 * or in drbd_endio_write_sec, if we successfully submit the data at
1705	 * the end of this function. */
1706
1707	sector = be64_to_cpu(p->sector);
1708	e = read_in_block(mdev, p->block_id, sector, data_size);
1709	if (!e) {
1710		put_ldev(mdev);
1711		return false;
1712	}
1713
1714	e->w.cb = e_end_block;
1715
1716	dp_flags = be32_to_cpu(p->dp_flags);
1717	rw |= wire_flags_to_bio(mdev, dp_flags);
1718
1719	if (dp_flags & DP_MAY_SET_IN_SYNC)
1720		e->flags |= EE_MAY_SET_IN_SYNC;
1721
1722	spin_lock(&mdev->epoch_lock);
1723	e->epoch = mdev->current_epoch;
1724	atomic_inc(&e->epoch->epoch_size);
1725	atomic_inc(&e->epoch->active);
1726	spin_unlock(&mdev->epoch_lock);
1727
1728	/* I'm the receiver, I do hold a net_cnt reference. */
1729	if (!mdev->net_conf->two_primaries) {
1730		spin_lock_irq(&mdev->req_lock);
1731	} else {
1732		/* don't get the req_lock yet,
1733		 * we may sleep in drbd_wait_peer_seq */
1734		const int size = e->size;
1735		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1736		DEFINE_WAIT(wait);
1737		struct drbd_request *i;
1738		struct hlist_node *n;
1739		struct hlist_head *slot;
1740		int first;
1741
1742		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1743		BUG_ON(mdev->ee_hash == NULL);
1744		BUG_ON(mdev->tl_hash == NULL);
1745
1746		/* conflict detection and handling:
1747		 * 1. wait on the sequence number,
1748		 *    in case this data packet overtook ACK packets.
1749		 * 2. check our hash tables for conflicting requests.
1750		 *    we only need to walk the tl_hash, since an ee can not
1751		 *    have a conflict with an other ee: on the submitting
1752		 *    node, the corresponding req had already been conflicting,
1753		 *    and a conflicting req is never sent.
1754		 *
1755		 * Note: for two_primaries, we are protocol C,
1756		 * so there cannot be any request that is DONE
1757		 * but still on the transfer log.
1758		 *
1759		 * unconditionally add to the ee_hash.
1760		 *
1761		 * if no conflicting request is found:
1762		 *    submit.
1763		 *
1764		 * if any conflicting request is found
1765		 * that has not yet been acked,
1766		 * AND I have the "discard concurrent writes" flag:
1767		 *	 queue (via done_ee) the P_DISCARD_ACK; OUT.
1768		 *
1769		 * if any conflicting request is found:
1770		 *	 block the receiver, waiting on misc_wait
1771		 *	 until no more conflicting requests are there,
1772		 *	 or we get interrupted (disconnect).
1773		 *
1774		 *	 we do not just write after local io completion of those
1775		 *	 requests, but only after req is done completely, i.e.
1776		 *	 we wait for the P_DISCARD_ACK to arrive!
1777		 *
1778		 *	 then proceed normally, i.e. submit.
1779		 */
1780		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1781			goto out_interrupted;
1782
1783		spin_lock_irq(&mdev->req_lock);
1784
1785		hlist_add_head(&e->collision, ee_hash_slot(mdev, sector));
1786
1787#define OVERLAPS overlaps(i->sector, i->size, sector, size)
1788		slot = tl_hash_slot(mdev, sector);
1789		first = 1;
1790		for (;;) {
1791			int have_unacked = 0;
1792			int have_conflict = 0;
1793			prepare_to_wait(&mdev->misc_wait, &wait,
1794				TASK_INTERRUPTIBLE);
1795			hlist_for_each_entry(i, n, slot, collision) {
1796				if (OVERLAPS) {
1797					/* only ALERT on first iteration,
1798					 * we may be woken up early... */
1799					if (first)
1800						dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1801						      "	new: %llus +%u; pending: %llus +%u\n",
1802						      current->comm, current->pid,
1803						      (unsigned long long)sector, size,
1804						      (unsigned long long)i->sector, i->size);
1805					if (i->rq_state & RQ_NET_PENDING)
1806						++have_unacked;
1807					++have_conflict;
1808				}
1809			}
1810#undef OVERLAPS
1811			if (!have_conflict)
1812				break;
1813
1814			/* Discard Ack only for the _first_ iteration */
1815			if (first && discard && have_unacked) {
1816				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1817				     (unsigned long long)sector);
1818				inc_unacked(mdev);
1819				e->w.cb = e_send_discard_ack;
1820				list_add_tail(&e->w.list, &mdev->done_ee);
1821
1822				spin_unlock_irq(&mdev->req_lock);
1823
1824				/* we could probably send that P_DISCARD_ACK ourselves,
1825				 * but I don't like the receiver using the msock */
1826
1827				put_ldev(mdev);
1828				wake_asender(mdev);
1829				finish_wait(&mdev->misc_wait, &wait);
1830				return true;
1831			}
1832
1833			if (signal_pending(current)) {
1834				hlist_del_init(&e->collision);
1835
1836				spin_unlock_irq(&mdev->req_lock);
1837
1838				finish_wait(&mdev->misc_wait, &wait);
1839				goto out_interrupted;
1840			}
1841
1842			spin_unlock_irq(&mdev->req_lock);
1843			if (first) {
1844				first = 0;
1845				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1846				     "sec=%llus\n", (unsigned long long)sector);
1847			} else if (discard) {
1848				/* we had none on the first iteration.
1849				 * there must be none now. */
1850				D_ASSERT(have_unacked == 0);
1851			}
1852			schedule();
1853			spin_lock_irq(&mdev->req_lock);
1854		}
1855		finish_wait(&mdev->misc_wait, &wait);
1856	}
1857
1858	list_add(&e->w.list, &mdev->active_ee);
1859	spin_unlock_irq(&mdev->req_lock);
1860
1861	if (mdev->state.conn == C_SYNC_TARGET)
1862		wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, e));
1863
1864	switch (mdev->net_conf->wire_protocol) {
1865	case DRBD_PROT_C:
1866		inc_unacked(mdev);
1867		/* corresponding dec_unacked() in e_end_block()
1868		 * respective _drbd_clear_done_ee */
1869		break;
1870	case DRBD_PROT_B:
1871		/* I really don't like it that the receiver thread
1872		 * sends on the msock, but anyways */
1873		drbd_send_ack(mdev, P_RECV_ACK, e);
1874		break;
1875	case DRBD_PROT_A:
1876		/* nothing to do */
1877		break;
1878	}
1879
1880	if (mdev->state.pdsk < D_INCONSISTENT) {
1881		/* In case we have the only disk of the cluster, */
1882		drbd_set_out_of_sync(mdev, e->sector, e->size);
1883		e->flags |= EE_CALL_AL_COMPLETE_IO;
1884		e->flags &= ~EE_MAY_SET_IN_SYNC;
1885		drbd_al_begin_io(mdev, e->sector);
1886	}
1887
1888	if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
1889		return true;
1890
1891	/* don't care for the reason here */
1892	dev_err(DEV, "submit failed, triggering re-connect\n");
1893	spin_lock_irq(&mdev->req_lock);
1894	list_del(&e->w.list);
1895	hlist_del_init(&e->collision);
1896	spin_unlock_irq(&mdev->req_lock);
1897	if (e->flags & EE_CALL_AL_COMPLETE_IO)
1898		drbd_al_complete_io(mdev, e->sector);
1899
1900out_interrupted:
1901	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
1902	put_ldev(mdev);
1903	drbd_free_ee(mdev, e);
1904	return false;
1905}
1906
1907/* We may throttle resync, if the lower device seems to be busy,
1908 * and current sync rate is above c_min_rate.
1909 *
1910 * To decide whether or not the lower device is busy, we use a scheme similar
1911 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
1912 * (more than 64 sectors) of activity we cannot account for with our own resync
1913 * activity, it obviously is "busy".
1914 *
1915 * The current sync rate used here uses only the most recent two step marks,
1916 * to have a short time average so we can react faster.
1917 */
1918int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
1919{
1920	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1921	unsigned long db, dt, dbdt;
1922	struct lc_element *tmp;
1923	int curr_events;
1924	int throttle = 0;
1925
1926	/* feature disabled? */
1927	if (mdev->sync_conf.c_min_rate == 0)
1928		return 0;
1929
1930	spin_lock_irq(&mdev->al_lock);
1931	tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
1932	if (tmp) {
1933		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
1934		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
1935			spin_unlock_irq(&mdev->al_lock);
1936			return 0;
1937		}
1938		/* Do not slow down if app IO is already waiting for this extent */
1939	}
1940	spin_unlock_irq(&mdev->al_lock);
1941
1942	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1943		      (int)part_stat_read(&disk->part0, sectors[1]) -
1944			atomic_read(&mdev->rs_sect_ev);
1945
1946	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1947		unsigned long rs_left;
1948		int i;
1949
1950		mdev->rs_last_events = curr_events;
1951
1952		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1953		 * approx. */
1954		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
1955
1956		if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
1957			rs_left = mdev->ov_left;
1958		else
1959			rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
1960
1961		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
1962		if (!dt)
1963			dt++;
1964		db = mdev->rs_mark_left[i] - rs_left;
1965		dbdt = Bit2KB(db/dt);
1966
1967		if (dbdt > mdev->sync_conf.c_min_rate)
1968			throttle = 1;
1969	}
1970	return throttle;
1971}
1972
1973
1974static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
1975{
1976	sector_t sector;
1977	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1978	struct drbd_epoch_entry *e;
1979	struct digest_info *di = NULL;
1980	int size, verb;
1981	unsigned int fault_type;
1982	struct p_block_req *p =	&mdev->data.rbuf.block_req;
1983
1984	sector = be64_to_cpu(p->sector);
1985	size   = be32_to_cpu(p->blksize);
1986
1987	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
1988		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1989				(unsigned long long)sector, size);
1990		return false;
1991	}
1992	if (sector + (size>>9) > capacity) {
1993		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1994				(unsigned long long)sector, size);
1995		return false;
1996	}
1997
1998	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1999		verb = 1;
2000		switch (cmd) {
2001		case P_DATA_REQUEST:
2002			drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2003			break;
2004		case P_RS_DATA_REQUEST:
2005		case P_CSUM_RS_REQUEST:
2006		case P_OV_REQUEST:
2007			drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2008			break;
2009		case P_OV_REPLY:
2010			verb = 0;
2011			dec_rs_pending(mdev);
2012			drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2013			break;
2014		default:
2015			dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2016				cmdname(cmd));
2017		}
2018		if (verb && __ratelimit(&drbd_ratelimit_state))
2019			dev_err(DEV, "Can not satisfy peer's read request, "
2020			    "no local data.\n");
2021
2022		/* drain possibly payload */
2023		return drbd_drain_block(mdev, digest_size);
2024	}
2025
2026	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2027	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2028	 * which in turn might block on the other node at this very place.  */
2029	e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2030	if (!e) {
2031		put_ldev(mdev);
2032		return false;
2033	}
2034
2035	switch (cmd) {
2036	case P_DATA_REQUEST:
2037		e->w.cb = w_e_end_data_req;
2038		fault_type = DRBD_FAULT_DT_RD;
2039		/* application IO, don't drbd_rs_begin_io */
2040		goto submit;
2041
2042	case P_RS_DATA_REQUEST:
2043		e->w.cb = w_e_end_rsdata_req;
2044		fault_type = DRBD_FAULT_RS_RD;
2045		/* used in the sector offset progress display */
2046		mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2047		break;
2048
2049	case P_OV_REPLY:
2050	case P_CSUM_RS_REQUEST:
2051		fault_type = DRBD_FAULT_RS_RD;
2052		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2053		if (!di)
2054			goto out_free_e;
2055
2056		di->digest_size = digest_size;
2057		di->digest = (((char *)di)+sizeof(struct digest_info));
2058
2059		e->digest = di;
2060		e->flags |= EE_HAS_DIGEST;
2061
2062		if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2063			goto out_free_e;
2064
2065		if (cmd == P_CSUM_RS_REQUEST) {
2066			D_ASSERT(mdev->agreed_pro_version >= 89);
2067			e->w.cb = w_e_end_csum_rs_req;
2068			/* used in the sector offset progress display */
2069			mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2070		} else if (cmd == P_OV_REPLY) {
2071			/* track progress, we may need to throttle */
2072			atomic_add(size >> 9, &mdev->rs_sect_in);
2073			e->w.cb = w_e_end_ov_reply;
2074			dec_rs_pending(mdev);
2075			/* drbd_rs_begin_io done when we sent this request,
2076			 * but accounting still needs to be done. */
2077			goto submit_for_resync;
2078		}
2079		break;
2080
2081	case P_OV_REQUEST:
2082		if (mdev->ov_start_sector == ~(sector_t)0 &&
2083		    mdev->agreed_pro_version >= 90) {
2084			unsigned long now = jiffies;
2085			int i;
2086			mdev->ov_start_sector = sector;
2087			mdev->ov_position = sector;
2088			mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2089			mdev->rs_total = mdev->ov_left;
2090			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2091				mdev->rs_mark_left[i] = mdev->ov_left;
2092				mdev->rs_mark_time[i] = now;
2093			}
2094			dev_info(DEV, "Online Verify start sector: %llu\n",
2095					(unsigned long long)sector);
2096		}
2097		e->w.cb = w_e_end_ov_req;
2098		fault_type = DRBD_FAULT_RS_RD;
2099		break;
2100
2101	default:
2102		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2103		    cmdname(cmd));
2104		fault_type = DRBD_FAULT_MAX;
2105		goto out_free_e;
2106	}
2107
2108	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2109	 * wrt the receiver, but it is not as straightforward as it may seem.
2110	 * Various places in the resync start and stop logic assume resync
2111	 * requests are processed in order, requeuing this on the worker thread
2112	 * introduces a bunch of new code for synchronization between threads.
2113	 *
2114	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2115	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2116	 * for application writes for the same time.  For now, just throttle
2117	 * here, where the rest of the code expects the receiver to sleep for
2118	 * a while, anyways.
2119	 */
2120
2121	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2122	 * this defers syncer requests for some time, before letting at least
2123	 * on request through.  The resync controller on the receiving side
2124	 * will adapt to the incoming rate accordingly.
2125	 *
2126	 * We cannot throttle here if remote is Primary/SyncTarget:
2127	 * we would also throttle its application reads.
2128	 * In that case, throttling is done on the SyncTarget only.
2129	 */
2130	if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2131		schedule_timeout_uninterruptible(HZ/10);
2132	if (drbd_rs_begin_io(mdev, sector))
2133		goto out_free_e;
2134
2135submit_for_resync:
2136	atomic_add(size >> 9, &mdev->rs_sect_ev);
2137
2138submit:
2139	inc_unacked(mdev);
2140	spin_lock_irq(&mdev->req_lock);
2141	list_add_tail(&e->w.list, &mdev->read_ee);
2142	spin_unlock_irq(&mdev->req_lock);
2143
2144	if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2145		return true;
2146
2147	/* don't care for the reason here */
2148	dev_err(DEV, "submit failed, triggering re-connect\n");
2149	spin_lock_irq(&mdev->req_lock);
2150	list_del(&e->w.list);
2151	spin_unlock_irq(&mdev->req_lock);
2152	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
2153
2154out_free_e:
2155	put_ldev(mdev);
2156	drbd_free_ee(mdev, e);
2157	return false;
2158}
2159
2160static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2161{
2162	int self, peer, rv = -100;
2163	unsigned long ch_self, ch_peer;
2164
2165	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2166	peer = mdev->p_uuid[UI_BITMAP] & 1;
2167
2168	ch_peer = mdev->p_uuid[UI_SIZE];
2169	ch_self = mdev->comm_bm_set;
2170
2171	switch (mdev->net_conf->after_sb_0p) {
2172	case ASB_CONSENSUS:
2173	case ASB_DISCARD_SECONDARY:
2174	case ASB_CALL_HELPER:
2175		dev_err(DEV, "Configuration error.\n");
2176		break;
2177	case ASB_DISCONNECT:
2178		break;
2179	case ASB_DISCARD_YOUNGER_PRI:
2180		if (self == 0 && peer == 1) {
2181			rv = -1;
2182			break;
2183		}
2184		if (self == 1 && peer == 0) {
2185			rv =  1;
2186			break;
2187		}
2188		/* Else fall through to one of the other strategies... */
2189	case ASB_DISCARD_OLDER_PRI:
2190		if (self == 0 && peer == 1) {
2191			rv = 1;
2192			break;
2193		}
2194		if (self == 1 && peer == 0) {
2195			rv = -1;
2196			break;
2197		}
2198		/* Else fall through to one of the other strategies... */
2199		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2200		     "Using discard-least-changes instead\n");
2201	case ASB_DISCARD_ZERO_CHG:
2202		if (ch_peer == 0 && ch_self == 0) {
2203			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2204				? -1 : 1;
2205			break;
2206		} else {
2207			if (ch_peer == 0) { rv =  1; break; }
2208			if (ch_self == 0) { rv = -1; break; }
2209		}
2210		if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2211			break;
2212	case ASB_DISCARD_LEAST_CHG:
2213		if	(ch_self < ch_peer)
2214			rv = -1;
2215		else if (ch_self > ch_peer)
2216			rv =  1;
2217		else /* ( ch_self == ch_peer ) */
2218		     /* Well, then use something else. */
2219			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2220				? -1 : 1;
2221		break;
2222	case ASB_DISCARD_LOCAL:
2223		rv = -1;
2224		break;
2225	case ASB_DISCARD_REMOTE:
2226		rv =  1;
2227	}
2228
2229	return rv;
2230}
2231
2232static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2233{
2234	int hg, rv = -100;
2235
2236	switch (mdev->net_conf->after_sb_1p) {
2237	case ASB_DISCARD_YOUNGER_PRI:
2238	case ASB_DISCARD_OLDER_PRI:
2239	case ASB_DISCARD_LEAST_CHG:
2240	case ASB_DISCARD_LOCAL:
2241	case ASB_DISCARD_REMOTE:
2242		dev_err(DEV, "Configuration error.\n");
2243		break;
2244	case ASB_DISCONNECT:
2245		break;
2246	case ASB_CONSENSUS:
2247		hg = drbd_asb_recover_0p(mdev);
2248		if (hg == -1 && mdev->state.role == R_SECONDARY)
2249			rv = hg;
2250		if (hg == 1  && mdev->state.role == R_PRIMARY)
2251			rv = hg;
2252		break;
2253	case ASB_VIOLENTLY:
2254		rv = drbd_asb_recover_0p(mdev);
2255		break;
2256	case ASB_DISCARD_SECONDARY:
2257		return mdev->state.role == R_PRIMARY ? 1 : -1;
2258	case ASB_CALL_HELPER:
2259		hg = drbd_asb_recover_0p(mdev);
2260		if (hg == -1 && mdev->state.role == R_PRIMARY) {
2261			enum drbd_state_rv rv2;
2262
2263			drbd_set_role(mdev, R_SECONDARY, 0);
2264			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2265			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2266			  * we do not need to wait for the after state change work either. */
2267			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2268			if (rv2 != SS_SUCCESS) {
2269				drbd_khelper(mdev, "pri-lost-after-sb");
2270			} else {
2271				dev_warn(DEV, "Successfully gave up primary role.\n");
2272				rv = hg;
2273			}
2274		} else
2275			rv = hg;
2276	}
2277
2278	return rv;
2279}
2280
2281static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2282{
2283	int hg, rv = -100;
2284
2285	switch (mdev->net_conf->after_sb_2p) {
2286	case ASB_DISCARD_YOUNGER_PRI:
2287	case ASB_DISCARD_OLDER_PRI:
2288	case ASB_DISCARD_LEAST_CHG:
2289	case ASB_DISCARD_LOCAL:
2290	case ASB_DISCARD_REMOTE:
2291	case ASB_CONSENSUS:
2292	case ASB_DISCARD_SECONDARY:
2293		dev_err(DEV, "Configuration error.\n");
2294		break;
2295	case ASB_VIOLENTLY:
2296		rv = drbd_asb_recover_0p(mdev);
2297		break;
2298	case ASB_DISCONNECT:
2299		break;
2300	case ASB_CALL_HELPER:
2301		hg = drbd_asb_recover_0p(mdev);
2302		if (hg == -1) {
2303			enum drbd_state_rv rv2;
2304
2305			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2306			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2307			  * we do not need to wait for the after state change work either. */
2308			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2309			if (rv2 != SS_SUCCESS) {
2310				drbd_khelper(mdev, "pri-lost-after-sb");
2311			} else {
2312				dev_warn(DEV, "Successfully gave up primary role.\n");
2313				rv = hg;
2314			}
2315		} else
2316			rv = hg;
2317	}
2318
2319	return rv;
2320}
2321
2322static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2323			   u64 bits, u64 flags)
2324{
2325	if (!uuid) {
2326		dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2327		return;
2328	}
2329	dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2330	     text,
2331	     (unsigned long long)uuid[UI_CURRENT],
2332	     (unsigned long long)uuid[UI_BITMAP],
2333	     (unsigned long long)uuid[UI_HISTORY_START],
2334	     (unsigned long long)uuid[UI_HISTORY_END],
2335	     (unsigned long long)bits,
2336	     (unsigned long long)flags);
2337}
2338
2339/*
2340  100	after split brain try auto recover
2341    2	C_SYNC_SOURCE set BitMap
2342    1	C_SYNC_SOURCE use BitMap
2343    0	no Sync
2344   -1	C_SYNC_TARGET use BitMap
2345   -2	C_SYNC_TARGET set BitMap
2346 -100	after split brain, disconnect
2347-1000	unrelated data
2348-1091   requires proto 91
2349-1096   requires proto 96
2350 */
2351static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2352{
2353	u64 self, peer;
2354	int i, j;
2355
2356	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2357	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2358
2359	*rule_nr = 10;
2360	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2361		return 0;
2362
2363	*rule_nr = 20;
2364	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2365	     peer != UUID_JUST_CREATED)
2366		return -2;
2367
2368	*rule_nr = 30;
2369	if (self != UUID_JUST_CREATED &&
2370	    (peer == UUID_JUST_CREATED || peer == (u64)0))
2371		return 2;
2372
2373	if (self == peer) {
2374		int rct, dc; /* roles at crash time */
2375
2376		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2377
2378			if (mdev->agreed_pro_version < 91)
2379				return -1091;
2380
2381			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2382			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2383				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2384				drbd_uuid_set_bm(mdev, 0UL);
2385
2386				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2387					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2388				*rule_nr = 34;
2389			} else {
2390				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2391				*rule_nr = 36;
2392			}
2393
2394			return 1;
2395		}
2396
2397		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2398
2399			if (mdev->agreed_pro_version < 91)
2400				return -1091;
2401
2402			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2403			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2404				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2405
2406				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2407				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2408				mdev->p_uuid[UI_BITMAP] = 0UL;
2409
2410				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2411				*rule_nr = 35;
2412			} else {
2413				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2414				*rule_nr = 37;
2415			}
2416
2417			return -1;
2418		}
2419
2420		/* Common power [off|failure] */
2421		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2422			(mdev->p_uuid[UI_FLAGS] & 2);
2423		/* lowest bit is set when we were primary,
2424		 * next bit (weight 2) is set when peer was primary */
2425		*rule_nr = 40;
2426
2427		switch (rct) {
2428		case 0: /* !self_pri && !peer_pri */ return 0;
2429		case 1: /*  self_pri && !peer_pri */ return 1;
2430		case 2: /* !self_pri &&  peer_pri */ return -1;
2431		case 3: /*  self_pri &&  peer_pri */
2432			dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2433			return dc ? -1 : 1;
2434		}
2435	}
2436
2437	*rule_nr = 50;
2438	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2439	if (self == peer)
2440		return -1;
2441
2442	*rule_nr = 51;
2443	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2444	if (self == peer) {
2445		if (mdev->agreed_pro_version < 96 ?
2446		    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2447		    (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2448		    peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2449			/* The last P_SYNC_UUID did not get though. Undo the last start of
2450			   resync as sync source modifications of the peer's UUIDs. */
2451
2452			if (mdev->agreed_pro_version < 91)
2453				return -1091;
2454
2455			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2456			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2457
2458			dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
2459			drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2460
2461			return -1;
2462		}
2463	}
2464
2465	*rule_nr = 60;
2466	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2467	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2468		peer = mdev->p_uuid[i] & ~((u64)1);
2469		if (self == peer)
2470			return -2;
2471	}
2472
2473	*rule_nr = 70;
2474	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2475	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2476	if (self == peer)
2477		return 1;
2478
2479	*rule_nr = 71;
2480	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2481	if (self == peer) {
2482		if (mdev->agreed_pro_version < 96 ?
2483		    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2484		    (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2485		    self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2486			/* The last P_SYNC_UUID did not get though. Undo the last start of
2487			   resync as sync source modifications of our UUIDs. */
2488
2489			if (mdev->agreed_pro_version < 91)
2490				return -1091;
2491
2492			_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2493			_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2494
2495			dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2496			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2497				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2498
2499			return 1;
2500		}
2501	}
2502
2503
2504	*rule_nr = 80;
2505	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2506	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2507		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2508		if (self == peer)
2509			return 2;
2510	}
2511
2512	*rule_nr = 90;
2513	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2514	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2515	if (self == peer && self != ((u64)0))
2516		return 100;
2517
2518	*rule_nr = 100;
2519	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2520		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2521		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2522			peer = mdev->p_uuid[j] & ~((u64)1);
2523			if (self == peer)
2524				return -100;
2525		}
2526	}
2527
2528	return -1000;
2529}
2530
2531/* drbd_sync_handshake() returns the new conn state on success, or
2532   CONN_MASK (-1) on failure.
2533 */
2534static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2535					   enum drbd_disk_state peer_disk) __must_hold(local)
2536{
2537	int hg, rule_nr;
2538	enum drbd_conns rv = C_MASK;
2539	enum drbd_disk_state mydisk;
2540
2541	mydisk = mdev->state.disk;
2542	if (mydisk == D_NEGOTIATING)
2543		mydisk = mdev->new_state_tmp.disk;
2544
2545	dev_info(DEV, "drbd_sync_handshake:\n");
2546	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2547	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2548		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2549
2550	hg = drbd_uuid_compare(mdev, &rule_nr);
2551
2552	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2553
2554	if (hg == -1000) {
2555		dev_alert(DEV, "Unrelated data, aborting!\n");
2556		return C_MASK;
2557	}
2558	if (hg < -1000) {
2559		dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2560		return C_MASK;
2561	}
2562
2563	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2564	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2565		int f = (hg == -100) || abs(hg) == 2;
2566		hg = mydisk > D_INCONSISTENT ? 1 : -1;
2567		if (f)
2568			hg = hg*2;
2569		dev_info(DEV, "Becoming sync %s due to disk states.\n",
2570		     hg > 0 ? "source" : "target");
2571	}
2572
2573	if (abs(hg) == 100)
2574		drbd_khelper(mdev, "initial-split-brain");
2575
2576	if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2577		int pcount = (mdev->state.role == R_PRIMARY)
2578			   + (peer_role == R_PRIMARY);
2579		int forced = (hg == -100);
2580
2581		switch (pcount) {
2582		case 0:
2583			hg = drbd_asb_recover_0p(mdev);
2584			break;
2585		case 1:
2586			hg = drbd_asb_recover_1p(mdev);
2587			break;
2588		case 2:
2589			hg = drbd_asb_recover_2p(mdev);
2590			break;
2591		}
2592		if (abs(hg) < 100) {
2593			dev_warn(DEV, "Split-Brain detected, %d primaries, "
2594			     "automatically solved. Sync from %s node\n",
2595			     pcount, (hg < 0) ? "peer" : "this");
2596			if (forced) {
2597				dev_warn(DEV, "Doing a full sync, since"
2598				     " UUIDs where ambiguous.\n");
2599				hg = hg*2;
2600			}
2601		}
2602	}
2603
2604	if (hg == -100) {
2605		if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2606			hg = -1;
2607		if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2608			hg = 1;
2609
2610		if (abs(hg) < 100)
2611			dev_warn(DEV, "Split-Brain detected, manually solved. "
2612			     "Sync from %s node\n",
2613			     (hg < 0) ? "peer" : "this");
2614	}
2615
2616	if (hg == -100) {
2617		/* FIXME this log message is not correct if we end up here
2618		 * after an attempted attach on a diskless node.
2619		 * We just refuse to attach -- well, we drop the "connection"
2620		 * to that disk, in a way... */
2621		dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2622		drbd_khelper(mdev, "split-brain");
2623		return C_MASK;
2624	}
2625
2626	if (hg > 0 && mydisk <= D_INCONSISTENT) {
2627		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2628		return C_MASK;
2629	}
2630
2631	if (hg < 0 && /* by intention we do not use mydisk here. */
2632	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2633		switch (mdev->net_conf->rr_conflict) {
2634		case ASB_CALL_HELPER:
2635			drbd_khelper(mdev, "pri-lost");
2636			/* fall through */
2637		case ASB_DISCONNECT:
2638			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2639			return C_MASK;
2640		case ASB_VIOLENTLY:
2641			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2642			     "assumption\n");
2643		}
2644	}
2645
2646	if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2647		if (hg == 0)
2648			dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2649		else
2650			dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2651				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2652				 abs(hg) >= 2 ? "full" : "bit-map based");
2653		return C_MASK;
2654	}
2655
2656	if (abs(hg) >= 2) {
2657		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2658		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2659					BM_LOCKED_SET_ALLOWED))
2660			return C_MASK;
2661	}
2662
2663	if (hg > 0) { /* become sync source. */
2664		rv = C_WF_BITMAP_S;
2665	} else if (hg < 0) { /* become sync target */
2666		rv = C_WF_BITMAP_T;
2667	} else {
2668		rv = C_CONNECTED;
2669		if (drbd_bm_total_weight(mdev)) {
2670			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2671			     drbd_bm_total_weight(mdev));
2672		}
2673	}
2674
2675	return rv;
2676}
2677
2678/* returns 1 if invalid */
2679static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2680{
2681	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2682	if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2683	    (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2684		return 0;
2685
2686	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2687	if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2688	    self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2689		return 1;
2690
2691	/* everything else is valid if they are equal on both sides. */
2692	if (peer == self)
2693		return 0;
2694
2695	/* everything es is invalid. */
2696	return 1;
2697}
2698
2699static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2700{
2701	struct p_protocol *p = &mdev->data.rbuf.protocol;
2702	int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2703	int p_want_lose, p_two_primaries, cf;
2704	char p_integrity_alg[SHARED_SECRET_MAX] = "";
2705
2706	p_proto		= be32_to_cpu(p->protocol);
2707	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
2708	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
2709	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
2710	p_two_primaries = be32_to_cpu(p->two_primaries);
2711	cf		= be32_to_cpu(p->conn_flags);
2712	p_want_lose = cf & CF_WANT_LOSE;
2713
2714	clear_bit(CONN_DRY_RUN, &mdev->flags);
2715
2716	if (cf & CF_DRY_RUN)
2717		set_bit(CONN_DRY_RUN, &mdev->flags);
2718
2719	if (p_proto != mdev->net_conf->wire_protocol) {
2720		dev_err(DEV, "incompatible communication protocols\n");
2721		goto disconnect;
2722	}
2723
2724	if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2725		dev_err(DEV, "incompatible after-sb-0pri settings\n");
2726		goto disconnect;
2727	}
2728
2729	if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2730		dev_err(DEV, "incompatible after-sb-1pri settings\n");
2731		goto disconnect;
2732	}
2733
2734	if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2735		dev_err(DEV, "incompatible after-sb-2pri settings\n");
2736		goto disconnect;
2737	}
2738
2739	if (p_want_lose && mdev->net_conf->want_lose) {
2740		dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2741		goto disconnect;
2742	}
2743
2744	if (p_two_primaries != mdev->net_conf->two_primaries) {
2745		dev_err(DEV, "incompatible setting of the two-primaries options\n");
2746		goto disconnect;
2747	}
2748
2749	if (mdev->agreed_pro_version >= 87) {
2750		unsigned char *my_alg = mdev->net_conf->integrity_alg;
2751
2752		if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2753			return false;
2754
2755		p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2756		if (strcmp(p_integrity_alg, my_alg)) {
2757			dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2758			goto disconnect;
2759		}
2760		dev_info(DEV, "data-integrity-alg: %s\n",
2761		     my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2762	}
2763
2764	return true;
2765
2766disconnect:
2767	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2768	return false;
2769}
2770
2771/* helper function
2772 * input: alg name, feature name
2773 * return: NULL (alg name was "")
2774 *         ERR_PTR(error) if something goes wrong
2775 *         or the crypto hash ptr, if it worked out ok. */
2776struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2777		const char *alg, const char *name)
2778{
2779	struct crypto_hash *tfm;
2780
2781	if (!alg[0])
2782		return NULL;
2783
2784	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2785	if (IS_ERR(tfm)) {
2786		dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2787			alg, name, PTR_ERR(tfm));
2788		return tfm;
2789	}
2790	if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2791		crypto_free_hash(tfm);
2792		dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2793		return ERR_PTR(-EINVAL);
2794	}
2795	return tfm;
2796}
2797
2798static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
2799{
2800	int ok = true;
2801	struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
2802	unsigned int header_size, data_size, exp_max_sz;
2803	struct crypto_hash *verify_tfm = NULL;
2804	struct crypto_hash *csums_tfm = NULL;
2805	const int apv = mdev->agreed_pro_version;
2806	int *rs_plan_s = NULL;
2807	int fifo_size = 0;
2808
2809	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2810		    : apv == 88 ? sizeof(struct p_rs_param)
2811					+ SHARED_SECRET_MAX
2812		    : apv <= 94 ? sizeof(struct p_rs_param_89)
2813		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2814
2815	if (packet_size > exp_max_sz) {
2816		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2817		    packet_size, exp_max_sz);
2818		return false;
2819	}
2820
2821	if (apv <= 88) {
2822		header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
2823		data_size   = packet_size  - header_size;
2824	} else if (apv <= 94) {
2825		header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
2826		data_size   = packet_size  - header_size;
2827		D_ASSERT(data_size == 0);
2828	} else {
2829		header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
2830		data_size   = packet_size  - header_size;
2831		D_ASSERT(data_size == 0);
2832	}
2833
2834	/* initialize verify_alg and csums_alg */
2835	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2836
2837	if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
2838		return false;
2839
2840	mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2841
2842	if (apv >= 88) {
2843		if (apv == 88) {
2844			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
2845				dev_err(DEV, "verify-alg of wrong size, "
2846					"peer wants %u, accepting only up to %u byte\n",
2847					data_size, SHARED_SECRET_MAX);
2848				return false;
2849			}
2850
2851			if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2852				return false;
2853
2854			/* we expect NUL terminated string */
2855			/* but just in case someone tries to be evil */
2856			D_ASSERT(p->verify_alg[data_size-1] == 0);
2857			p->verify_alg[data_size-1] = 0;
2858
2859		} else /* apv >= 89 */ {
2860			/* we still expect NUL terminated strings */
2861			/* but just in case someone tries to be evil */
2862			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2863			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2864			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2865			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2866		}
2867
2868		if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2869			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2870				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2871				    mdev->sync_conf.verify_alg, p->verify_alg);
2872				goto disconnect;
2873			}
2874			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2875					p->verify_alg, "verify-alg");
2876			if (IS_ERR(verify_tfm)) {
2877				verify_tfm = NULL;
2878				goto disconnect;
2879			}
2880		}
2881
2882		if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2883			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2884				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2885				    mdev->sync_conf.csums_alg, p->csums_alg);
2886				goto disconnect;
2887			}
2888			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2889					p->csums_alg, "csums-alg");
2890			if (IS_ERR(csums_tfm)) {
2891				csums_tfm = NULL;
2892				goto disconnect;
2893			}
2894		}
2895
2896		if (apv > 94) {
2897			mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2898			mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2899			mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2900			mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2901			mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2902
2903			fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2904			if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2905				rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2906				if (!rs_plan_s) {
2907					dev_err(DEV, "kmalloc of fifo_buffer failed");
2908					goto disconnect;
2909				}
2910			}
2911		}
2912
2913		spin_lock(&mdev->peer_seq_lock);
2914		/* lock against drbd_nl_syncer_conf() */
2915		if (verify_tfm) {
2916			strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2917			mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2918			crypto_free_hash(mdev->verify_tfm);
2919			mdev->verify_tfm = verify_tfm;
2920			dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2921		}
2922		if (csums_tfm) {
2923			strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2924			mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2925			crypto_free_hash(mdev->csums_tfm);
2926			mdev->csums_tfm = csums_tfm;
2927			dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2928		}
2929		if (fifo_size != mdev->rs_plan_s.size) {
2930			kfree(mdev->rs_plan_s.values);
2931			mdev->rs_plan_s.values = rs_plan_s;
2932			mdev->rs_plan_s.size   = fifo_size;
2933			mdev->rs_planed = 0;
2934		}
2935		spin_unlock(&mdev->peer_seq_lock);
2936	}
2937
2938	return ok;
2939disconnect:
2940	/* just for completeness: actually not needed,
2941	 * as this is not reached if csums_tfm was ok. */
2942	crypto_free_hash(csums_tfm);
2943	/* but free the verify_tfm again, if csums_tfm did not work out */
2944	crypto_free_hash(verify_tfm);
2945	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2946	return false;
2947}
2948
2949/* warn if the arguments differ by more than 12.5% */
2950static void warn_if_differ_considerably(struct drbd_conf *mdev,
2951	const char *s, sector_t a, sector_t b)
2952{
2953	sector_t d;
2954	if (a == 0 || b == 0)
2955		return;
2956	d = (a > b) ? (a - b) : (b - a);
2957	if (d > (a>>3) || d > (b>>3))
2958		dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2959		     (unsigned long long)a, (unsigned long long)b);
2960}
2961
2962static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2963{
2964	struct p_sizes *p = &mdev->data.rbuf.sizes;
2965	enum determine_dev_size dd = unchanged;
2966	sector_t p_size, p_usize, my_usize;
2967	int ldsc = 0; /* local disk size changed */
2968	enum dds_flags ddsf;
2969
2970	p_size = be64_to_cpu(p->d_size);
2971	p_usize = be64_to_cpu(p->u_size);
2972
2973	if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2974		dev_err(DEV, "some backing storage is needed\n");
2975		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2976		return false;
2977	}
2978
2979	/* just store the peer's disk size for now.
2980	 * we still need to figure out whether we accept that. */
2981	mdev->p_size = p_size;
2982
2983	if (get_ldev(mdev)) {
2984		warn_if_differ_considerably(mdev, "lower level device sizes",
2985			   p_size, drbd_get_max_capacity(mdev->ldev));
2986		warn_if_differ_considerably(mdev, "user requested size",
2987					    p_usize, mdev->ldev->dc.disk_size);
2988
2989		/* if this is the first connect, or an otherwise expected
2990		 * param exchange, choose the minimum */
2991		if (mdev->state.conn == C_WF_REPORT_PARAMS)
2992			p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2993					     p_usize);
2994
2995		my_usize = mdev->ldev->dc.disk_size;
2996
2997		if (mdev->ldev->dc.disk_size != p_usize) {
2998			mdev->ldev->dc.disk_size = p_usize;
2999			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3000			     (unsigned long)mdev->ldev->dc.disk_size);
3001		}
3002
3003		/* Never shrink a device with usable data during connect.
3004		   But allow online shrinking if we are connected. */
3005		if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3006		   drbd_get_capacity(mdev->this_bdev) &&
3007		   mdev->state.disk >= D_OUTDATED &&
3008		   mdev->state.conn < C_CONNECTED) {
3009			dev_err(DEV, "The peer's disk size is too small!\n");
3010			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3011			mdev->ldev->dc.disk_size = my_usize;
3012			put_ldev(mdev);
3013			return false;
3014		}
3015		put_ldev(mdev);
3016	}
3017
3018	ddsf = be16_to_cpu(p->dds_flags);
3019	if (get_ldev(mdev)) {
3020		dd = drbd_determine_dev_size(mdev, ddsf);
3021		put_ldev(mdev);
3022		if (dd == dev_size_error)
3023			return false;
3024		drbd_md_sync(mdev);
3025	} else {
3026		/* I am diskless, need to accept the peer's size. */
3027		drbd_set_my_capacity(mdev, p_size);
3028	}
3029
3030	mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3031	drbd_reconsider_max_bio_size(mdev);
3032
3033	if (get_ldev(mdev)) {
3034		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3035			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3036			ldsc = 1;
3037		}
3038
3039		put_ldev(mdev);
3040	}
3041
3042	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3043		if (be64_to_cpu(p->c_size) !=
3044		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
3045			/* we have different sizes, probably peer
3046			 * needs to know my new size... */
3047			drbd_send_sizes(mdev, 0, ddsf);
3048		}
3049		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3050		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
3051			if (mdev->state.pdsk >= D_INCONSISTENT &&
3052			    mdev->state.disk >= D_INCONSISTENT) {
3053				if (ddsf & DDSF_NO_RESYNC)
3054					dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3055				else
3056					resync_after_online_grow(mdev);
3057			} else
3058				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3059		}
3060	}
3061
3062	return true;
3063}
3064
3065static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3066{
3067	struct p_uuids *p = &mdev->data.rbuf.uuids;
3068	u64 *p_uuid;
3069	int i, updated_uuids = 0;
3070
3071	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3072
3073	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3074		p_uuid[i] = be64_to_cpu(p->uuid[i]);
3075
3076	kfree(mdev->p_uuid);
3077	mdev->p_uuid = p_uuid;
3078
3079	if (mdev->state.conn < C_CONNECTED &&
3080	    mdev->state.disk < D_INCONSISTENT &&
3081	    mdev->state.role == R_PRIMARY &&
3082	    (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3083		dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3084		    (unsigned long long)mdev->ed_uuid);
3085		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3086		return false;
3087	}
3088
3089	if (get_ldev(mdev)) {
3090		int skip_initial_sync =
3091			mdev->state.conn == C_CONNECTED &&
3092			mdev->agreed_pro_version >= 90 &&
3093			mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3094			(p_uuid[UI_FLAGS] & 8);
3095		if (skip_initial_sync) {
3096			dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3097			drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3098					"clear_n_write from receive_uuids",
3099					BM_LOCKED_TEST_ALLOWED);
3100			_drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3101			_drbd_uuid_set(mdev, UI_BITMAP, 0);
3102			_drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3103					CS_VERBOSE, NULL);
3104			drbd_md_sync(mdev);
3105			updated_uuids = 1;
3106		}
3107		put_ldev(mdev);
3108	} else if (mdev->state.disk < D_INCONSISTENT &&
3109		   mdev->state.role == R_PRIMARY) {
3110		/* I am a diskless primary, the peer just created a new current UUID
3111		   for me. */
3112		updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3113	}
3114
3115	/* Before we test for the disk state, we should wait until an eventually
3116	   ongoing cluster wide state change is finished. That is important if
3117	   we are primary and are detaching from our disk. We need to see the
3118	   new disk state... */
3119	wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3120	if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3121		updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3122
3123	if (updated_uuids)
3124		drbd_print_uuids(mdev, "receiver updated UUIDs to");
3125
3126	return true;
3127}
3128
3129/**
3130 * convert_state() - Converts the peer's view of the cluster state to our point of view
3131 * @ps:		The state as seen by the peer.
3132 */
3133static union drbd_state convert_state(union drbd_state ps)
3134{
3135	union drbd_state ms;
3136
3137	static enum drbd_conns c_tab[] = {
3138		[C_CONNECTED] = C_CONNECTED,
3139
3140		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3141		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3142		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3143		[C_VERIFY_S]       = C_VERIFY_T,
3144		[C_MASK]   = C_MASK,
3145	};
3146
3147	ms.i = ps.i;
3148
3149	ms.conn = c_tab[ps.conn];
3150	ms.peer = ps.role;
3151	ms.role = ps.peer;
3152	ms.pdsk = ps.disk;
3153	ms.disk = ps.pdsk;
3154	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3155
3156	return ms;
3157}
3158
3159static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3160{
3161	struct p_req_state *p = &mdev->data.rbuf.req_state;
3162	union drbd_state mask, val;
3163	enum drbd_state_rv rv;
3164
3165	mask.i = be32_to_cpu(p->mask);
3166	val.i = be32_to_cpu(p->val);
3167
3168	if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3169	    test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3170		drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3171		return true;
3172	}
3173
3174	mask = convert_state(mask);
3175	val = convert_state(val);
3176
3177	rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3178
3179	drbd_send_sr_reply(mdev, rv);
3180	drbd_md_sync(mdev);
3181
3182	return true;
3183}
3184
3185static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3186{
3187	struct p_state *p = &mdev->data.rbuf.state;
3188	union drbd_state os, ns, peer_state;
3189	enum drbd_disk_state real_peer_disk;
3190	enum chg_state_flags cs_flags;
3191	int rv;
3192
3193	peer_state.i = be32_to_cpu(p->state);
3194
3195	real_peer_disk = peer_state.disk;
3196	if (peer_state.disk == D_NEGOTIATING) {
3197		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3198		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3199	}
3200
3201	spin_lock_irq(&mdev->req_lock);
3202 retry:
3203	os = ns = mdev->state;
3204	spin_unlock_irq(&mdev->req_lock);
3205
3206	/* If some other part of the code (asender thread, timeout)
3207	 * already decided to close the connection again,
3208	 * we must not "re-establish" it here. */
3209	if (os.conn <= C_TEAR_DOWN)
3210		return false;
3211
3212	/* If this is the "end of sync" confirmation, usually the peer disk
3213	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3214	 * set) resync started in PausedSyncT, or if the timing of pause-/
3215	 * unpause-sync events has been "just right", the peer disk may
3216	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3217	 */
3218	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3219	    real_peer_disk == D_UP_TO_DATE &&
3220	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3221		/* If we are (becoming) SyncSource, but peer is still in sync
3222		 * preparation, ignore its uptodate-ness to avoid flapping, it
3223		 * will change to inconsistent once the peer reaches active
3224		 * syncing states.
3225		 * It may have changed syncer-paused flags, however, so we
3226		 * cannot ignore this completely. */
3227		if (peer_state.conn > C_CONNECTED &&
3228		    peer_state.conn < C_SYNC_SOURCE)
3229			real_peer_disk = D_INCONSISTENT;
3230
3231		/* if peer_state changes to connected at the same time,
3232		 * it explicitly notifies us that it finished resync.
3233		 * Maybe we should finish it up, too? */
3234		else if (os.conn >= C_SYNC_SOURCE &&
3235			 peer_state.conn == C_CONNECTED) {
3236			if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3237				drbd_resync_finished(mdev);
3238			return true;
3239		}
3240	}
3241
3242	/* peer says his disk is inconsistent, while we think it is uptodate,
3243	 * and this happens while the peer still thinks we have a sync going on,
3244	 * but we think we are already done with the sync.
3245	 * We ignore this to avoid flapping pdsk.
3246	 * This should not happen, if the peer is a recent version of drbd. */
3247	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3248	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3249		real_peer_disk = D_UP_TO_DATE;
3250
3251	if (ns.conn == C_WF_REPORT_PARAMS)
3252		ns.conn = C_CONNECTED;
3253
3254	if (peer_state.conn == C_AHEAD)
3255		ns.conn = C_BEHIND;
3256
3257	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3258	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
3259		int cr; /* consider resync */
3260
3261		/* if we established a new connection */
3262		cr  = (os.conn < C_CONNECTED);
3263		/* if we had an established connection
3264		 * and one of the nodes newly attaches a disk */
3265		cr |= (os.conn == C_CONNECTED &&
3266		       (peer_state.disk == D_NEGOTIATING ||
3267			os.disk == D_NEGOTIATING));
3268		/* if we have both been inconsistent, and the peer has been
3269		 * forced to be UpToDate with --overwrite-data */
3270		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3271		/* if we had been plain connected, and the admin requested to
3272		 * start a sync by "invalidate" or "invalidate-remote" */
3273		cr |= (os.conn == C_CONNECTED &&
3274				(peer_state.conn >= C_STARTING_SYNC_S &&
3275				 peer_state.conn <= C_WF_BITMAP_T));
3276
3277		if (cr)
3278			ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3279
3280		put_ldev(mdev);
3281		if (ns.conn == C_MASK) {
3282			ns.conn = C_CONNECTED;
3283			if (mdev->state.disk == D_NEGOTIATING) {
3284				drbd_force_state(mdev, NS(disk, D_FAILED));
3285			} else if (peer_state.disk == D_NEGOTIATING) {
3286				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3287				peer_state.disk = D_DISKLESS;
3288				real_peer_disk = D_DISKLESS;
3289			} else {
3290				if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3291					return false;
3292				D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3293				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3294				return false;
3295			}
3296		}
3297	}
3298
3299	spin_lock_irq(&mdev->req_lock);
3300	if (mdev->state.i != os.i)
3301		goto retry;
3302	clear_bit(CONSIDER_RESYNC, &mdev->flags);
3303	ns.peer = peer_state.role;
3304	ns.pdsk = real_peer_disk;
3305	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3306	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3307		ns.disk = mdev->new_state_tmp.disk;
3308	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3309	if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3310	    test_bit(NEW_CUR_UUID, &mdev->flags)) {
3311		/* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
3312		   for temporal network outages! */
3313		spin_unlock_irq(&mdev->req_lock);
3314		dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3315		tl_clear(mdev);
3316		drbd_uuid_new_current(mdev);
3317		clear_bit(NEW_CUR_UUID, &mdev->flags);
3318		drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3319		return false;
3320	}
3321	rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3322	ns = mdev->state;
3323	spin_unlock_irq(&mdev->req_lock);
3324
3325	if (rv < SS_SUCCESS) {
3326		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3327		return false;
3328	}
3329
3330	if (os.conn > C_WF_REPORT_PARAMS) {
3331		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3332		    peer_state.disk != D_NEGOTIATING ) {
3333			/* we want resync, peer has not yet decided to sync... */
3334			/* Nowadays only used when forcing a node into primary role and
3335			   setting its disk to UpToDate with that */
3336			drbd_send_uuids(mdev);
3337			drbd_send_current_state(mdev);
3338		}
3339	}
3340
3341	mdev->net_conf->want_lose = 0;
3342
3343	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3344
3345	return true;
3346}
3347
3348static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3349{
3350	struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
3351
3352	wait_event(mdev->misc_wait,
3353		   mdev->state.conn == C_WF_SYNC_UUID ||
3354		   mdev->state.conn == C_BEHIND ||
3355		   mdev->state.conn < C_CONNECTED ||
3356		   mdev->state.disk < D_NEGOTIATING);
3357
3358	/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3359
3360	/* Here the _drbd_uuid_ functions are right, current should
3361	   _not_ be rotated into the history */
3362	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3363		_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3364		_drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3365
3366		drbd_print_uuids(mdev, "updated sync uuid");
3367		drbd_start_resync(mdev, C_SYNC_TARGET);
3368
3369		put_ldev(mdev);
3370	} else
3371		dev_err(DEV, "Ignoring SyncUUID packet!\n");
3372
3373	return true;
3374}
3375
3376/**
3377 * receive_bitmap_plain
3378 *
3379 * Return 0 when done, 1 when another iteration is needed, and a negative error
3380 * code upon failure.
3381 */
3382static int
3383receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3384		     unsigned long *buffer, struct bm_xfer_ctx *c)
3385{
3386	unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3387	unsigned want = num_words * sizeof(long);
3388	int err;
3389
3390	if (want != data_size) {
3391		dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3392		return -EIO;
3393	}
3394	if (want == 0)
3395		return 0;
3396	err = drbd_recv(mdev, buffer, want);
3397	if (err != want) {
3398		if (err >= 0)
3399			err = -EIO;
3400		return err;
3401	}
3402
3403	drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3404
3405	c->word_offset += num_words;
3406	c->bit_offset = c->word_offset * BITS_PER_LONG;
3407	if (c->bit_offset > c->bm_bits)
3408		c->bit_offset = c->bm_bits;
3409
3410	return 1;
3411}
3412
3413/**
3414 * recv_bm_rle_bits
3415 *
3416 * Return 0 when done, 1 when another iteration is needed, and a negative error
3417 * code upon failure.
3418 */
3419static int
3420recv_bm_rle_bits(struct drbd_conf *mdev,
3421		struct p_compressed_bm *p,
3422		struct bm_xfer_ctx *c)
3423{
3424	struct bitstream bs;
3425	u64 look_ahead;
3426	u64 rl;
3427	u64 tmp;
3428	unsigned long s = c->bit_offset;
3429	unsigned long e;
3430	int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
3431	int toggle = DCBP_get_start(p);
3432	int have;
3433	int bits;
3434
3435	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3436
3437	bits = bitstream_get_bits(&bs, &look_ahead, 64);
3438	if (bits < 0)
3439		return -EIO;
3440
3441	for (have = bits; have > 0; s += rl, toggle = !toggle) {
3442		bits = vli_decode_bits(&rl, look_ahead);
3443		if (bits <= 0)
3444			return -EIO;
3445
3446		if (toggle) {
3447			e = s + rl -1;
3448			if (e >= c->bm_bits) {
3449				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3450				return -EIO;
3451			}
3452			_drbd_bm_set_bits(mdev, s, e);
3453		}
3454
3455		if (have < bits) {
3456			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3457				have, bits, look_ahead,
3458				(unsigned int)(bs.cur.b - p->code),
3459				(unsigned int)bs.buf_len);
3460			return -EIO;
3461		}
3462		look_ahead >>= bits;
3463		have -= bits;
3464
3465		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3466		if (bits < 0)
3467			return -EIO;
3468		look_ahead |= tmp << have;
3469		have += bits;
3470	}
3471
3472	c->bit_offset = s;
3473	bm_xfer_ctx_bit_to_word_offset(c);
3474
3475	return (s != c->bm_bits);
3476}
3477
3478/**
3479 * decode_bitmap_c
3480 *
3481 * Return 0 when done, 1 when another iteration is needed, and a negative error
3482 * code upon failure.
3483 */
3484static int
3485decode_bitmap_c(struct drbd_conf *mdev,
3486		struct p_compressed_bm *p,
3487		struct bm_xfer_ctx *c)
3488{
3489	if (DCBP_get_code(p) == RLE_VLI_Bits)
3490		return recv_bm_rle_bits(mdev, p, c);
3491
3492	/* other variants had been implemented for evaluation,
3493	 * but have been dropped as this one turned out to be "best"
3494	 * during all our tests. */
3495
3496	dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3497	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3498	return -EIO;
3499}
3500
3501void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3502		const char *direction, struct bm_xfer_ctx *c)
3503{
3504	/* what would it take to transfer it "plaintext" */
3505	unsigned plain = sizeof(struct p_header80) *
3506		((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3507		+ c->bm_words * sizeof(long);
3508	unsigned total = c->bytes[0] + c->bytes[1];
3509	unsigned r;
3510
3511	/* total can not be zero. but just in case: */
3512	if (total == 0)
3513		return;
3514
3515	/* don't report if not compressed */
3516	if (total >= plain)
3517		return;
3518
3519	/* total < plain. check for overflow, still */
3520	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3521		                    : (1000 * total / plain);
3522
3523	if (r > 1000)
3524		r = 1000;
3525
3526	r = 1000 - r;
3527	dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3528	     "total %u; compression: %u.%u%%\n",
3529			direction,
3530			c->bytes[1], c->packets[1],
3531			c->bytes[0], c->packets[0],
3532			total, r/10, r % 10);
3533}
3534
3535/* Since we are processing the bitfield from lower addresses to higher,
3536   it does not matter if the process it in 32 bit chunks or 64 bit
3537   chunks as long as it is little endian. (Understand it as byte stream,
3538   beginning with the lowest byte...) If we would use big endian
3539   we would need to process it from the highest address to the lowest,
3540   in order to be agnostic to the 32 vs 64 bits issue.
3541
3542   returns 0 on failure, 1 if we successfully received it. */
3543static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3544{
3545	struct bm_xfer_ctx c;
3546	void *buffer;
3547	int err;
3548	int ok = false;
3549	struct p_header80 *h = &mdev->data.rbuf.header.h80;
3550
3551	drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3552	/* you are supposed to send additional out-of-sync information
3553	 * if you actually set bits during this phase */
3554
3555	/* maybe we should use some per thread scratch page,
3556	 * and allocate that during initial device creation? */
3557	buffer	 = (unsigned long *) __get_free_page(GFP_NOIO);
3558	if (!buffer) {
3559		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3560		goto out;
3561	}
3562
3563	c = (struct bm_xfer_ctx) {
3564		.bm_bits = drbd_bm_bits(mdev),
3565		.bm_words = drbd_bm_words(mdev),
3566	};
3567
3568	for(;;) {
3569		if (cmd == P_BITMAP) {
3570			err = receive_bitmap_plain(mdev, data_size, buffer, &c);
3571		} else if (cmd == P_COMPRESSED_BITMAP) {
3572			/* MAYBE: sanity check that we speak proto >= 90,
3573			 * and the feature is enabled! */
3574			struct p_compressed_bm *p;
3575
3576			if (data_size > BM_PACKET_PAYLOAD_BYTES) {
3577				dev_err(DEV, "ReportCBitmap packet too large\n");
3578				goto out;
3579			}
3580			/* use the page buff */
3581			p = buffer;
3582			memcpy(p, h, sizeof(*h));
3583			if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
3584				goto out;
3585			if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3586				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
3587				goto out;
3588			}
3589			err = decode_bitmap_c(mdev, p, &c);
3590		} else {
3591			dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
3592			goto out;
3593		}
3594
3595		c.packets[cmd == P_BITMAP]++;
3596		c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
3597
3598		if (err <= 0) {
3599			if (err < 0)
3600				goto out;
3601			break;
3602		}
3603		if (!drbd_recv_header(mdev, &cmd, &data_size))
3604			goto out;
3605	}
3606
3607	INFO_bm_xfer_stats(mdev, "receive", &c);
3608
3609	if (mdev->state.conn == C_WF_BITMAP_T) {
3610		enum drbd_state_rv rv;
3611
3612		ok = !drbd_send_bitmap(mdev);
3613		if (!ok)
3614			goto out;
3615		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3616		rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3617		D_ASSERT(rv == SS_SUCCESS);
3618	} else if (mdev->state.conn != C_WF_BITMAP_S) {
3619		/* admin may have requested C_DISCONNECTING,
3620		 * other threads may have noticed network errors */
3621		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3622		    drbd_conn_str(mdev->state.conn));
3623	}
3624
3625	ok = true;
3626 out:
3627	drbd_bm_unlock(mdev);
3628	if (ok && mdev->state.conn == C_WF_BITMAP_S)
3629		drbd_start_resync(mdev, C_SYNC_SOURCE);
3630	free_page((unsigned long) buffer);
3631	return ok;
3632}
3633
3634static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3635{
3636	/* TODO zero copy sink :) */
3637	static char sink[128];
3638	int size, want, r;
3639
3640	dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3641		 cmd, data_size);
3642
3643	size = data_size;
3644	while (size > 0) {
3645		want = min_t(int, size, sizeof(sink));
3646		r = drbd_recv(mdev, sink, want);
3647		ERR_IF(r <= 0) break;
3648		size -= r;
3649	}
3650	return size == 0;
3651}
3652
3653static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3654{
3655	/* Make sure we've acked all the TCP data associated
3656	 * with the data requests being unplugged */
3657	drbd_tcp_quickack(mdev->data.socket);
3658
3659	return true;
3660}
3661
3662static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3663{
3664	struct p_block_desc *p = &mdev->data.rbuf.block_desc;
3665
3666	switch (mdev->state.conn) {
3667	case C_WF_SYNC_UUID:
3668	case C_WF_BITMAP_T:
3669	case C_BEHIND:
3670			break;
3671	default:
3672		dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3673				drbd_conn_str(mdev->state.conn));
3674	}
3675
3676	drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3677
3678	return true;
3679}
3680
3681typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
3682
3683struct data_cmd {
3684	int expect_payload;
3685	size_t pkt_size;
3686	drbd_cmd_handler_f function;
3687};
3688
3689static struct data_cmd drbd_cmd_handler[] = {
3690	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
3691	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
3692	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3693	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3694	[P_BITMAP]	    = { 1, sizeof(struct p_header80), receive_bitmap } ,
3695	[P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3696	[P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header80), receive_UnplugRemote },
3697	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
3698	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3699	[P_SYNC_PARAM]	    = { 1, sizeof(struct p_header80), receive_SyncParam },
3700	[P_SYNC_PARAM89]    = { 1, sizeof(struct p_header80), receive_SyncParam },
3701	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
3702	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
3703	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
3704	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
3705	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
3706	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3707	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
3708	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
3709	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3710	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
3711	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3712	/* anything missing from this table is in
3713	 * the asender_tbl, see get_asender_cmd */
3714	[P_MAX_CMD]	    = { 0, 0, NULL },
3715};
3716
/* All handler functions that expect a sub-header get that sub-header in
   mdev->data.rbuf.header.head.payload.

   Usually the callback can find the usual p_header in
   mdev->data.rbuf.header.head, but it must not rely on that, since there
   is also p_header95!
 */
3723
3724static void drbdd(struct drbd_conf *mdev)
3725{
3726	union p_header *header = &mdev->data.rbuf.header;
3727	unsigned int packet_size;
3728	enum drbd_packets cmd;
3729	size_t shs; /* sub header size */
3730	int rv;
3731
3732	while (get_t_state(&mdev->receiver) == Running) {
3733		drbd_thread_current_set_cpu(mdev);
3734		if (!drbd_recv_header(mdev, &cmd, &packet_size))
3735			goto err_out;
3736
3737		if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3738			dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3739			goto err_out;
3740		}
3741
3742		shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
3743		if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3744			dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3745			goto err_out;
3746		}
3747
3748		if (shs) {
3749			rv = drbd_recv(mdev, &header->h80.payload, shs);
3750			if (unlikely(rv != shs)) {
3751				if (!signal_pending(current))
3752					dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);
3753				goto err_out;
3754			}
3755		}
3756
3757		rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
3758
3759		if (unlikely(!rv)) {
3760			dev_err(DEV, "error receiving %s, l: %d!\n",
3761			    cmdname(cmd), packet_size);
3762			goto err_out;
3763		}
3764	}
3765
3766	if (0) {
3767	err_out:
3768		drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3769	}
3770	/* If we leave here, we probably want to update at least the
3771	 * "Connected" indicator on stable storage. Do so explicitly here. */
3772	drbd_md_sync(mdev);
3773}
3774
3775void drbd_flush_workqueue(struct drbd_conf *mdev)
3776{
3777	struct drbd_wq_barrier barr;
3778
3779	barr.w.cb = w_prev_work_done;
3780	init_completion(&barr.done);
3781	drbd_queue_work(&mdev->data.work, &barr.w);
3782	wait_for_completion(&barr.done);
3783}
3784
3785void drbd_free_tl_hash(struct drbd_conf *mdev)
3786{
3787	struct hlist_head *h;
3788
3789	spin_lock_irq(&mdev->req_lock);
3790
3791	if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
3792		spin_unlock_irq(&mdev->req_lock);
3793		return;
3794	}
3795	/* paranoia code */
3796	for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3797		if (h->first)
3798			dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3799				(int)(h - mdev->ee_hash), h->first);
3800	kfree(mdev->ee_hash);
3801	mdev->ee_hash = NULL;
3802	mdev->ee_hash_s = 0;
3803
3804	/* paranoia code */
3805	for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3806		if (h->first)
3807			dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3808				(int)(h - mdev->tl_hash), h->first);
3809	kfree(mdev->tl_hash);
3810	mdev->tl_hash = NULL;
3811	mdev->tl_hash_s = 0;
3812	spin_unlock_irq(&mdev->req_lock);
3813}
3814
3815static void drbd_disconnect(struct drbd_conf *mdev)
3816{
3817	enum drbd_fencing_p fp;
3818	union drbd_state os, ns;
3819	int rv = SS_UNKNOWN_ERROR;
3820	unsigned int i;
3821
3822	if (mdev->state.conn == C_STANDALONE)
3823		return;
3824
3825	/* We are about to start the cleanup after connection loss.
3826	 * Make sure drbd_make_request knows about that.
3827	 * Usually we should be in some network failure state already,
3828	 * but just in case we are not, we fix it up here.
3829	 */
3830	drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
3831
3832	/* asender does not clean up anything. it must not interfere, either */
3833	drbd_thread_stop(&mdev->asender);
3834	drbd_free_sock(mdev);
3835
3836	/* wait for current activity to cease. */
3837	spin_lock_irq(&mdev->req_lock);
3838	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3839	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3840	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3841	spin_unlock_irq(&mdev->req_lock);
3842
3843	/* We do not have data structures that would allow us to
3844	 * get the rs_pending_cnt down to 0 again.
3845	 *  * On C_SYNC_TARGET we do not have any data structures describing
3846	 *    the pending RSDataRequest's we have sent.
3847	 *  * On C_SYNC_SOURCE there is no data structure that tracks
3848	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3849	 *  And no, it is not the sum of the reference counts in the
3850	 *  resync_LRU. The resync_LRU tracks the whole operation including
3851	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3852	 *  on the fly. */
3853	drbd_rs_cancel_all(mdev);
3854	mdev->rs_total = 0;
3855	mdev->rs_failed = 0;
3856	atomic_set(&mdev->rs_pending_cnt, 0);
3857	wake_up(&mdev->misc_wait);
3858
3859	/* make sure syncer is stopped and w_resume_next_sg queued */
3860	del_timer_sync(&mdev->resync_timer);
3861	resync_timer_fn((unsigned long)mdev);
3862
3863	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3864	 * w_make_resync_request etc. which may still be on the worker queue
3865	 * to be "canceled" */
3866	drbd_flush_workqueue(mdev);
3867
3868	/* This also does reclaim_net_ee().  If we do this too early, we might
3869	 * miss some resync ee and pages.*/
3870	drbd_process_done_ee(mdev);
3871
3872	kfree(mdev->p_uuid);
3873	mdev->p_uuid = NULL;
3874
3875	if (!is_susp(mdev->state))
3876		tl_clear(mdev);
3877
3878	dev_info(DEV, "Connection closed\n");
3879
3880	drbd_md_sync(mdev);
3881
3882	fp = FP_DONT_CARE;
3883	if (get_ldev(mdev)) {
3884		fp = mdev->ldev->dc.fencing;
3885		put_ldev(mdev);
3886	}
3887
3888	if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3889		drbd_try_outdate_peer_async(mdev);
3890
3891	spin_lock_irq(&mdev->req_lock);
3892	os = mdev->state;
3893	if (os.conn >= C_UNCONNECTED) {
3894		/* Do not restart in case we are C_DISCONNECTING */
3895		ns = os;
3896		ns.conn = C_UNCONNECTED;
3897		rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3898	}
3899	spin_unlock_irq(&mdev->req_lock);
3900
3901	if (os.conn == C_DISCONNECTING) {
3902		wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
3903
3904		crypto_free_hash(mdev->cram_hmac_tfm);
3905		mdev->cram_hmac_tfm = NULL;
3906
3907		kfree(mdev->net_conf);
3908		mdev->net_conf = NULL;
3909		drbd_request_state(mdev, NS(conn, C_STANDALONE));
3910	}
3911
3912	/* serialize with bitmap writeout triggered by the state change,
3913	 * if any. */
3914	wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3915
3916	/* tcp_close and release of sendpage pages can be deferred.  I don't
3917	 * want to use SO_LINGER, because apparently it can be deferred for
3918	 * more than 20 seconds (longest time I checked).
3919	 *
3920	 * Actually we don't care for exactly when the network stack does its
3921	 * put_page(), but release our reference on these pages right here.
3922	 */
3923	i = drbd_release_ee(mdev, &mdev->net_ee);
3924	if (i)
3925		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3926	i = atomic_read(&mdev->pp_in_use_by_net);
3927	if (i)
3928		dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
3929	i = atomic_read(&mdev->pp_in_use);
3930	if (i)
3931		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3932
3933	D_ASSERT(list_empty(&mdev->read_ee));
3934	D_ASSERT(list_empty(&mdev->active_ee));
3935	D_ASSERT(list_empty(&mdev->sync_ee));
3936	D_ASSERT(list_empty(&mdev->done_ee));
3937
3938	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3939	atomic_set(&mdev->current_epoch->epoch_size, 0);
3940	D_ASSERT(list_empty(&mdev->current_epoch->list));
3941}
3942
3943/*
3944 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3945 * we can agree on is stored in agreed_pro_version.
3946 *
3947 * feature flags and the reserved array should be enough room for future
3948 * enhancements of the handshake protocol, and possible plugins...
3949 *
3950 * for now, they are expected to be zero, but ignored.
3951 */
3952static int drbd_send_handshake(struct drbd_conf *mdev)
3953{
3954	/* ASSERT current == mdev->receiver ... */
3955	struct p_handshake *p = &mdev->data.sbuf.handshake;
3956	int ok;
3957
3958	if (mutex_lock_interruptible(&mdev->data.mutex)) {
3959		dev_err(DEV, "interrupted during initial handshake\n");
3960		return 0; /* interrupted. not ok. */
3961	}
3962
3963	if (mdev->data.socket == NULL) {
3964		mutex_unlock(&mdev->data.mutex);
3965		return 0;
3966	}
3967
3968	memset(p, 0, sizeof(*p));
3969	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3970	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3971	ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3972			     (struct p_header80 *)p, sizeof(*p), 0 );
3973	mutex_unlock(&mdev->data.mutex);
3974	return ok;
3975}
3976
3977/*
3978 * return values:
3979 *   1 yes, we have a valid connection
3980 *   0 oops, did not work out, please try again
3981 *  -1 peer talks different language,
3982 *     no point in trying again, please go standalone.
3983 */
3984static int drbd_do_handshake(struct drbd_conf *mdev)
3985{
3986	/* ASSERT current == mdev->receiver ... */
3987	struct p_handshake *p = &mdev->data.rbuf.handshake;
3988	const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
3989	unsigned int length;
3990	enum drbd_packets cmd;
3991	int rv;
3992
3993	rv = drbd_send_handshake(mdev);
3994	if (!rv)
3995		return 0;
3996
3997	rv = drbd_recv_header(mdev, &cmd, &length);
3998	if (!rv)
3999		return 0;
4000
4001	if (cmd != P_HAND_SHAKE) {
4002		dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
4003		     cmdname(cmd), cmd);
4004		return -1;
4005	}
4006
4007	if (length != expect) {
4008		dev_err(DEV, "expected HandShake length: %u, received: %u\n",
4009		     expect, length);
4010		return -1;
4011	}
4012
4013	rv = drbd_recv(mdev, &p->head.payload, expect);
4014
4015	if (rv != expect) {
4016		if (!signal_pending(current))
4017			dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
4018		return 0;
4019	}
4020
4021	p->protocol_min = be32_to_cpu(p->protocol_min);
4022	p->protocol_max = be32_to_cpu(p->protocol_max);
4023	if (p->protocol_max == 0)
4024		p->protocol_max = p->protocol_min;
4025
4026	if (PRO_VERSION_MAX < p->protocol_min ||
4027	    PRO_VERSION_MIN > p->protocol_max)
4028		goto incompat;
4029
4030	mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4031
4032	dev_info(DEV, "Handshake successful: "
4033	     "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4034
4035	return 1;
4036
4037 incompat:
4038	dev_err(DEV, "incompatible DRBD dialects: "
4039	    "I support %d-%d, peer supports %d-%d\n",
4040	    PRO_VERSION_MIN, PRO_VERSION_MAX,
4041	    p->protocol_min, p->protocol_max);
4042	return -1;
4043}
4044
4045#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4046static int drbd_do_auth(struct drbd_conf *mdev)
4047{
4048	dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4049	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4050	return -1;
4051}
4052#else
4053#define CHALLENGE_LEN 64
4054
4055/* Return value:
4056	1 - auth succeeded,
4057	0 - failed, try again (network error),
4058	-1 - auth failed, don't try again.
4059*/
4060
4061static int drbd_do_auth(struct drbd_conf *mdev)
4062{
4063	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4064	struct scatterlist sg;
4065	char *response = NULL;
4066	char *right_response = NULL;
4067	char *peers_ch = NULL;
4068	unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4069	unsigned int resp_size;
4070	struct hash_desc desc;
4071	enum drbd_packets cmd;
4072	unsigned int length;
4073	int rv;
4074
4075	desc.tfm = mdev->cram_hmac_tfm;
4076	desc.flags = 0;
4077
4078	rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4079				(u8 *)mdev->net_conf->shared_secret, key_len);
4080	if (rv) {
4081		dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4082		rv = -1;
4083		goto fail;
4084	}
4085
4086	get_random_bytes(my_challenge, CHALLENGE_LEN);
4087
4088	rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4089	if (!rv)
4090		goto fail;
4091
4092	rv = drbd_recv_header(mdev, &cmd, &length);
4093	if (!rv)
4094		goto fail;
4095
4096	if (cmd != P_AUTH_CHALLENGE) {
4097		dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4098		    cmdname(cmd), cmd);
4099		rv = 0;
4100		goto fail;
4101	}
4102
4103	if (length > CHALLENGE_LEN * 2) {
4104		dev_err(DEV, "expected AuthChallenge payload too big.\n");
4105		rv = -1;
4106		goto fail;
4107	}
4108
4109	peers_ch = kmalloc(length, GFP_NOIO);
4110	if (peers_ch == NULL) {
4111		dev_err(DEV, "kmalloc of peers_ch failed\n");
4112		rv = -1;
4113		goto fail;
4114	}
4115
4116	rv = drbd_recv(mdev, peers_ch, length);
4117
4118	if (rv != length) {
4119		if (!signal_pending(current))
4120			dev_warn(DEV, "short read AuthChallenge: l=%u\n", rv);
4121		rv = 0;
4122		goto fail;
4123	}
4124
4125	resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4126	response = kmalloc(resp_size, GFP_NOIO);
4127	if (response == NULL) {
4128		dev_err(DEV, "kmalloc of response failed\n");
4129		rv = -1;
4130		goto fail;
4131	}
4132
4133	sg_init_table(&sg, 1);
4134	sg_set_buf(&sg, peers_ch, length);
4135
4136	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4137	if (rv) {
4138		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4139		rv = -1;
4140		goto fail;
4141	}
4142
4143	rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4144	if (!rv)
4145		goto fail;
4146
4147	rv = drbd_recv_header(mdev, &cmd, &length);
4148	if (!rv)
4149		goto fail;
4150
4151	if (cmd != P_AUTH_RESPONSE) {
4152		dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4153			cmdname(cmd), cmd);
4154		rv = 0;
4155		goto fail;
4156	}
4157
4158	if (length != resp_size) {
4159		dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4160		rv = 0;
4161		goto fail;
4162	}
4163
4164	rv = drbd_recv(mdev, response , resp_size);
4165
4166	if (rv != resp_size) {
4167		if (!signal_pending(current))
4168			dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4169		rv = 0;
4170		goto fail;
4171	}
4172
4173	right_response = kmalloc(resp_size, GFP_NOIO);
4174	if (right_response == NULL) {
4175		dev_err(DEV, "kmalloc of right_response failed\n");
4176		rv = -1;
4177		goto fail;
4178	}
4179
4180	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4181
4182	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4183	if (rv) {
4184		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4185		rv = -1;
4186		goto fail;
4187	}
4188
4189	rv = !memcmp(response, right_response, resp_size);
4190
4191	if (rv)
4192		dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4193		     resp_size, mdev->net_conf->cram_hmac_alg);
4194	else
4195		rv = -1;
4196
4197 fail:
4198	kfree(peers_ch);
4199	kfree(response);
4200	kfree(right_response);
4201
4202	return rv;
4203}
4204#endif
4205
4206int drbdd_init(struct drbd_thread *thi)
4207{
4208	struct drbd_conf *mdev = thi->mdev;
4209	unsigned int minor = mdev_to_minor(mdev);
4210	int h;
4211
4212	sprintf(current->comm, "drbd%d_receiver", minor);
4213
4214	dev_info(DEV, "receiver (re)started\n");
4215
4216	do {
4217		h = drbd_connect(mdev);
4218		if (h == 0) {
4219			drbd_disconnect(mdev);
4220			schedule_timeout_interruptible(HZ);
4221		}
4222		if (h == -1) {
4223			dev_warn(DEV, "Discarding network configuration.\n");
4224			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4225		}
4226	} while (h == 0);
4227
4228	if (h > 0) {
4229		if (get_net_conf(mdev)) {
4230			drbdd(mdev);
4231			put_net_conf(mdev);
4232		}
4233	}
4234
4235	drbd_disconnect(mdev);
4236
4237	dev_info(DEV, "receiver terminated\n");
4238	return 0;
4239}
4240
4241/* ********* acknowledge sender ******** */
4242
4243static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
4244{
4245	struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4246
4247	int retcode = be32_to_cpu(p->retcode);
4248
4249	if (retcode >= SS_SUCCESS) {
4250		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4251	} else {
4252		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4253		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4254		    drbd_set_st_err_str(retcode), retcode);
4255	}
4256	wake_up(&mdev->state_wait);
4257
4258	return true;
4259}
4260
/* P_PING: answer with a ping ack; the result of sending it is our result. */
static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
{
	int ack_sent = drbd_send_ping_ack(mdev);

	return ack_sent;
}
4266
4267static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
4268{
4269	/* restore idle timeout */
4270	mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4271	if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4272		wake_up(&mdev->misc_wait);
4273
4274	return true;
4275}
4276
4277static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
4278{
4279	struct p_block_ack *p = (struct p_block_ack *)h;
4280	sector_t sector = be64_to_cpu(p->sector);
4281	int blksize = be32_to_cpu(p->blksize);
4282
4283	D_ASSERT(mdev->agreed_pro_version >= 89);
4284
4285	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4286
4287	if (get_ldev(mdev)) {
4288		drbd_rs_complete_io(mdev, sector);
4289		drbd_set_in_sync(mdev, sector, blksize);
4290		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4291		mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4292		put_ldev(mdev);
4293	}
4294	dec_rs_pending(mdev);
4295	atomic_add(blksize >> 9, &mdev->rs_sect_in);
4296
4297	return true;
4298}
4299
4300/* when we receive the ACK for a write request,
4301 * verify that we actually know about it */
4302static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4303	u64 id, sector_t sector)
4304{
4305	struct hlist_head *slot = tl_hash_slot(mdev, sector);
4306	struct hlist_node *n;
4307	struct drbd_request *req;
4308
4309	hlist_for_each_entry(req, n, slot, collision) {
4310		if ((unsigned long)req == (unsigned long)id) {
4311			if (req->sector != sector) {
4312				dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4313				    "wrong sector (%llus versus %llus)\n", req,
4314				    (unsigned long long)req->sector,
4315				    (unsigned long long)sector);
4316				break;
4317			}
4318			return req;
4319		}
4320	}
4321	return NULL;
4322}
4323
4324typedef struct drbd_request *(req_validator_fn)
4325	(struct drbd_conf *mdev, u64 id, sector_t sector);
4326
4327static int validate_req_change_req_state(struct drbd_conf *mdev,
4328	u64 id, sector_t sector, req_validator_fn validator,
4329	const char *func, enum drbd_req_event what)
4330{
4331	struct drbd_request *req;
4332	struct bio_and_error m;
4333
4334	spin_lock_irq(&mdev->req_lock);
4335	req = validator(mdev, id, sector);
4336	if (unlikely(!req)) {
4337		spin_unlock_irq(&mdev->req_lock);
4338
4339		dev_err(DEV, "%s: failed to find req %p, sector %llus\n", func,
4340			(void *)(unsigned long)id, (unsigned long long)sector);
4341		return false;
4342	}
4343	__req_mod(req, what, &m);
4344	spin_unlock_irq(&mdev->req_lock);
4345
4346	if (m.bio)
4347		complete_master_bio(mdev, &m);
4348	return true;
4349}
4350
4351static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
4352{
4353	struct p_block_ack *p = (struct p_block_ack *)h;
4354	sector_t sector = be64_to_cpu(p->sector);
4355	int blksize = be32_to_cpu(p->blksize);
4356	enum drbd_req_event what;
4357
4358	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4359
4360	if (is_syncer_block_id(p->block_id)) {
4361		drbd_set_in_sync(mdev, sector, blksize);
4362		dec_rs_pending(mdev);
4363		return true;
4364	}
4365	switch (be16_to_cpu(h->command)) {
4366	case P_RS_WRITE_ACK:
4367		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4368		what = write_acked_by_peer_and_sis;
4369		break;
4370	case P_WRITE_ACK:
4371		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4372		what = write_acked_by_peer;
4373		break;
4374	case P_RECV_ACK:
4375		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4376		what = recv_acked_by_peer;
4377		break;
4378	case P_DISCARD_ACK:
4379		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4380		what = conflict_discarded_by_peer;
4381		break;
4382	default:
4383		D_ASSERT(0);
4384		return false;
4385	}
4386
4387	return validate_req_change_req_state(mdev, p->block_id, sector,
4388		_ack_id_to_req, __func__ , what);
4389}
4390
4391static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
4392{
4393	struct p_block_ack *p = (struct p_block_ack *)h;
4394	sector_t sector = be64_to_cpu(p->sector);
4395	int size = be32_to_cpu(p->blksize);
4396	struct drbd_request *req;
4397	struct bio_and_error m;
4398
4399	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4400
4401	if (is_syncer_block_id(p->block_id)) {
4402		dec_rs_pending(mdev);
4403		drbd_rs_failed_io(mdev, sector, size);
4404		return true;
4405	}
4406
4407	spin_lock_irq(&mdev->req_lock);
4408	req = _ack_id_to_req(mdev, p->block_id, sector);
4409	if (!req) {
4410		spin_unlock_irq(&mdev->req_lock);
4411		if (mdev->net_conf->wire_protocol == DRBD_PROT_A ||
4412		    mdev->net_conf->wire_protocol == DRBD_PROT_B) {
4413			/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4414			   The master bio might already be completed, therefore the
4415			   request is no longer in the collision hash.
4416			   => Do not try to validate block_id as request. */
4417			/* In Protocol B we might already have got a P_RECV_ACK
4418			   but then get a P_NEG_ACK after wards. */
4419			drbd_set_out_of_sync(mdev, sector, size);
4420			return true;
4421		} else {
4422			dev_err(DEV, "%s: failed to find req %p, sector %llus\n", __func__,
4423				(void *)(unsigned long)p->block_id, (unsigned long long)sector);
4424			return false;
4425		}
4426	}
4427	__req_mod(req, neg_acked, &m);
4428	spin_unlock_irq(&mdev->req_lock);
4429
4430	if (m.bio)
4431		complete_master_bio(mdev, &m);
4432	return true;
4433}
4434
4435static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
4436{
4437	struct p_block_ack *p = (struct p_block_ack *)h;
4438	sector_t sector = be64_to_cpu(p->sector);
4439
4440	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4441	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4442	    (unsigned long long)sector, be32_to_cpu(p->blksize));
4443
4444	return validate_req_change_req_state(mdev, p->block_id, sector,
4445		_ar_id_to_req, __func__ , neg_acked);
4446}
4447
4448static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
4449{
4450	sector_t sector;
4451	int size;
4452	struct p_block_ack *p = (struct p_block_ack *)h;
4453
4454	sector = be64_to_cpu(p->sector);
4455	size = be32_to_cpu(p->blksize);
4456
4457	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4458
4459	dec_rs_pending(mdev);
4460
4461	if (get_ldev_if_state(mdev, D_FAILED)) {
4462		drbd_rs_complete_io(mdev, sector);
4463		switch (be16_to_cpu(h->command)) {
4464		case P_NEG_RS_DREPLY:
4465			drbd_rs_failed_io(mdev, sector, size);
4466		case P_RS_CANCEL:
4467			break;
4468		default:
4469			D_ASSERT(0);
4470			put_ldev(mdev);
4471			return false;
4472		}
4473		put_ldev(mdev);
4474	}
4475
4476	return true;
4477}
4478
4479static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
4480{
4481	struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4482
4483	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4484
4485	if (mdev->state.conn == C_AHEAD &&
4486	    atomic_read(&mdev->ap_in_flight) == 0 &&
4487	    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
4488		mdev->start_resync_timer.expires = jiffies + HZ;
4489		add_timer(&mdev->start_resync_timer);
4490	}
4491
4492	return true;
4493}
4494
4495static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
4496{
4497	struct p_block_ack *p = (struct p_block_ack *)h;
4498	struct drbd_work *w;
4499	sector_t sector;
4500	int size;
4501
4502	sector = be64_to_cpu(p->sector);
4503	size = be32_to_cpu(p->blksize);
4504
4505	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4506
4507	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4508		drbd_ov_oos_found(mdev, sector, size);
4509	else
4510		ov_oos_print(mdev);
4511
4512	if (!get_ldev(mdev))
4513		return true;
4514
4515	drbd_rs_complete_io(mdev, sector);
4516	dec_rs_pending(mdev);
4517
4518	--mdev->ov_left;
4519
4520	/* let's advance progress step marks only for every other megabyte */
4521	if ((mdev->ov_left & 0x200) == 0x200)
4522		drbd_advance_rs_marks(mdev, mdev->ov_left);
4523
4524	if (mdev->ov_left == 0) {
4525		w = kmalloc(sizeof(*w), GFP_NOIO);
4526		if (w) {
4527			w->cb = w_ov_finished;
4528			drbd_queue_work_front(&mdev->data.work, w);
4529		} else {
4530			dev_err(DEV, "kmalloc(w) failed.");
4531			ov_oos_print(mdev);
4532			drbd_resync_finished(mdev);
4533		}
4534	}
4535	put_ldev(mdev);
4536	return true;
4537}
4538
4539static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
4540{
4541	return true;
4542}
4543
4544struct asender_cmd {
4545	size_t pkt_size;
4546	int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
4547};
4548
4549static struct asender_cmd *get_asender_cmd(int cmd)
4550{
4551	static struct asender_cmd asender_tbl[] = {
4552		/* anything missing from this table is in
4553		 * the drbd_cmd_handler (drbd_default_handler) table,
4554		 * see the beginning of drbdd() */
4555	[P_PING]	    = { sizeof(struct p_header80), got_Ping },
4556	[P_PING_ACK]	    = { sizeof(struct p_header80), got_PingAck },
4557	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4558	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4559	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4560	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4561	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
4562	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
4563	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4564	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
4565	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
4566	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4567	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4568	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4569	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply},
4570	[P_MAX_CMD]	    = { 0, NULL },
4571	};
4572	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4573		return NULL;
4574	return &asender_tbl[cmd];
4575}
4576
4577int drbd_asender(struct drbd_thread *thi)
4578{
4579	struct drbd_conf *mdev = thi->mdev;
4580	struct p_header80 *h = &mdev->meta.rbuf.header.h80;
4581	struct asender_cmd *cmd = NULL;
4582
4583	int rv, len;
4584	void *buf    = h;
4585	int received = 0;
4586	int expect   = sizeof(struct p_header80);
4587	int empty;
4588	int ping_timeout_active = 0;
4589
4590	sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4591
4592	current->policy = SCHED_RR;  /* Make this a realtime task! */
4593	current->rt_priority = 2;    /* more important than all other tasks */
4594
4595	while (get_t_state(thi) == Running) {
4596		drbd_thread_current_set_cpu(mdev);
4597		if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4598			ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4599			mdev->meta.socket->sk->sk_rcvtimeo =
4600				mdev->net_conf->ping_timeo*HZ/10;
4601			ping_timeout_active = 1;
4602		}
4603
4604		/* conditionally cork;
4605		 * it may hurt latency if we cork without much to send */
4606		if (!mdev->net_conf->no_cork &&
4607			3 < atomic_read(&mdev->unacked_cnt))
4608			drbd_tcp_cork(mdev->meta.socket);
4609		while (1) {
4610			clear_bit(SIGNAL_ASENDER, &mdev->flags);
4611			flush_signals(current);
4612			if (!drbd_process_done_ee(mdev))
4613				goto reconnect;
4614			/* to avoid race with newly queued ACKs */
4615			set_bit(SIGNAL_ASENDER, &mdev->flags);
4616			spin_lock_irq(&mdev->req_lock);
4617			empty = list_empty(&mdev->done_ee);
4618			spin_unlock_irq(&mdev->req_lock);
4619			/* new ack may have been queued right here,
4620			 * but then there is also a signal pending,
4621			 * and we start over... */
4622			if (empty)
4623				break;
4624		}
4625		/* but unconditionally uncork unless disabled */
4626		if (!mdev->net_conf->no_cork)
4627			drbd_tcp_uncork(mdev->meta.socket);
4628
4629		/* short circuit, recv_msg would return EINTR anyways. */
4630		if (signal_pending(current))
4631			continue;
4632
4633		rv = drbd_recv_short(mdev, mdev->meta.socket,
4634				     buf, expect-received, 0);
4635		clear_bit(SIGNAL_ASENDER, &mdev->flags);
4636
4637		flush_signals(current);
4638
4639		/* Note:
4640		 * -EINTR	 (on meta) we got a signal
4641		 * -EAGAIN	 (on meta) rcvtimeo expired
4642		 * -ECONNRESET	 other side closed the connection
4643		 * -ERESTARTSYS  (on data) we got a signal
4644		 * rv <  0	 other than above: unexpected error!
4645		 * rv == expected: full header or command
4646		 * rv <  expected: "woken" by signal during receive
4647		 * rv == 0	 : "connection shut down by peer"
4648		 */
4649		if (likely(rv > 0)) {
4650			received += rv;
4651			buf	 += rv;
4652		} else if (rv == 0) {
4653			dev_err(DEV, "meta connection shut down by peer.\n");
4654			goto reconnect;
4655		} else if (rv == -EAGAIN) {
4656			/* If the data socket received something meanwhile,
4657			 * that is good enough: peer is still alive. */
4658			if (time_after(mdev->last_received,
4659				jiffies - mdev->meta.socket->sk->sk_rcvtimeo))
4660				continue;
4661			if (ping_timeout_active) {
4662				dev_err(DEV, "PingAck did not arrive in time.\n");
4663				goto reconnect;
4664			}
4665			set_bit(SEND_PING, &mdev->flags);
4666			continue;
4667		} else if (rv == -EINTR) {
4668			continue;
4669		} else {
4670			dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4671			goto reconnect;
4672		}
4673
4674		if (received == expect && cmd == NULL) {
4675			if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4676				dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
4677				    be32_to_cpu(h->magic),
4678				    be16_to_cpu(h->command),
4679				    be16_to_cpu(h->length));
4680				goto reconnect;
4681			}
4682			cmd = get_asender_cmd(be16_to_cpu(h->command));
4683			len = be16_to_cpu(h->length);
4684			if (unlikely(cmd == NULL)) {
4685				dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
4686				    be32_to_cpu(h->magic),
4687				    be16_to_cpu(h->command),
4688				    be16_to_cpu(h->length));
4689				goto disconnect;
4690			}
4691			expect = cmd->pkt_size;
4692			ERR_IF(len != expect-sizeof(struct p_header80))
4693				goto reconnect;
4694		}
4695		if (received == expect) {
4696			mdev->last_received = jiffies;
4697			D_ASSERT(cmd != NULL);
4698			if (!cmd->process(mdev, h))
4699				goto reconnect;
4700
4701			/* the idle_timeout (ping-int)
4702			 * has been restored in got_PingAck() */
4703			if (cmd == get_asender_cmd(P_PING_ACK))
4704				ping_timeout_active = 0;
4705
4706			buf	 = h;
4707			received = 0;
4708			expect	 = sizeof(struct p_header80);
4709			cmd	 = NULL;
4710		}
4711	}
4712
4713	if (0) {
4714reconnect:
4715		drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4716		drbd_md_sync(mdev);
4717	}
4718	if (0) {
4719disconnect:
4720		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4721		drbd_md_sync(mdev);
4722	}
4723	clear_bit(SIGNAL_ASENDER, &mdev->flags);
4724
4725	D_ASSERT(mdev->state.conn < C_CONNECTED);
4726	dev_info(DEV, "asender terminated\n");
4727
4728	return 0;
4729}