Linux v3.1: drivers/block/drbd/drbd_worker.c
   1/*
   2   drbd_worker.c
   3
   4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   5
   6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   9
  10   drbd is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation; either version 2, or (at your option)
  13   any later version.
  14
  15   drbd is distributed in the hope that it will be useful,
  16   but WITHOUT ANY WARRANTY; without even the implied warranty of
  17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  18   GNU General Public License for more details.
  19
  20   You should have received a copy of the GNU General Public License
  21   along with drbd; see the file COPYING.  If not, write to
  22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  23
  24 */
  25
  26#include <linux/module.h>
  27#include <linux/drbd.h>
  28#include <linux/sched.h>
  29#include <linux/wait.h>
  30#include <linux/mm.h>
  31#include <linux/memcontrol.h>
  32#include <linux/mm_inline.h>
  33#include <linux/slab.h>
  34#include <linux/random.h>
  35#include <linux/string.h>
  36#include <linux/scatterlist.h>
  37
  38#include "drbd_int.h"
  39#include "drbd_req.h"
  40
  41static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
  42static int w_make_resync_request(struct drbd_conf *mdev,
  43				 struct drbd_work *w, int cancel);
  44
  45
  46
  47/* endio handlers:
  48 *   drbd_md_io_complete (defined here)
  49 *   drbd_endio_pri (defined here)
  50 *   drbd_endio_sec (defined here)
  51 *   bm_async_io_complete (defined in drbd_bitmap.c)
  52 *
  53 * For all these callbacks, note the following:
  54 * The callbacks will be called in irq context by the IDE drivers,
  55 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
  56 * Try to get the locking right :)
  57 *
  58 */
  59
  60
  61/* About the global_state_lock
  62   Each state transition on an device holds a read lock. In case we have
  63   to evaluate the sync after dependencies, we grab a write lock, because
  64   we need stable states on all devices for that.  */
  65rwlock_t global_state_lock;
  66
  67/* used for synchronous meta data and bitmap IO
  68 * submitted by drbd_md_sync_page_io()
  69 */
  70void drbd_md_io_complete(struct bio *bio, int error)
  71{
  72	struct drbd_md_io *md_io;
  73
  74	md_io = (struct drbd_md_io *)bio->bi_private;
  75	md_io->error = error;
  76
  77	complete(&md_io->event);
  78}
  79
  80/* reads on behalf of the partner,
  81 * "submitted" by the receiver
  82 */
  83void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
  84{
  85	unsigned long flags = 0;
  86	struct drbd_conf *mdev = e->mdev;
  87
  88	D_ASSERT(e->block_id != ID_VACANT);
  89
  90	spin_lock_irqsave(&mdev->req_lock, flags);
  91	mdev->read_cnt += e->size >> 9;
  92	list_del(&e->w.list);
  93	if (list_empty(&mdev->read_ee))
  94		wake_up(&mdev->ee_wait);
  95	if (test_bit(__EE_WAS_ERROR, &e->flags))
  96		__drbd_chk_io_error(mdev, false);
  97	spin_unlock_irqrestore(&mdev->req_lock, flags);
  98
  99	drbd_queue_work(&mdev->data.work, &e->w);
 100	put_ldev(mdev);
 101}
 102
 103/* writes on behalf of the partner, or resync writes,
 104 * "submitted" by the receiver, final stage.  */
 105static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
 106{
 107	unsigned long flags = 0;
 108	struct drbd_conf *mdev = e->mdev;
 109	sector_t e_sector;
 110	int do_wake;
 111	int is_syncer_req;
 112	int do_al_complete_io;
 113
 114	D_ASSERT(e->block_id != ID_VACANT);
 115
 116	/* after we moved e to done_ee,
 117	 * we may no longer access it,
 118	 * it may be freed/reused already!
 119	 * (as soon as we release the req_lock) */
 120	e_sector = e->sector;
 121	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
 122	is_syncer_req = is_syncer_block_id(e->block_id);
 123
 124	spin_lock_irqsave(&mdev->req_lock, flags);
 125	mdev->writ_cnt += e->size >> 9;
 126	list_del(&e->w.list); /* has been on active_ee or sync_ee */
 127	list_add_tail(&e->w.list, &mdev->done_ee);
 128
 129	/* No hlist_del_init(&e->collision) here, we did not send the Ack yet,
 130	 * neither did we wake possibly waiting conflicting requests.
 131	 * done from "drbd_process_done_ee" within the appropriate w.cb
 132	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
 133
 134	do_wake = is_syncer_req
 135		? list_empty(&mdev->sync_ee)
 136		: list_empty(&mdev->active_ee);
 137
 138	if (test_bit(__EE_WAS_ERROR, &e->flags))
 139		__drbd_chk_io_error(mdev, false);
 140	spin_unlock_irqrestore(&mdev->req_lock, flags);
 141
 142	if (is_syncer_req)
 143		drbd_rs_complete_io(mdev, e_sector);
 144
 145	if (do_wake)
 146		wake_up(&mdev->ee_wait);
 147
 148	if (do_al_complete_io)
 149		drbd_al_complete_io(mdev, e_sector);
 150
 151	wake_asender(mdev);
 152	put_ldev(mdev);
 153}
 154
 155/* writes on behalf of the partner, or resync writes,
 156 * "submitted" by the receiver.
 157 */
 158void drbd_endio_sec(struct bio *bio, int error)
 159{
 160	struct drbd_epoch_entry *e = bio->bi_private;
 161	struct drbd_conf *mdev = e->mdev;
 162	int uptodate = bio_flagged(bio, BIO_UPTODATE);
 163	int is_write = bio_data_dir(bio) == WRITE;
 164
 165	if (error && __ratelimit(&drbd_ratelimit_state))
 166		dev_warn(DEV, "%s: error=%d s=%llus\n",
 167				is_write ? "write" : "read", error,
 168				(unsigned long long)e->sector);
 169	if (!error && !uptodate) {
 170		if (__ratelimit(&drbd_ratelimit_state))
 171			dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
 172					is_write ? "write" : "read",
 173					(unsigned long long)e->sector);
 174		/* strange behavior of some lower level drivers...
 175		 * fail the request by clearing the uptodate flag,
 176		 * but do not return any error?! */
 177		error = -EIO;
 178	}
 179
 180	if (error)
 181		set_bit(__EE_WAS_ERROR, &e->flags);
 182
 183	bio_put(bio); /* no need for the bio anymore */
 184	if (atomic_dec_and_test(&e->pending_bios)) {
 185		if (is_write)
 186			drbd_endio_write_sec_final(e);
 187		else
 188			drbd_endio_read_sec_final(e);
 189	}
 190}
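/* Editorial note (not in the original file): a single epoch entry may have
 * been split into several bios when it was submitted (see drbd_submit_ee()
 * below).  Each completing bio only drops pending_bios; the
 * atomic_dec_and_test() above ensures that the final read/write handler runs
 * exactly once, on whichever completion happens to be the last. */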
 191
 192/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 193 */
 194void drbd_endio_pri(struct bio *bio, int error)
 195{
 196	unsigned long flags;
 197	struct drbd_request *req = bio->bi_private;
 198	struct drbd_conf *mdev = req->mdev;
 199	struct bio_and_error m;
 200	enum drbd_req_event what;
 201	int uptodate = bio_flagged(bio, BIO_UPTODATE);
 202
 203	if (!error && !uptodate) {
 204		dev_warn(DEV, "p %s: setting error to -EIO\n",
 205			 bio_data_dir(bio) == WRITE ? "write" : "read");
 206		/* strange behavior of some lower level drivers...
 207		 * fail the request by clearing the uptodate flag,
 208		 * but do not return any error?! */
 209		error = -EIO;
 210	}
 211
 212	/* to avoid recursion in __req_mod */
 213	if (unlikely(error)) {
 214		what = (bio_data_dir(bio) == WRITE)
 215			? write_completed_with_error
 216			: (bio_rw(bio) == READ)
 217			  ? read_completed_with_error
 218			  : read_ahead_completed_with_error;
 219	} else
 220		what = completed_ok;
 221
 222	bio_put(req->private_bio);
 223	req->private_bio = ERR_PTR(error);
 224
 225	/* not req_mod(), we need irqsave here! */
 226	spin_lock_irqsave(&mdev->req_lock, flags);
 227	__req_mod(req, what, &m);
 228	spin_unlock_irqrestore(&mdev->req_lock, flags);
 229
 230	if (m.bio)
 231		complete_master_bio(mdev, &m);
 232}
 233
 234int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 235{
 236	struct drbd_request *req = container_of(w, struct drbd_request, w);
 237
 238	/* We should not detach for read io-error,
 239	 * but try to WRITE the P_DATA_REPLY to the failed location,
 240	 * to give the disk the chance to relocate that block */
 241
 242	spin_lock_irq(&mdev->req_lock);
 243	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
 244		_req_mod(req, read_retry_remote_canceled);
 245		spin_unlock_irq(&mdev->req_lock);
 246		return 1;
 247	}
 248	spin_unlock_irq(&mdev->req_lock);
 249
 250	return w_send_read_req(mdev, w, 0);
 251}
 252
 253void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
 254{
 255	struct hash_desc desc;
 256	struct scatterlist sg;
 257	struct page *page = e->pages;
 258	struct page *tmp;
 259	unsigned len;
 260
 261	desc.tfm = tfm;
 262	desc.flags = 0;
 263
 264	sg_init_table(&sg, 1);
 265	crypto_hash_init(&desc);
 266
 267	while ((tmp = page_chain_next(page))) {
 268		/* all but the last page will be fully used */
 269		sg_set_page(&sg, page, PAGE_SIZE, 0);
 270		crypto_hash_update(&desc, &sg, sg.length);
 271		page = tmp;
 272	}
 273	/* and now the last, possibly only partially used page */
 274	len = e->size & (PAGE_SIZE - 1);
 275	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
 276	crypto_hash_update(&desc, &sg, sg.length);
 277	crypto_hash_final(&desc, digest);
 278}
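/* Editorial note (not in the original file): the loop above hashes the epoch
 * entry's page chain one page at a time.  Every page but the last contributes
 * a full PAGE_SIZE; the last contributes e->size % PAGE_SIZE bytes, or a full
 * page when e->size is page aligned (the "len ?: PAGE_SIZE" expression).
 * For example, with 4 KiB pages an e->size of 9216 bytes hashes two full
 * pages plus a final 1024-byte chunk. */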
 279
 280void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
 281{
 282	struct hash_desc desc;
 283	struct scatterlist sg;
 284	struct bio_vec *bvec;
 285	int i;
 286
 287	desc.tfm = tfm;
 288	desc.flags = 0;
 289
 290	sg_init_table(&sg, 1);
 291	crypto_hash_init(&desc);
 292
 293	__bio_for_each_segment(bvec, bio, i, 0) {
 294		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
 295		crypto_hash_update(&desc, &sg, sg.length);
 296	}
 297	crypto_hash_final(&desc, digest);
 298}
 299
 300/* TODO merge common code with w_e_end_ov_req */
 301int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 302{
 303	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
 304	int digest_size;
 305	void *digest;
 306	int ok = 1;
 307
 308	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
 309
 310	if (unlikely(cancel))
 311		goto out;
 312
 313	if (likely((e->flags & EE_WAS_ERROR) != 0))
 314		goto out;
 315
 316	digest_size = crypto_hash_digestsize(mdev->csums_tfm);
 317	digest = kmalloc(digest_size, GFP_NOIO);
 318	if (digest) {
 319		sector_t sector = e->sector;
 320		unsigned int size = e->size;
 321		drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
 322		/* Free e and pages before send.
 323		 * In case we block on congestion, we could otherwise run into
 324		 * some distributed deadlock, if the other side blocks on
 325		 * congestion as well, because our receiver blocks in
 326		 * drbd_pp_alloc due to pp_in_use > max_buffers. */
 327		drbd_free_ee(mdev, e);
 328		e = NULL;
 329		inc_rs_pending(mdev);
 330		ok = drbd_send_drequest_csum(mdev, sector, size,
 331					     digest, digest_size,
 332					     P_CSUM_RS_REQUEST);
 333		kfree(digest);
 334	} else {
 335		dev_err(DEV, "kmalloc() of digest failed.\n");
 336		ok = 0;
 337	}
 338
 339out:
 340	if (e)
 341		drbd_free_ee(mdev, e);
 342
 343	if (unlikely(!ok))
 344		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
 345	return ok;
 346}
 347
 348#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
 349
 350static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
 351{
 352	struct drbd_epoch_entry *e;
 353
 354	if (!get_ldev(mdev))
 355		return -EIO;
 356
 357	if (drbd_rs_should_slow_down(mdev, sector))
 358		goto defer;
 359
 360	/* GFP_TRY, because if there is no memory available right now, this may
 361	 * be rescheduled for later. It is "only" background resync, after all. */
 362	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
 363	if (!e)
 364		goto defer;
 365
 366	e->w.cb = w_e_send_csum;
 367	spin_lock_irq(&mdev->req_lock);
 368	list_add(&e->w.list, &mdev->read_ee);
 369	spin_unlock_irq(&mdev->req_lock);
 370
 371	atomic_add(size >> 9, &mdev->rs_sect_ev);
 372	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
 373		return 0;
 374
 375	/* If it failed because of ENOMEM, retry should help.  If it failed
 376	 * because bio_add_page failed (probably broken lower level driver),
 377	 * retry may or may not help.
 378	 * If it does not, you may need to force disconnect. */
 379	spin_lock_irq(&mdev->req_lock);
 380	list_del(&e->w.list);
 381	spin_unlock_irq(&mdev->req_lock);
 382
 383	drbd_free_ee(mdev, e);
 384defer:
 385	put_ldev(mdev);
 386	return -EAGAIN;
 387}
 388
 389int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 390{
 391	switch (mdev->state.conn) {
 392	case C_VERIFY_S:
 393		w_make_ov_request(mdev, w, cancel);
 394		break;
 395	case C_SYNC_TARGET:
 396		w_make_resync_request(mdev, w, cancel);
 397		break;
 398	}
 399
 400	return 1;
 401}
 402
 403void resync_timer_fn(unsigned long data)
 404{
 405	struct drbd_conf *mdev = (struct drbd_conf *) data;
 406
 407	if (list_empty(&mdev->resync_work.list))
 408		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
 409}
 410
 411static void fifo_set(struct fifo_buffer *fb, int value)
 412{
 413	int i;
 414
 415	for (i = 0; i < fb->size; i++)
 416		fb->values[i] = value;
 417}
 418
 419static int fifo_push(struct fifo_buffer *fb, int value)
 420{
 421	int ov;
 422
 423	ov = fb->values[fb->head_index];
 424	fb->values[fb->head_index++] = value;
 425
 426	if (fb->head_index >= fb->size)
 427		fb->head_index = 0;
 428
 429	return ov;
 430}
 431
 432static void fifo_add_val(struct fifo_buffer *fb, int value)
 433{
 434	int i;
 435
 436	for (i = 0; i < fb->size; i++)
 437		fb->values[i] += value;
 438}
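/* Editorial note (not in the original file): rs_plan_s is a fixed-size ring
 * of planned correction values, roughly one slot per upcoming controller
 * step.  fifo_push() writes a new value into the head slot and returns the
 * value that just became due; fifo_add_val() spreads an extra correction
 * evenly over all planned steps.  E.g. with size 3 and values {5, 5, 5},
 * fifo_add_val(fb, 3) gives {8, 8, 8}, and a following fifo_push(fb, 0)
 * returns 8 and leaves {0, 8, 8} with the head advanced by one. */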
 439
 440static int drbd_rs_controller(struct drbd_conf *mdev)
 441{
 442	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
 443	unsigned int want;     /* The number of sectors we want in the proxy */
 444	int req_sect; /* Number of sectors to request in this turn */
 445	int correction; /* Number of sectors more we need in the proxy*/
 446	int cps; /* correction per invocation of drbd_rs_controller() */
 447	int steps; /* Number of time steps to plan ahead */
 448	int curr_corr;
 449	int max_sect;
 450
 451	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
 452	mdev->rs_in_flight -= sect_in;
 453
 454	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
 455
 456	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
 457
 458	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
 459		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
 460	} else { /* normal path */
 461		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
 462			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
 463	}
 464
 465	correction = want - mdev->rs_in_flight - mdev->rs_planed;
 466
 467	/* Plan ahead */
 468	cps = correction / steps;
 469	fifo_add_val(&mdev->rs_plan_s, cps);
 470	mdev->rs_planed += cps * steps;
 471
 472	/* What we do in this step */
 473	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
 474	spin_unlock(&mdev->peer_seq_lock);
 475	mdev->rs_planed -= curr_corr;
 476
 477	req_sect = sect_in + curr_corr;
 478	if (req_sect < 0)
 479		req_sect = 0;
 480
 481	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
 482	if (req_sect > max_sect)
 483		req_sect = max_sect;
 484
 485	/*
 486	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
 487		 sect_in, mdev->rs_in_flight, want, correction,
 488		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
 489	*/
 490
 491	return req_sect;
 492}
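/* Editorial note (not in the original file): a rough reading of the control
 * law above, using the local variable names:
 *
 *	correction = want - rs_in_flight - rs_planed
 *	cps        = correction / steps        (spread over the plan FIFO)
 *	req_sect   = sect_in + curr_corr       (clamped to [0, max_sect])
 *
 * i.e. request about as much as just came back (sect_in), plus the share of
 * the outstanding correction that is due in this step, so that the amount of
 * resync data in flight converges towards c_fill_target, or towards the
 * amount implied by c_delay_target when no fill target is configured. */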
 493
 494static int drbd_rs_number_requests(struct drbd_conf *mdev)
 495{
 496	int number;
 497	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
 498		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
 499		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
 500	} else {
 501		mdev->c_sync_rate = mdev->sync_conf.rate;
 502		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
 503	}
 504
 505	/* ignore the amount of pending requests, the resync controller should
 506	 * throttle down to incoming reply rate soon enough anyways. */
 507	return number;
 508}
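/* Editorial note (not in the original file): both branches above turn a rate
 * in K/sec into a number of BM_BLOCK_SIZE requests to issue per SLEEP_TIME
 * tick.  Assuming the usual values of SLEEP_TIME = HZ/10 (100 ms) and 4 KiB
 * resync blocks, a c_sync_rate of 40000 K/sec gives
 *	number = (HZ/10) * 40000 / (4 * HZ) = 1000
 * requests, i.e. about 4000 KiB submitted per tick. */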
 509
 510static int w_make_resync_request(struct drbd_conf *mdev,
 511				 struct drbd_work *w, int cancel)
 512{
 513	unsigned long bit;
 514	sector_t sector;
 515	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
 516	int max_bio_size;
 517	int number, rollback_i, size;
 518	int align, queued, sndbuf;
 519	int i = 0;
 520
 521	if (unlikely(cancel))
 522		return 1;
 523
 524	if (mdev->rs_total == 0) {
 525		/* empty resync? */
 526		drbd_resync_finished(mdev);
 527		return 1;
 528	}
 529
 530	if (!get_ldev(mdev)) {
 531		/* Since we only need to access mdev->rsync a
 532		   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
 533		   to continue resync with a broken disk makes no sense at
 534		   all */
 535		dev_err(DEV, "Disk broke down during resync!\n");
 536		return 1;
 537	}
 538
 539	max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
 540	number = drbd_rs_number_requests(mdev);
 541	if (number == 0)
 542		goto requeue;
 543
 544	for (i = 0; i < number; i++) {
 545		/* Stop generating RS requests, when half of the send buffer is filled */
 546		mutex_lock(&mdev->data.mutex);
 547		if (mdev->data.socket) {
 548			queued = mdev->data.socket->sk->sk_wmem_queued;
 549			sndbuf = mdev->data.socket->sk->sk_sndbuf;
 550		} else {
 551			queued = 1;
 552			sndbuf = 0;
 553		}
 554		mutex_unlock(&mdev->data.mutex);
 555		if (queued > sndbuf / 2)
 556			goto requeue;
 557
 558next_sector:
 559		size = BM_BLOCK_SIZE;
 560		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
 561
 562		if (bit == DRBD_END_OF_BITMAP) {
 563			mdev->bm_resync_fo = drbd_bm_bits(mdev);
 564			put_ldev(mdev);
 565			return 1;
 566		}
 567
 568		sector = BM_BIT_TO_SECT(bit);
 569
 570		if (drbd_rs_should_slow_down(mdev, sector) ||
 571		    drbd_try_rs_begin_io(mdev, sector)) {
 572			mdev->bm_resync_fo = bit;
 573			goto requeue;
 574		}
 575		mdev->bm_resync_fo = bit + 1;
 576
 577		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
 578			drbd_rs_complete_io(mdev, sector);
 579			goto next_sector;
 580		}
 581
 582#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
 583		/* try to find some adjacent bits.
 584		 * we stop if we have already the maximum req size.
 585		 *
 586		 * Additionally always align bigger requests, in order to
 587		 * be prepared for all stripe sizes of software RAIDs.
 588		 */
 589		align = 1;
 590		rollback_i = i;
 591		for (;;) {
 592			if (size + BM_BLOCK_SIZE > max_bio_size)
 593				break;
 594
 595			/* Be always aligned */
 596			if (sector & ((1<<(align+3))-1))
 597				break;
 598
 599			/* do not cross extent boundaries */
 600			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
 601				break;
 602			/* now, is it actually dirty, after all?
 603			 * caution, drbd_bm_test_bit is tri-state for some
 604			 * obscure reason; ( b == 0 ) would get the out-of-band
 605			 * only accidentally right because of the "oddly sized"
 606			 * adjustment below */
 607			if (drbd_bm_test_bit(mdev, bit+1) != 1)
 608				break;
 609			bit++;
 610			size += BM_BLOCK_SIZE;
 611			if ((BM_BLOCK_SIZE << align) <= size)
 612				align++;
 613			i++;
 614		}
 615		/* if we merged some,
 616		 * reset the offset to start the next drbd_bm_find_next from */
 617		if (size > BM_BLOCK_SIZE)
 618			mdev->bm_resync_fo = bit + 1;
 619#endif
 620
 621		/* adjust very last sectors, in case we are oddly sized */
 622		if (sector + (size>>9) > capacity)
 623			size = (capacity-sector)<<9;
 624		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
 625			switch (read_for_csum(mdev, sector, size)) {
 626			case -EIO: /* Disk failure */
 627				put_ldev(mdev);
 628				return 0;
 629			case -EAGAIN: /* allocation failed, or ldev busy */
 630				drbd_rs_complete_io(mdev, sector);
 631				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
 632				i = rollback_i;
 633				goto requeue;
 634			case 0:
 635				/* everything ok */
 636				break;
 637			default:
 638				BUG();
 639			}
 640		} else {
 641			inc_rs_pending(mdev);
 642			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
 643					       sector, size, ID_SYNCER)) {
 644				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
 645				dec_rs_pending(mdev);
 646				put_ldev(mdev);
 647				return 0;
 648			}
 649		}
 650	}
 651
 652	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
 653		/* last syncer _request_ was sent,
 654		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
 655		 * next sync group will resume), as soon as we receive the last
 656		 * resync data block, and the last bit is cleared.
 657		 * until then resync "work" is "inactive" ...
 658		 */
 659		put_ldev(mdev);
 660		return 1;
 661	}
 662
 663 requeue:
 664	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 665	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
 666	put_ldev(mdev);
 667	return 1;
 668}
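/* Editorial note (not in the original file): the merge loop above (compiled
 * in when DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE) grows a resync request by
 * pulling in adjacent dirty bitmap bits, but only while the result stays
 * within max_bio_size, the starting sector stays suitably power-of-two
 * aligned for the grown size, and the request does not cross a bitmap-extent
 * boundary.  This keeps resync requests friendly to striped lower-level
 * devices such as software RAID. */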
 669
 670static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 671{
 672	int number, i, size;
 673	sector_t sector;
 674	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
 675
 676	if (unlikely(cancel))
 677		return 1;
 678
 679	number = drbd_rs_number_requests(mdev);
 680
 681	sector = mdev->ov_position;
 682	for (i = 0; i < number; i++) {
 683		if (sector >= capacity) {
 684			return 1;
 685		}
 686
 687		size = BM_BLOCK_SIZE;
 688
 689		if (drbd_rs_should_slow_down(mdev, sector) ||
 690		    drbd_try_rs_begin_io(mdev, sector)) {
 691			mdev->ov_position = sector;
 692			goto requeue;
 693		}
 694
 695		if (sector + (size>>9) > capacity)
 696			size = (capacity-sector)<<9;
 697
 698		inc_rs_pending(mdev);
 699		if (!drbd_send_ov_request(mdev, sector, size)) {
 700			dec_rs_pending(mdev);
 701			return 0;
 702		}
 703		sector += BM_SECT_PER_BIT;
 704	}
 705	mdev->ov_position = sector;
 706
 707 requeue:
 708	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 709	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
 710	return 1;
 711}
 712
 713
 714void start_resync_timer_fn(unsigned long data)
 715{
 716	struct drbd_conf *mdev = (struct drbd_conf *) data;
 717
 718	drbd_queue_work(&mdev->data.work, &mdev->start_resync_work);
 719}
 720
 721int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 722{
 723	if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
 724		dev_warn(DEV, "w_start_resync later...\n");
 725		mdev->start_resync_timer.expires = jiffies + HZ/10;
 726		add_timer(&mdev->start_resync_timer);
 727		return 1;
 728	}
 729
 730	drbd_start_resync(mdev, C_SYNC_SOURCE);
 731	clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
 732	return 1;
 733}
 734
 735int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 736{
 737	kfree(w);
 738	ov_oos_print(mdev);
 739	drbd_resync_finished(mdev);
 740
 741	return 1;
 742}
 743
 744static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 745{
 746	kfree(w);
 747
 748	drbd_resync_finished(mdev);
 749
 750	return 1;
 751}
 752
 753static void ping_peer(struct drbd_conf *mdev)
 754{
 755	clear_bit(GOT_PING_ACK, &mdev->flags);
 756	request_ping(mdev);
 757	wait_event(mdev->misc_wait,
 758		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
 759}
 760
 761int drbd_resync_finished(struct drbd_conf *mdev)
 762{
 763	unsigned long db, dt, dbdt;
 764	unsigned long n_oos;
 765	union drbd_state os, ns;
 766	struct drbd_work *w;
 767	char *khelper_cmd = NULL;
 768	int verify_done = 0;
 769
 770	/* Remove all elements from the resync LRU. Since future actions
 771	 * might set bits in the (main) bitmap, then the entries in the
 772	 * resync LRU would be wrong. */
 773	if (drbd_rs_del_all(mdev)) {
 774		/* In case this is not possible now, most probably because
 775		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
 776		 * queue (or even the read operations for those packets
 777		 * is not finished by now).   Retry in 100ms. */
 778
 779		schedule_timeout_interruptible(HZ / 10);
 780		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
 781		if (w) {
 782			w->cb = w_resync_finished;
 783			drbd_queue_work(&mdev->data.work, w);
 784			return 1;
 785		}
 786		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
 787	}
 788
 789	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
 790	if (dt <= 0)
 791		dt = 1;
 792	db = mdev->rs_total;
 793	dbdt = Bit2KB(db/dt);
 794	mdev->rs_paused /= HZ;
 795
 796	if (!get_ldev(mdev))
 797		goto out;
 798
 799	ping_peer(mdev);
 800
 801	spin_lock_irq(&mdev->req_lock);
 802	os = mdev->state;
 803
 804	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
 805
 806	/* This protects us against multiple calls (that can happen in the presence
 807	   of application IO), and against connectivity loss just before we arrive here. */
 808	if (os.conn <= C_CONNECTED)
 809		goto out_unlock;
 810
 811	ns = os;
 812	ns.conn = C_CONNECTED;
 813
 814	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
 815	     verify_done ? "Online verify " : "Resync",
 816	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
 817
 818	n_oos = drbd_bm_total_weight(mdev);
 819
 820	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
 821		if (n_oos) {
 822			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
 823			      n_oos, Bit2KB(1));
 824			khelper_cmd = "out-of-sync";
 825		}
 826	} else {
 827		D_ASSERT((n_oos - mdev->rs_failed) == 0);
 828
 829		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
 830			khelper_cmd = "after-resync-target";
 831
 832		if (mdev->csums_tfm && mdev->rs_total) {
 833			const unsigned long s = mdev->rs_same_csum;
 834			const unsigned long t = mdev->rs_total;
 835			const int ratio =
 836				(t == 0)     ? 0 :
 837			(t < 100000) ? ((s*100)/t) : (s/(t/100));
 838			dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
 839			     "transferred %luK total %luK\n",
 840			     ratio,
 841			     Bit2KB(mdev->rs_same_csum),
 842			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
 843			     Bit2KB(mdev->rs_total));
 844		}
 845	}
 846
 847	if (mdev->rs_failed) {
 848		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
 849
 850		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 851			ns.disk = D_INCONSISTENT;
 852			ns.pdsk = D_UP_TO_DATE;
 853		} else {
 854			ns.disk = D_UP_TO_DATE;
 855			ns.pdsk = D_INCONSISTENT;
 856		}
 857	} else {
 858		ns.disk = D_UP_TO_DATE;
 859		ns.pdsk = D_UP_TO_DATE;
 860
 861		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 862			if (mdev->p_uuid) {
 863				int i;
 864				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
 865					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
 866				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
 867				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
 868			} else {
 869				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
 870			}
 871		}
 872
 873		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
 874			/* for verify runs, we don't update uuids here,
 875			 * so there would be nothing to report. */
 876			drbd_uuid_set_bm(mdev, 0UL);
 877			drbd_print_uuids(mdev, "updated UUIDs");
 878			if (mdev->p_uuid) {
 879				/* Now the two UUID sets are equal, update what we
 880				 * know of the peer. */
 881				int i;
 882				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
 883					mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
 884			}
 885		}
 886	}
 887
 888	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
 889out_unlock:
 890	spin_unlock_irq(&mdev->req_lock);
 891	put_ldev(mdev);
 892out:
 893	mdev->rs_total  = 0;
 894	mdev->rs_failed = 0;
 895	mdev->rs_paused = 0;
 896	if (verify_done)
 897		mdev->ov_start_sector = 0;
 898
 899	drbd_md_sync(mdev);
 900
 901	if (khelper_cmd)
 902		drbd_khelper(mdev, khelper_cmd);
 903
 904	return 1;
 905}
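/* Editorial note (not in the original file): the throughput printed above is
 * dbdt = Bit2KB(db/dt), where db is the number of bitmap bits covered by
 * this run (one bit per 4 KiB block) and dt is the elapsed time in seconds
 * with paused time subtracted.  E.g. 1,000,000 bits over 100 s is
 * 10,000 bits/s, reported as 40,000 K/sec. */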
 906
 907/* helper */
 908static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
 909{
 910	if (drbd_ee_has_active_page(e)) {
 911		/* This might happen if sendpage() has not finished */
 912		int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
 913		atomic_add(i, &mdev->pp_in_use_by_net);
 914		atomic_sub(i, &mdev->pp_in_use);
 915		spin_lock_irq(&mdev->req_lock);
 916		list_add_tail(&e->w.list, &mdev->net_ee);
 917		spin_unlock_irq(&mdev->req_lock);
 918		wake_up(&drbd_pp_wait);
 919	} else
 920		drbd_free_ee(mdev, e);
 921}
 922
 923/**
 924 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 925 * @mdev:	DRBD device.
 926 * @w:		work object.
 927 * @cancel:	The connection will be closed anyways
 928 */
 929int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 930{
 931	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
 932	int ok;
 933
 934	if (unlikely(cancel)) {
 935		drbd_free_ee(mdev, e);
 936		dec_unacked(mdev);
 937		return 1;
 938	}
 939
 940	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 941		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
 942	} else {
 943		if (__ratelimit(&drbd_ratelimit_state))
 944			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
 945			    (unsigned long long)e->sector);
 946
 947		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
 948	}
 949
 950	dec_unacked(mdev);
 951
 952	move_to_net_ee_or_free(mdev, e);
 953
 954	if (unlikely(!ok))
 955		dev_err(DEV, "drbd_send_block() failed\n");
 956	return ok;
 957}
 958
 959/**
 960 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
 961 * @mdev:	DRBD device.
 962 * @w:		work object.
 963 * @cancel:	The connection will be closed anyways
 964 */
 965int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 966{
 967	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
 968	int ok;
 969
 970	if (unlikely(cancel)) {
 971		drbd_free_ee(mdev, e);
 972		dec_unacked(mdev);
 973		return 1;
 974	}
 975
 976	if (get_ldev_if_state(mdev, D_FAILED)) {
 977		drbd_rs_complete_io(mdev, e->sector);
 978		put_ldev(mdev);
 979	}
 980
 981	if (mdev->state.conn == C_AHEAD) {
 982		ok = drbd_send_ack(mdev, P_RS_CANCEL, e);
 983	} else if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 984		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
 985			inc_rs_pending(mdev);
 986			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
 987		} else {
 988			if (__ratelimit(&drbd_ratelimit_state))
 989				dev_err(DEV, "Not sending RSDataReply, "
 990				    "partner DISKLESS!\n");
 991			ok = 1;
 992		}
 993	} else {
 994		if (__ratelimit(&drbd_ratelimit_state))
 995			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
 996			    (unsigned long long)e->sector);
 997
 998		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
 999
1000		/* update resync data with failure */
1001		drbd_rs_failed_io(mdev, e->sector, e->size);
1002	}
1003
1004	dec_unacked(mdev);
1005
1006	move_to_net_ee_or_free(mdev, e);
1007
1008	if (unlikely(!ok))
1009		dev_err(DEV, "drbd_send_block() failed\n");
1010	return ok;
1011}
1012
1013int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1014{
1015	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1016	struct digest_info *di;
1017	int digest_size;
1018	void *digest = NULL;
1019	int ok, eq = 0;
1020
1021	if (unlikely(cancel)) {
1022		drbd_free_ee(mdev, e);
1023		dec_unacked(mdev);
1024		return 1;
1025	}
1026
1027	if (get_ldev(mdev)) {
1028		drbd_rs_complete_io(mdev, e->sector);
1029		put_ldev(mdev);
1030	}
1031
1032	di = e->digest;
1033
1034	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1035		/* quick hack to try to avoid a race against reconfiguration.
1036		 * a real fix would be much more involved,
1037		 * introducing more locking mechanisms */
1038		if (mdev->csums_tfm) {
1039			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1040			D_ASSERT(digest_size == di->digest_size);
1041			digest = kmalloc(digest_size, GFP_NOIO);
1042		}
1043		if (digest) {
1044			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1045			eq = !memcmp(digest, di->digest, digest_size);
1046			kfree(digest);
1047		}
1048
1049		if (eq) {
1050			drbd_set_in_sync(mdev, e->sector, e->size);
1051			/* rs_same_csums unit is BM_BLOCK_SIZE */
1052			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1053			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1054		} else {
1055			inc_rs_pending(mdev);
1056			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1057			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1058			kfree(di);
1059			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1060		}
1061	} else {
1062		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1063		if (__ratelimit(&drbd_ratelimit_state))
1064			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1065	}
1066
1067	dec_unacked(mdev);
1068	move_to_net_ee_or_free(mdev, e);
1069
1070	if (unlikely(!ok))
1071		dev_err(DEV, "drbd_send_block/ack() failed\n");
1072	return ok;
1073}
1074
1075/* TODO merge common code with w_e_send_csum */
1076int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1077{
1078	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1079	sector_t sector = e->sector;
1080	unsigned int size = e->size;
1081	int digest_size;
1082	void *digest;
1083	int ok = 1;
1084
1085	if (unlikely(cancel))
1086		goto out;
1087
1088	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1089	digest = kmalloc(digest_size, GFP_NOIO);
1090	if (!digest) {
1091		ok = 0;	/* terminate the connection in case the allocation failed */
1092		goto out;
1093	}
1094
1095	if (likely(!(e->flags & EE_WAS_ERROR)))
1096		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1097	else
1098		memset(digest, 0, digest_size);
1099
1100	/* Free e and pages before send.
1101	 * In case we block on congestion, we could otherwise run into
1102	 * some distributed deadlock, if the other side blocks on
1103	 * congestion as well, because our receiver blocks in
1104	 * drbd_pp_alloc due to pp_in_use > max_buffers. */
1105	drbd_free_ee(mdev, e);
1106	e = NULL;
1107	inc_rs_pending(mdev);
1108	ok = drbd_send_drequest_csum(mdev, sector, size,
1109				     digest, digest_size,
1110				     P_OV_REPLY);
1111	if (!ok)
1112		dec_rs_pending(mdev);
1113	kfree(digest);
1114
1115out:
1116	if (e)
1117		drbd_free_ee(mdev, e);
1118	dec_unacked(mdev);
1119	return ok;
1120}
1121
1122void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1123{
1124	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1125		mdev->ov_last_oos_size += size>>9;
1126	} else {
1127		mdev->ov_last_oos_start = sector;
1128		mdev->ov_last_oos_size = size>>9;
1129	}
1130	drbd_set_out_of_sync(mdev, sector, size);
1131}
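/* Editorial note (not in the original file): out-of-sync blocks found by
 * online verify are accumulated into contiguous runs: if the new block
 * starts exactly where the previous run ends, the run is extended (a 4 KiB
 * block adds size>>9 = 8 sectors), otherwise a new run starts at this
 * sector.  This allows reporting per run rather than per block. */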
1132
1133int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1134{
1135	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1136	struct digest_info *di;
1137	void *digest;
1138	sector_t sector = e->sector;
1139	unsigned int size = e->size;
1140	int digest_size;
1141	int ok, eq = 0;
1142
1143	if (unlikely(cancel)) {
1144		drbd_free_ee(mdev, e);
1145		dec_unacked(mdev);
1146		return 1;
1147	}
1148
1149	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1150	 * the resync lru has been cleaned up already */
1151	if (get_ldev(mdev)) {
1152		drbd_rs_complete_io(mdev, e->sector);
1153		put_ldev(mdev);
1154	}
1155
1156	di = e->digest;
1157
1158	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1159		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1160		digest = kmalloc(digest_size, GFP_NOIO);
1161		if (digest) {
1162			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1163
1164			D_ASSERT(digest_size == di->digest_size);
1165			eq = !memcmp(digest, di->digest, digest_size);
1166			kfree(digest);
1167		}
1168	}
1169
1170		/* Free e and pages before send.
1171		 * In case we block on congestion, we could otherwise run into
1172		 * some distributed deadlock, if the other side blocks on
1173		 * congestion as well, because our receiver blocks in
1174		 * drbd_pp_alloc due to pp_in_use > max_buffers. */
1175	drbd_free_ee(mdev, e);
1176	if (!eq)
1177		drbd_ov_oos_found(mdev, sector, size);
1178	else
1179		ov_oos_print(mdev);
1180
1181	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1182			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1183
1184	dec_unacked(mdev);
1185
1186	--mdev->ov_left;
1187
1188	/* let's advance progress step marks only for every other megabyte */
1189	if ((mdev->ov_left & 0x200) == 0x200)
1190		drbd_advance_rs_marks(mdev, mdev->ov_left);
1191
1192	if (mdev->ov_left == 0) {
1193		ov_oos_print(mdev);
1194		drbd_resync_finished(mdev);
1195	}
1196
1197	return ok;
1198}
1199
1200int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1201{
1202	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1203	complete(&b->done);
1204	return 1;
1205}
1206
1207int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1208{
1209	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1210	struct p_barrier *p = &mdev->data.sbuf.barrier;
1211	int ok = 1;
1212
1213	/* really avoid racing with tl_clear.  w.cb may have been referenced
1214	 * just before it was reassigned and re-queued, so double check that.
1215	 * actually, this race was harmless, since we only try to send the
1216	 * barrier packet here, and otherwise do nothing with the object.
1217	 * but compare with the head of w_clear_epoch */
1218	spin_lock_irq(&mdev->req_lock);
1219	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1220		cancel = 1;
1221	spin_unlock_irq(&mdev->req_lock);
1222	if (cancel)
1223		return 1;
1224
1225	if (!drbd_get_data_sock(mdev))
1226		return 0;
1227	p->barrier = b->br_number;
1228	/* inc_ap_pending was done where this was queued.
1229	 * dec_ap_pending will be done in got_BarrierAck
1230	 * or (on connection loss) in w_clear_epoch.  */
1231	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1232				(struct p_header80 *)p, sizeof(*p), 0);
1233	drbd_put_data_sock(mdev);
1234
1235	return ok;
1236}
1237
1238int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1239{
1240	if (cancel)
1241		return 1;
1242	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1243}
1244
1245int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1246{
1247	struct drbd_request *req = container_of(w, struct drbd_request, w);
1248	int ok;
1249
1250	if (unlikely(cancel)) {
1251		req_mod(req, send_canceled);
1252		return 1;
1253	}
1254
1255	ok = drbd_send_oos(mdev, req);
1256	req_mod(req, oos_handed_to_network);
1257
1258	return ok;
1259}
1260
1261/**
1262 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1263 * @mdev:	DRBD device.
1264 * @w:		work object.
1265 * @cancel:	The connection will be closed anyways
1266 */
1267int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1268{
1269	struct drbd_request *req = container_of(w, struct drbd_request, w);
1270	int ok;
1271
1272	if (unlikely(cancel)) {
1273		req_mod(req, send_canceled);
1274		return 1;
1275	}
1276
1277	ok = drbd_send_dblock(mdev, req);
1278	req_mod(req, ok ? handed_over_to_network : send_failed);
1279
1280	return ok;
1281}
1282
1283/**
1284 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1285 * @mdev:	DRBD device.
1286 * @w:		work object.
1287 * @cancel:	The connection will be closed anyways
1288 */
1289int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1290{
1291	struct drbd_request *req = container_of(w, struct drbd_request, w);
1292	int ok;
1293
1294	if (unlikely(cancel)) {
1295		req_mod(req, send_canceled);
1296		return 1;
1297	}
1298
1299	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1300				(unsigned long)req);
1301
1302	if (!ok) {
1303		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1304		 * so this is probably redundant */
1305		if (mdev->state.conn >= C_CONNECTED)
1306			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1307	}
1308	req_mod(req, ok ? handed_over_to_network : send_failed);
1309
1310	return ok;
1311}
1312
1313int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1314{
1315	struct drbd_request *req = container_of(w, struct drbd_request, w);
1316
1317	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1318		drbd_al_begin_io(mdev, req->sector);
1319	/* Calling drbd_al_begin_io() out of the worker might deadlocks
1320	   theoretically. Practically it can not deadlock, since this is
1321	   only used when unfreezing IOs. All the extents of the requests
1322	   that made it into the TL are already active */
1323
1324	drbd_req_make_private_bio(req, req->master_bio);
1325	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1326	generic_make_request(req->private_bio);
1327
1328	return 1;
1329}
1330
1331static int _drbd_may_sync_now(struct drbd_conf *mdev)
1332{
1333	struct drbd_conf *odev = mdev;
1334
1335	while (1) {
1336		if (odev->sync_conf.after == -1)
1337			return 1;
1338		odev = minor_to_mdev(odev->sync_conf.after);
1339		ERR_IF(!odev) return 1;
1340		if ((odev->state.conn >= C_SYNC_SOURCE &&
1341		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1342		    odev->state.aftr_isp || odev->state.peer_isp ||
1343		    odev->state.user_isp)
1344			return 0;
1345	}
1346}
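/* Editorial note (not in the original file): resync-after dependencies form
 * a chain of minors via sync_conf.after.  The walk above allows this device
 * to resync only if no device earlier in its chain is currently resyncing
 * (SyncSource..PausedSyncT) or is itself paused (aftr_isp/peer_isp/user_isp);
 * otherwise _drbd_pause_after() keeps it in aftr_isp until
 * _drbd_resume_next() finds the chain idle again. */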
1347
1348/**
1349 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1350 * @mdev:	DRBD device.
1351 *
1352 * Called from process context only (admin command and after_state_ch).
1353 */
1354static int _drbd_pause_after(struct drbd_conf *mdev)
1355{
1356	struct drbd_conf *odev;
1357	int i, rv = 0;
1358
1359	for (i = 0; i < minor_count; i++) {
1360		odev = minor_to_mdev(i);
1361		if (!odev)
1362			continue;
1363		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1364			continue;
1365		if (!_drbd_may_sync_now(odev))
1366			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1367			       != SS_NOTHING_TO_DO);
1368	}
1369
1370	return rv;
1371}
1372
1373/**
1374 * _drbd_resume_next() - Resume resync on all devices that may resync now
1375 * @mdev:	DRBD device.
1376 *
1377 * Called from process context only (admin command and worker).
1378 */
1379static int _drbd_resume_next(struct drbd_conf *mdev)
1380{
1381	struct drbd_conf *odev;
1382	int i, rv = 0;
1383
1384	for (i = 0; i < minor_count; i++) {
1385		odev = minor_to_mdev(i);
1386		if (!odev)
1387			continue;
1388		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1389			continue;
1390		if (odev->state.aftr_isp) {
1391			if (_drbd_may_sync_now(odev))
1392				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1393							CS_HARD, NULL)
1394				       != SS_NOTHING_TO_DO) ;
1395		}
1396	}
1397	return rv;
1398}
1399
1400void resume_next_sg(struct drbd_conf *mdev)
1401{
1402	write_lock_irq(&global_state_lock);
1403	_drbd_resume_next(mdev);
1404	write_unlock_irq(&global_state_lock);
1405}
1406
1407void suspend_other_sg(struct drbd_conf *mdev)
1408{
1409	write_lock_irq(&global_state_lock);
1410	_drbd_pause_after(mdev);
1411	write_unlock_irq(&global_state_lock);
1412}
1413
1414static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1415{
1416	struct drbd_conf *odev;
1417
1418	if (o_minor == -1)
1419		return NO_ERROR;
1420	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1421		return ERR_SYNC_AFTER;
1422
1423	/* check for loops */
1424	odev = minor_to_mdev(o_minor);
1425	while (1) {
1426		if (odev == mdev)
1427			return ERR_SYNC_AFTER_CYCLE;
1428
1429		/* dependency chain ends here, no cycles. */
1430		if (odev->sync_conf.after == -1)
1431			return NO_ERROR;
1432
1433		/* follow the dependency chain */
1434		odev = minor_to_mdev(odev->sync_conf.after);
1435	}
1436}
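/* Editorial note (not in the original file): the loop above rejects
 * configurations that would create a cycle in the resync-after chain.
 * For example, if minor 1 is already set to resync after minor 0, then
 * asking minor 0 to resync after minor 1 walks 1 -> 0, reaches mdev and
 * returns ERR_SYNC_AFTER_CYCLE. */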
1437
1438int drbd_alter_sa(struct drbd_conf *mdev, int na)
1439{
1440	int changes;
1441	int retcode;
1442
1443	write_lock_irq(&global_state_lock);
1444	retcode = sync_after_error(mdev, na);
1445	if (retcode == NO_ERROR) {
1446		mdev->sync_conf.after = na;
1447		do {
1448			changes  = _drbd_pause_after(mdev);
1449			changes |= _drbd_resume_next(mdev);
1450		} while (changes);
1451	}
1452	write_unlock_irq(&global_state_lock);
1453	return retcode;
1454}
1455
1456void drbd_rs_controller_reset(struct drbd_conf *mdev)
1457{
1458	atomic_set(&mdev->rs_sect_in, 0);
1459	atomic_set(&mdev->rs_sect_ev, 0);
1460	mdev->rs_in_flight = 0;
1461	mdev->rs_planed = 0;
1462	spin_lock(&mdev->peer_seq_lock);
1463	fifo_set(&mdev->rs_plan_s, 0);
1464	spin_unlock(&mdev->peer_seq_lock);
1465}
1466
1467/**
1468 * drbd_start_resync() - Start the resync process
1469 * @mdev:	DRBD device.
1470 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1471 *
1472 * This function might bring you directly into one of the
1473 * C_PAUSED_SYNC_* states.
1474 */
1475void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1476{
1477	union drbd_state ns;
1478	int r;
1479
1480	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1481		dev_err(DEV, "Resync already running!\n");
1482		return;
1483	}
1484
1485	if (mdev->state.conn < C_AHEAD) {
1486		/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1487		drbd_rs_cancel_all(mdev);
1488		/* This should be done when we abort the resync. We definitely do not
1489		   want to have this for connections going back and forth between
1490		   Ahead/Behind and SyncSource/SyncTarget */
1491	}
1492
1493	if (side == C_SYNC_TARGET) {
1494		/* Since application IO was locked out during C_WF_BITMAP_T and
1495		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1496		   we check that we might make the data inconsistent. */
1497		r = drbd_khelper(mdev, "before-resync-target");
1498		r = (r >> 8) & 0xff;
1499		if (r > 0) {
1500			dev_info(DEV, "before-resync-target handler returned %d, "
1501			     "dropping connection.\n", r);
1502			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1503			return;
1504		}
1505	} else /* C_SYNC_SOURCE */ {
1506		r = drbd_khelper(mdev, "before-resync-source");
1507		r = (r >> 8) & 0xff;
1508		if (r > 0) {
1509			if (r == 3) {
1510				dev_info(DEV, "before-resync-source handler returned %d, "
1511					 "ignoring. Old userland tools?", r);
1512			} else {
1513				dev_info(DEV, "before-resync-source handler returned %d, "
1514					 "dropping connection.\n", r);
1515				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1516				return;
1517			}
1518		}
1519	}
1520
1521	drbd_state_lock(mdev);
1522
1523	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1524		drbd_state_unlock(mdev);
1525		return;
1526	}
1527
1528	write_lock_irq(&global_state_lock);
1529	ns = mdev->state;
1530
1531	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1532
1533	ns.conn = side;
1534
1535	if (side == C_SYNC_TARGET)
1536		ns.disk = D_INCONSISTENT;
1537	else /* side == C_SYNC_SOURCE */
1538		ns.pdsk = D_INCONSISTENT;
1539
1540	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1541	ns = mdev->state;
1542
1543	if (ns.conn < C_CONNECTED)
1544		r = SS_UNKNOWN_ERROR;
1545
1546	if (r == SS_SUCCESS) {
1547		unsigned long tw = drbd_bm_total_weight(mdev);
1548		unsigned long now = jiffies;
1549		int i;
1550
1551		mdev->rs_failed    = 0;
1552		mdev->rs_paused    = 0;
1553		mdev->rs_same_csum = 0;
1554		mdev->rs_last_events = 0;
1555		mdev->rs_last_sect_ev = 0;
1556		mdev->rs_total     = tw;
1557		mdev->rs_start     = now;
1558		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1559			mdev->rs_mark_left[i] = tw;
1560			mdev->rs_mark_time[i] = now;
1561		}
1562		_drbd_pause_after(mdev);
1563	}
1564	write_unlock_irq(&global_state_lock);
1565
1566	if (r == SS_SUCCESS) {
1567		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1568		     drbd_conn_str(ns.conn),
1569		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1570		     (unsigned long) mdev->rs_total);
1571		if (side == C_SYNC_TARGET)
1572			mdev->bm_resync_fo = 0;
1573
1574		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1575		 * with w_send_oos, or the sync target will get confused as to
1576		 * how much bits to resync.  We cannot do that always, because for an
1577		 * empty resync and protocol < 95, we need to do it here, as we call
1578		 * drbd_resync_finished from here in that case.
1579		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1580		 * and from after_state_ch otherwise. */
1581		if (side == C_SYNC_SOURCE && mdev->agreed_pro_version < 96)
1582			drbd_gen_and_send_sync_uuid(mdev);
1583
1584		if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1585			/* This still has a race (about when exactly the peers
1586			 * detect connection loss) that can lead to a full sync
1587			 * on next handshake. In 8.3.9 we fixed this with explicit
1588			 * resync-finished notifications, but the fix
1589			 * introduces a protocol change.  Sleeping for some
1590			 * time longer than the ping interval + timeout on the
1591			 * SyncSource, to give the SyncTarget the chance to
1592			 * detect connection loss, then waiting for a ping
1593			 * response (implicit in drbd_resync_finished) reduces
1594			 * the race considerably, but does not solve it. */
1595			if (side == C_SYNC_SOURCE)
1596				schedule_timeout_interruptible(
1597					mdev->net_conf->ping_int * HZ +
1598					mdev->net_conf->ping_timeo*HZ/9);
1599			drbd_resync_finished(mdev);
1600		}
1601
1602		drbd_rs_controller_reset(mdev);
1603		/* ns.conn may already be != mdev->state.conn,
1604		 * we may have been paused in between, or become paused until
1605		 * the timer triggers.
1606		 * No matter, that is handled in resync_timer_fn() */
1607		if (ns.conn == C_SYNC_TARGET)
1608			mod_timer(&mdev->resync_timer, jiffies);
1609
1610		drbd_md_sync(mdev);
1611	}
1612	put_ldev(mdev);
1613	drbd_state_unlock(mdev);
1614}
1615
1616int drbd_worker(struct drbd_thread *thi)
1617{
1618	struct drbd_conf *mdev = thi->mdev;
1619	struct drbd_work *w = NULL;
1620	LIST_HEAD(work_list);
1621	int intr = 0, i;
1622
1623	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1624
1625	while (get_t_state(thi) == Running) {
1626		drbd_thread_current_set_cpu(mdev);
1627
1628		if (down_trylock(&mdev->data.work.s)) {
1629			mutex_lock(&mdev->data.mutex);
1630			if (mdev->data.socket && !mdev->net_conf->no_cork)
1631				drbd_tcp_uncork(mdev->data.socket);
1632			mutex_unlock(&mdev->data.mutex);
1633
1634			intr = down_interruptible(&mdev->data.work.s);
1635
1636			mutex_lock(&mdev->data.mutex);
1637			if (mdev->data.socket  && !mdev->net_conf->no_cork)
1638				drbd_tcp_cork(mdev->data.socket);
1639			mutex_unlock(&mdev->data.mutex);
1640		}
1641
1642		if (intr) {
1643			D_ASSERT(intr == -EINTR);
1644			flush_signals(current);
1645			ERR_IF (get_t_state(thi) == Running)
1646				continue;
1647			break;
1648		}
1649
1650		if (get_t_state(thi) != Running)
1651			break;
1652		/* With this break, we have done a down() but not consumed
1653		   the entry from the list. The cleanup code takes care of
1654		   this...   */
1655
1656		w = NULL;
1657		spin_lock_irq(&mdev->data.work.q_lock);
1658		ERR_IF(list_empty(&mdev->data.work.q)) {
1659			/* something terribly wrong in our logic.
1660			 * we were able to down() the semaphore,
1661			 * but the list is empty... doh.
1662			 *
1663			 * what is the best thing to do now?
1664			 * try again from scratch, restarting the receiver,
1665			 * asender, whatnot? could break even more ugly,
1666			 * e.g. when we are primary, but no good local data.
1667			 *
1668			 * I'll try to get away just starting over this loop.
1669			 */
1670			spin_unlock_irq(&mdev->data.work.q_lock);
1671			continue;
1672		}
1673		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1674		list_del_init(&w->list);
1675		spin_unlock_irq(&mdev->data.work.q_lock);
1676
1677		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1678			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1679			if (mdev->state.conn >= C_CONNECTED)
1680				drbd_force_state(mdev,
1681						NS(conn, C_NETWORK_FAILURE));
1682		}
1683	}
1684	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1685	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1686
1687	spin_lock_irq(&mdev->data.work.q_lock);
1688	i = 0;
1689	while (!list_empty(&mdev->data.work.q)) {
1690		list_splice_init(&mdev->data.work.q, &work_list);
1691		spin_unlock_irq(&mdev->data.work.q_lock);
1692
1693		while (!list_empty(&work_list)) {
1694			w = list_entry(work_list.next, struct drbd_work, list);
1695			list_del_init(&w->list);
1696			w->cb(mdev, w, 1);
1697			i++; /* dead debugging code */
1698		}
1699
1700		spin_lock_irq(&mdev->data.work.q_lock);
1701	}
1702	sema_init(&mdev->data.work.s, 0);
1703	/* DANGEROUS race: if someone did queue his work within the spinlock,
1704	 * but up() ed outside the spinlock, we could get an up() on the
1705	 * semaphore without corresponding list entry.
1706	 * So don't do that.
1707	 */
1708	spin_unlock_irq(&mdev->data.work.q_lock);
1709
1710	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1711	/* _drbd_set_state only uses stop_nowait.
1712	 * wait here for the Exiting receiver. */
1713	drbd_thread_stop(&mdev->receiver);
1714	drbd_mdev_cleanup(mdev);
1715
1716	dev_info(DEV, "worker terminated\n");
1717
1718	clear_bit(DEVICE_DYING, &mdev->flags);
1719	clear_bit(CONFIG_PENDING, &mdev->flags);
1720	wake_up(&mdev->state_wait);
1721
1722	return 0;
1723}
Linux v3.15: drivers/block/drbd/drbd_worker.c
   1/*
   2   drbd_worker.c
   3
   4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   5
   6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   9
  10   drbd is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation; either version 2, or (at your option)
  13   any later version.
  14
  15   drbd is distributed in the hope that it will be useful,
  16   but WITHOUT ANY WARRANTY; without even the implied warranty of
  17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  18   GNU General Public License for more details.
  19
  20   You should have received a copy of the GNU General Public License
  21   along with drbd; see the file COPYING.  If not, write to
  22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  23
  24*/
  25
  26#include <linux/module.h>
  27#include <linux/drbd.h>
  28#include <linux/sched.h>
  29#include <linux/wait.h>
  30#include <linux/mm.h>
  31#include <linux/memcontrol.h>
  32#include <linux/mm_inline.h>
  33#include <linux/slab.h>
  34#include <linux/random.h>
  35#include <linux/string.h>
  36#include <linux/scatterlist.h>
  37
  38#include "drbd_int.h"
  39#include "drbd_protocol.h"
  40#include "drbd_req.h"
  41
  42static int make_ov_request(struct drbd_device *, int);
  43static int make_resync_request(struct drbd_device *, int);
  44
  45/* endio handlers:
  46 *   drbd_md_io_complete (defined here)
  47 *   drbd_request_endio (defined here)
  48 *   drbd_peer_request_endio (defined here)
  49 *   bm_async_io_complete (defined in drbd_bitmap.c)
  50 *
  51 * For all these callbacks, note the following:
  52 * The callbacks will be called in irq context by the IDE drivers,
  53 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
  54 * Try to get the locking right :)
  55 *
  56 */
  57
  58
  59/* About the global_state_lock
  60   Each state transition on a device holds a read lock. In case we have
  61   to evaluate the resync after dependencies, we grab a write lock, because
  62   we need stable states on all devices for that.  */
  63rwlock_t global_state_lock;
  64
  65/* used for synchronous meta data and bitmap IO
  66 * submitted by drbd_md_sync_page_io()
  67 */
  68void drbd_md_io_complete(struct bio *bio, int error)
  69{
  70	struct drbd_md_io *md_io;
  71	struct drbd_device *device;
  72
  73	md_io = (struct drbd_md_io *)bio->bi_private;
  74	device = container_of(md_io, struct drbd_device, md_io);
  75
  76	md_io->error = error;
  77
  78	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
  79	 * to timeout on the lower level device, and eventually detach from it.
  80	 * If this io completion runs after that timeout expired, this
  81	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
  82	 * During normal operation, this only puts that extra reference
  83	 * down to 1 again.
  84	 * Make sure we first drop the reference, and only then signal
  85	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
  86	 * next drbd_md_sync_page_io(), that we trigger the
  87	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
  88	 */
  89	drbd_md_put_buffer(device);
  90	md_io->done = 1;
  91	wake_up(&device->misc_wait);
  92	bio_put(bio);
  93	if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
  94		put_ldev(device);
  95}
  96
  97/* reads on behalf of the partner,
  98 * "submitted" by the receiver
  99 */
 100static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
 101{
 102	unsigned long flags = 0;
 103	struct drbd_peer_device *peer_device = peer_req->peer_device;
 104	struct drbd_device *device = peer_device->device;
 105
 106	spin_lock_irqsave(&device->resource->req_lock, flags);
 107	device->read_cnt += peer_req->i.size >> 9;
 108	list_del(&peer_req->w.list);
 109	if (list_empty(&device->read_ee))
 110		wake_up(&device->ee_wait);
 111	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
 112		__drbd_chk_io_error(device, DRBD_READ_ERROR);
 113	spin_unlock_irqrestore(&device->resource->req_lock, flags);
 114
 115	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
 116	put_ldev(device);
 117}
 118
 119/* writes on behalf of the partner, or resync writes,
 120 * "submitted" by the receiver, final stage.  */
 121static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
 122{
 123	unsigned long flags = 0;
 124	struct drbd_peer_device *peer_device = peer_req->peer_device;
 125	struct drbd_device *device = peer_device->device;
 126	struct drbd_interval i;
 127	int do_wake;
 128	u64 block_id;
 129	int do_al_complete_io;
 130
 131	/* after we moved peer_req to done_ee,
 132	 * we may no longer access it,
 133	 * it may be freed/reused already!
 134	 * (as soon as we release the req_lock) */
 135	i = peer_req->i;
 136	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
 137	block_id = peer_req->block_id;
 138
 139	spin_lock_irqsave(&device->resource->req_lock, flags);
 140	device->writ_cnt += peer_req->i.size >> 9;
 141	list_move_tail(&peer_req->w.list, &device->done_ee);
 142
 143	/*
 144	 * Do not remove from the write_requests tree here: we did not send the
 145	 * Ack yet and did not wake possibly waiting conflicting requests.
 146	 * Removed from the tree in "drbd_process_done_ee" within the
 147	 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
 148	 * _drbd_clear_done_ee.
 149	 */
 150
 151	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
 152
 153	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
 154		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
 155	spin_unlock_irqrestore(&device->resource->req_lock, flags);
 156
 157	if (block_id == ID_SYNCER)
 158		drbd_rs_complete_io(device, i.sector);
 159
 160	if (do_wake)
 161		wake_up(&device->ee_wait);
 162
 163	if (do_al_complete_io)
 164		drbd_al_complete_io(device, &i);
 165
 166	wake_asender(peer_device->connection);
 167	put_ldev(device);
 168}
 169
 170/* writes on behalf of the partner, or resync writes,
 171 * "submitted" by the receiver.
 172 */
 173void drbd_peer_request_endio(struct bio *bio, int error)
 174{
 175	struct drbd_peer_request *peer_req = bio->bi_private;
 176	struct drbd_device *device = peer_req->peer_device->device;
 177	int uptodate = bio_flagged(bio, BIO_UPTODATE);
 178	int is_write = bio_data_dir(bio) == WRITE;
 179
 180	if (error && __ratelimit(&drbd_ratelimit_state))
 181		drbd_warn(device, "%s: error=%d s=%llus\n",
 182				is_write ? "write" : "read", error,
 183				(unsigned long long)peer_req->i.sector);
 184	if (!error && !uptodate) {
 185		if (__ratelimit(&drbd_ratelimit_state))
 186			drbd_warn(device, "%s: setting error to -EIO s=%llus\n",
 187					is_write ? "write" : "read",
 188					(unsigned long long)peer_req->i.sector);
 189		/* strange behavior of some lower level drivers...
 190		 * fail the request by clearing the uptodate flag,
 191		 * but do not return any error?! */
 192		error = -EIO;
 193	}
 194
 195	if (error)
 196		set_bit(__EE_WAS_ERROR, &peer_req->flags);
 197
 198	bio_put(bio); /* no need for the bio anymore */
 199	if (atomic_dec_and_test(&peer_req->pending_bios)) {
 200		if (is_write)
 201			drbd_endio_write_sec_final(peer_req);
 202		else
 203			drbd_endio_read_sec_final(peer_req);
 204	}
 205}
 206
 207/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 208 */
 209void drbd_request_endio(struct bio *bio, int error)
 210{
 211	unsigned long flags;
 212	struct drbd_request *req = bio->bi_private;
 213	struct drbd_device *device = req->device;
 214	struct bio_and_error m;
 215	enum drbd_req_event what;
 216	int uptodate = bio_flagged(bio, BIO_UPTODATE);
 217
 218	if (!error && !uptodate) {
 219		drbd_warn(device, "p %s: setting error to -EIO\n",
 220			 bio_data_dir(bio) == WRITE ? "write" : "read");
 221		/* strange behavior of some lower level drivers...
 222		 * fail the request by clearing the uptodate flag,
 223		 * but do not return any error?! */
 224		error = -EIO;
 225	}
 226
 227
 228	/* If this request was aborted locally before,
 229	 * but now was completed "successfully",
 230	 * chances are that this caused arbitrary data corruption.
 231	 *
 232	 * "aborting" requests, or force-detaching the disk, is intended for
 233	 * completely blocked/hung local backing devices which no longer
 234	 * complete requests at all, not even do error completions.  In this
 235	 * situation, usually a hard-reset and failover is the only way out.
 236	 *
 237	 * By "aborting", basically faking a local error-completion,
 238	 * we allow for a more graceful switchover by cleanly migrating services.
 239	 * Still the affected node has to be rebooted "soon".
 240	 *
 241	 * By completing these requests, we allow the upper layers to re-use
 242	 * the associated data pages.
 243	 *
 244	 * If later the local backing device "recovers", and now DMAs some data
 245	 * from disk into the original request pages, in the best case it will
 246	 * just put random data into unused pages; but typically it will corrupt
 247	 * meanwhile completely unrelated data, causing all sorts of damage.
 248	 *
 249	 * Which means delayed successful completion,
 250	 * especially for READ requests,
 251	 * is a reason to panic().
 252	 *
 253	 * We assume that a delayed *error* completion is OK,
 254	 * though we still will complain noisily about it.
 255	 */
 256	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
 257		if (__ratelimit(&drbd_ratelimit_state))
 258			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
 259
 260		if (!error)
 261			panic("possible random memory corruption caused by delayed completion of aborted local request\n");
 262	}
 263
 264	/* to avoid recursion in __req_mod */
 265	if (unlikely(error)) {
 266		what = (bio_data_dir(bio) == WRITE)
 267			? WRITE_COMPLETED_WITH_ERROR
 268			: (bio_rw(bio) == READ)
 269			  ? READ_COMPLETED_WITH_ERROR
 270			  : READ_AHEAD_COMPLETED_WITH_ERROR;
 271	} else
 272		what = COMPLETED_OK;
 273
 274	bio_put(req->private_bio);
 275	req->private_bio = ERR_PTR(error);
 276
 277	/* not req_mod(), we need irqsave here! */
 278	spin_lock_irqsave(&device->resource->req_lock, flags);
 279	__req_mod(req, what, &m);
 280	spin_unlock_irqrestore(&device->resource->req_lock, flags);
 281	put_ldev(device);
 282
 283	if (m.bio)
 284		complete_master_bio(device, &m);
 285}
 286
 287void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
 288{
 289	struct hash_desc desc;
 290	struct scatterlist sg;
 291	struct page *page = peer_req->pages;
 292	struct page *tmp;
 293	unsigned len;
 294
 295	desc.tfm = tfm;
 296	desc.flags = 0;
 297
 298	sg_init_table(&sg, 1);
 299	crypto_hash_init(&desc);
 300
 301	while ((tmp = page_chain_next(page))) {
 302		/* all but the last page will be fully used */
 303		sg_set_page(&sg, page, PAGE_SIZE, 0);
 304		crypto_hash_update(&desc, &sg, sg.length);
 305		page = tmp;
 306	}
 307	/* and now the last, possibly only partially used page */
 308	len = peer_req->i.size & (PAGE_SIZE - 1);
 309	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
 310	crypto_hash_update(&desc, &sg, sg.length);
 311	crypto_hash_final(&desc, digest);
 312}
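    /*
     * Editorial worked example, assuming the usual 4096 byte PAGE_SIZE: the
     * loop above hashes every page of the chain in full except the last,
     * which only contributes i.size & (PAGE_SIZE - 1) bytes.  A 9216 byte
     * peer request spread over three pages gives
     *
     *	len = 9216 & (4096 - 1) = 1024
     *
     * so the first two pages are hashed with 4096 bytes each and the last
     * with 1024.  If i.size is an exact multiple of PAGE_SIZE, len is 0 and
     * the "len ?: PAGE_SIZE" fallback hashes the final page in full.
     */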
 313
 314void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
 315{
 316	struct hash_desc desc;
 317	struct scatterlist sg;
 318	struct bio_vec bvec;
 319	struct bvec_iter iter;
 320
 321	desc.tfm = tfm;
 322	desc.flags = 0;
 323
 324	sg_init_table(&sg, 1);
 325	crypto_hash_init(&desc);
 326
 327	bio_for_each_segment(bvec, bio, iter) {
 328		sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
 329		crypto_hash_update(&desc, &sg, sg.length);
 330	}
 331	crypto_hash_final(&desc, digest);
 332}
 333
 334/* MAYBE merge common code with w_e_end_ov_req */
 335static int w_e_send_csum(struct drbd_work *w, int cancel)
 336{
 337	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
 338	struct drbd_peer_device *peer_device = peer_req->peer_device;
 339	struct drbd_device *device = peer_device->device;
 340	int digest_size;
 341	void *digest;
 342	int err = 0;
 343
 344	if (unlikely(cancel))
 345		goto out;
 346
 347	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
 348		goto out;
 349
 350	digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
 351	digest = kmalloc(digest_size, GFP_NOIO);
 352	if (digest) {
 353		sector_t sector = peer_req->i.sector;
 354		unsigned int size = peer_req->i.size;
 355		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
 356		/* Free peer_req and pages before send.
 357		 * In case we block on congestion, we could otherwise run into
 358		 * some distributed deadlock, if the other side blocks on
 359		 * congestion as well, because our receiver blocks in
 360		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
 361		drbd_free_peer_req(device, peer_req);
 362		peer_req = NULL;
 363		inc_rs_pending(device);
 364		err = drbd_send_drequest_csum(peer_device, sector, size,
 365					      digest, digest_size,
 366					      P_CSUM_RS_REQUEST);
 367		kfree(digest);
 368	} else {
 369		drbd_err(device, "kmalloc() of digest failed.\n");
 370		err = -ENOMEM;
 371	}
 372
 373out:
 374	if (peer_req)
 375		drbd_free_peer_req(device, peer_req);
 376
 377	if (unlikely(err))
 378		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
 379	return err;
 380}
 381
 382#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
 383
 384static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
 385{
 386	struct drbd_device *device = peer_device->device;
 387	struct drbd_peer_request *peer_req;
 388
 389	if (!get_ldev(device))
 390		return -EIO;
 391
 392	if (drbd_rs_should_slow_down(device, sector))
 393		goto defer;
 394
 395	/* GFP_TRY, because if there is no memory available right now, this may
 396	 * be rescheduled for later. It is "only" background resync, after all. */
 397	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
 398				       size, GFP_TRY);
 399	if (!peer_req)
 400		goto defer;
 401
 402	peer_req->w.cb = w_e_send_csum;
 403	spin_lock_irq(&device->resource->req_lock);
 404	list_add(&peer_req->w.list, &device->read_ee);
 405	spin_unlock_irq(&device->resource->req_lock);
 406
 407	atomic_add(size >> 9, &device->rs_sect_ev);
 408	if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
 409		return 0;
 410
 411	/* If it failed because of ENOMEM, retry should help.  If it failed
 412	 * because bio_add_page failed (probably broken lower level driver),
 413	 * retry may or may not help.
 414	 * If it does not, you may need to force disconnect. */
 415	spin_lock_irq(&device->resource->req_lock);
 416	list_del(&peer_req->w.list);
 417	spin_unlock_irq(&device->resource->req_lock);
 418
 419	drbd_free_peer_req(device, peer_req);
 420defer:
 421	put_ldev(device);
 422	return -EAGAIN;
 423}
 424
 425int w_resync_timer(struct drbd_work *w, int cancel)
 426{
 427	struct drbd_device *device =
 428		container_of(w, struct drbd_device, resync_work);
 429
 430	switch (device->state.conn) {
 431	case C_VERIFY_S:
 432		make_ov_request(device, cancel);
 433		break;
 434	case C_SYNC_TARGET:
 435		make_resync_request(device, cancel);
 436		break;
 437	}
 438
 439	return 0;
 440}
 441
 442void resync_timer_fn(unsigned long data)
 443{
 444	struct drbd_device *device = (struct drbd_device *) data;
 445
 446	if (list_empty(&device->resync_work.list))
 447		drbd_queue_work(&first_peer_device(device)->connection->sender_work,
 448				&device->resync_work);
 449}
 450
 451static void fifo_set(struct fifo_buffer *fb, int value)
 452{
 453	int i;
 454
 455	for (i = 0; i < fb->size; i++)
 456		fb->values[i] = value;
 457}
 458
 459static int fifo_push(struct fifo_buffer *fb, int value)
 460{
 461	int ov;
 462
 463	ov = fb->values[fb->head_index];
 464	fb->values[fb->head_index++] = value;
 465
 466	if (fb->head_index >= fb->size)
 467		fb->head_index = 0;
 468
 469	return ov;
 470}
 471
 472static void fifo_add_val(struct fifo_buffer *fb, int value)
 473{
 474	int i;
 475
 476	for (i = 0; i < fb->size; i++)
 477		fb->values[i] += value;
 478}
 479
 480struct fifo_buffer *fifo_alloc(int fifo_size)
 481{
 482	struct fifo_buffer *fb;
 483
 484	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
 485	if (!fb)
 486		return NULL;
 487
 488	fb->head_index = 0;
 489	fb->size = fifo_size;
 490	fb->total = 0;
 491
 492	return fb;
 493}
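    /*
     * Editorial sketch of how the ring buffer above behaves (added for
     * exposition only, never called by the driver): fifo_push() hands back
     * whatever was planned fb->size pushes ago, including any adjustments
     * fifo_add_val() has spread over the pending slots since then.
     */
    static int __maybe_unused fifo_usage_sketch(void)
    {
    	struct fifo_buffer *fb = fifo_alloc(3);
    	int oldest;

    	if (!fb)
    		return -ENOMEM;
    	fifo_push(fb, 10);		/* values: [10,  0,  0] */
    	fifo_push(fb, 20);		/* values: [10, 20,  0] */
    	fifo_push(fb, 30);		/* values: [10, 20, 30], head wraps */
    	fifo_add_val(fb, 1);		/* values: [11, 21, 31] */
    	oldest = fifo_push(fb, 40);	/* pops 11; values: [40, 21, 31] */
    	kfree(fb);
    	return oldest;			/* first value plus the adjustment */
    }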
 494
 495static int drbd_rs_controller(struct drbd_device *device)
 496{
 497	struct disk_conf *dc;
 498	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
 499	unsigned int want;     /* The number of sectors we want in the proxy */
 500	int req_sect; /* Number of sectors to request in this turn */
 501	int correction; /* Number of sectors more we need in the proxy*/
 502	int cps; /* correction per invocation of drbd_rs_controller() */
 503	int steps; /* Number of time steps to plan ahead */
 504	int curr_corr;
 505	int max_sect;
 506	struct fifo_buffer *plan;
 507
 508	sect_in = atomic_xchg(&device->rs_sect_in, 0); /* Number of sectors that came in */
 509	device->rs_in_flight -= sect_in;
 510
 511	dc = rcu_dereference(device->ldev->disk_conf);
 512	plan = rcu_dereference(device->rs_plan_s);
 513
 514	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
 515
 516	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
 517		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
 518	} else { /* normal path */
 519		want = dc->c_fill_target ? dc->c_fill_target :
 520			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
 521	}
 522
 523	correction = want - device->rs_in_flight - plan->total;
 524
 525	/* Plan ahead */
 526	cps = correction / steps;
 527	fifo_add_val(plan, cps);
 528	plan->total += cps * steps;
 529
 530	/* What we do in this step */
 531	curr_corr = fifo_push(plan, 0);
 532	plan->total -= curr_corr;
 533
 534	req_sect = sect_in + curr_corr;
 535	if (req_sect < 0)
 536		req_sect = 0;
 537
 538	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
 539	if (req_sect > max_sect)
 540		req_sect = max_sect;
 541
 542	/*
 543	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
 544		 sect_in, device->rs_in_flight, want, correction,
 545		 steps, cps, device->rs_planed, curr_corr, req_sect);
 546	*/
 547
 548	return req_sect;
 549}
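    /*
     * Editorial worked example with made-up numbers: suppose the plan spans
     * steps = 10 ticks, c_fill_target is 1000 sectors, 600 sectors are
     * still in flight, the fifo already plans another 300 and 200 sectors
     * came in since the last tick.  Then
     *
     *	want       = 1000
     *	correction = 1000 - 600 - 300 = 100
     *	cps        = 100 / 10 = 10	(added to every fifo slot)
     *
     * and if the slot popped for this tick holds, say, 40 after the add,
     * req_sect = 200 + 40 = 240 sectors are requested, subject to the
     * c_max_rate clamp.  The net effect is a controller that tries to keep
     * about c_fill_target sectors in flight.
     */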
 550
 551static int drbd_rs_number_requests(struct drbd_device *device)
 552{
 553	int number;
 554
 555	rcu_read_lock();
 556	if (rcu_dereference(device->rs_plan_s)->size) {
 557		number = drbd_rs_controller(device) >> (BM_BLOCK_SHIFT - 9);
 558		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
 559	} else {
 560		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
 561		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
 562	}
 563	rcu_read_unlock();
 564
 565	/* ignore the amount of pending requests, the resync controller should
 566	 * throttle down to incoming reply rate soon enough anyways. */
 567	return number;
 568}
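    /*
     * Editorial note on the units above, assuming the usual 4 KiB
     * BM_BLOCK_SIZE (so BM_BLOCK_SHIFT - 9 == 3) and SLEEP_TIME of HZ/10:
     * the controller returns 512 byte sectors, ">> 3" turns that into
     * bitmap-block-sized requests, and c_sync_rate is the same request
     * count expressed again in KiB/s.  In the fixed-rate branch, a
     * configured resync_rate of 4000 KiB/s gives
     *
     *	number = (HZ/10) * 4000 / (4 * HZ) = 100
     *
     * requests of 4 KiB per 100 ms tick, i.e. 4000 KiB/s as configured.
     */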
 569
 570static int make_resync_request(struct drbd_device *device, int cancel)
 571{
 572	unsigned long bit;
 573	sector_t sector;
 574	const sector_t capacity = drbd_get_capacity(device->this_bdev);
 575	int max_bio_size;
 576	int number, rollback_i, size;
 577	int align, queued, sndbuf;
 578	int i = 0;
 579
 580	if (unlikely(cancel))
 581		return 0;
 582
 583	if (device->rs_total == 0) {
 584		/* empty resync? */
 585		drbd_resync_finished(device);
 586		return 0;
 587	}
 588
 589	if (!get_ldev(device)) {
 590		/* Since we only need to access device->rsync, a
 591		   get_ldev_if_state(device, D_FAILED) would be sufficient; but
 592		   continuing a resync with a broken disk makes no sense at
 593		   all. */
 594		drbd_err(device, "Disk broke down during resync!\n");
 595		return 0;
 596	}
 597
 598	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
 599	number = drbd_rs_number_requests(device);
 600	if (number == 0)
 601		goto requeue;
 602
 603	for (i = 0; i < number; i++) {
 604		/* Stop generating RS requests, when half of the send buffer is filled */
 605		mutex_lock(&first_peer_device(device)->connection->data.mutex);
 606		if (first_peer_device(device)->connection->data.socket) {
 607			queued = first_peer_device(device)->connection->data.socket->sk->sk_wmem_queued;
 608			sndbuf = first_peer_device(device)->connection->data.socket->sk->sk_sndbuf;
 609		} else {
 610			queued = 1;
 611			sndbuf = 0;
 612		}
 613		mutex_unlock(&first_peer_device(device)->connection->data.mutex);
 614		if (queued > sndbuf / 2)
 615			goto requeue;
 616
 617next_sector:
 618		size = BM_BLOCK_SIZE;
 619		bit  = drbd_bm_find_next(device, device->bm_resync_fo);
 620
 621		if (bit == DRBD_END_OF_BITMAP) {
 622			device->bm_resync_fo = drbd_bm_bits(device);
 623			put_ldev(device);
 624			return 0;
 625		}
 626
 627		sector = BM_BIT_TO_SECT(bit);
 628
 629		if (drbd_rs_should_slow_down(device, sector) ||
 630		    drbd_try_rs_begin_io(device, sector)) {
 631			device->bm_resync_fo = bit;
 632			goto requeue;
 633		}
 634		device->bm_resync_fo = bit + 1;
 635
 636		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
 637			drbd_rs_complete_io(device, sector);
 638			goto next_sector;
 639		}
 640
 641#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
 642		/* try to find some adjacent bits.
 643		 * we stop if we have already the maximum req size.
 644		 *
 645		 * Additionally always align bigger requests, in order to
 646		 * be prepared for all stripe sizes of software RAIDs.
 647		 */
 648		align = 1;
 649		rollback_i = i;
 650		for (;;) {
 651			if (size + BM_BLOCK_SIZE > max_bio_size)
 652				break;
 653
 654			/* Be always aligned */
 655			if (sector & ((1<<(align+3))-1))
 656				break;
 657
 658			/* do not cross extent boundaries */
 659			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
 660				break;
 661			/* now, is it actually dirty, after all?
 662			 * caution, drbd_bm_test_bit is tri-state for some
 663			 * obscure reason; ( b == 0 ) would get the out-of-band
 664			 * only accidentally right because of the "oddly sized"
 665			 * adjustment below */
 666			if (drbd_bm_test_bit(device, bit+1) != 1)
 667				break;
 668			bit++;
 669			size += BM_BLOCK_SIZE;
 670			if ((BM_BLOCK_SIZE << align) <= size)
 671				align++;
 672			i++;
 673		}
 674		/* if we merged some,
 675		 * reset the offset to start the next drbd_bm_find_next from */
 676		if (size > BM_BLOCK_SIZE)
 677			device->bm_resync_fo = bit + 1;
 678#endif
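    		/* Editorial note on the merge loop above, assuming 4 KiB
    		 * bitmap blocks (8 sectors per bit): the alignment test
    		 * "sector & ((1 << (align + 3)) - 1)" only lets a request
    		 * grow while its start sector is aligned to the size it is
    		 * about to reach.  With align == 1 the start must be a
    		 * multiple of 16 sectors (8 KiB) to grow past one block;
    		 * once two blocks are reached, align becomes 2 and 32 sector
    		 * (16 KiB) alignment is required, and so on.  A request
    		 * starting at, say, sector 8 therefore stays a single 4 KiB
    		 * block, while one starting at sector 0 keeps growing block
    		 * by block, the required alignment doubling as the size
    		 * crosses each power of two, until max_bio_size, an extent
    		 * boundary, or a clean bit stops it. */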
 679
 680		/* adjust very last sectors, in case we are oddly sized */
 681		if (sector + (size>>9) > capacity)
 682			size = (capacity-sector)<<9;
 683		if (first_peer_device(device)->connection->agreed_pro_version >= 89 &&
 684		    first_peer_device(device)->connection->csums_tfm) {
 685			switch (read_for_csum(first_peer_device(device), sector, size)) {
 686			case -EIO: /* Disk failure */
 687				put_ldev(device);
 688				return -EIO;
 689			case -EAGAIN: /* allocation failed, or ldev busy */
 690				drbd_rs_complete_io(device, sector);
 691				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
 692				i = rollback_i;
 693				goto requeue;
 694			case 0:
 695				/* everything ok */
 696				break;
 697			default:
 698				BUG();
 699			}
 700		} else {
 701			int err;
 702
 703			inc_rs_pending(device);
 704			err = drbd_send_drequest(first_peer_device(device), P_RS_DATA_REQUEST,
 705						 sector, size, ID_SYNCER);
 706			if (err) {
 707				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
 708				dec_rs_pending(device);
 709				put_ldev(device);
 710				return err;
 711			}
 712		}
 713	}
 714
 715	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
 716		/* last syncer _request_ was sent,
 717		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
 718		 * next sync group will resume), as soon as we receive the last
 719		 * resync data block, and the last bit is cleared.
 720		 * until then resync "work" is "inactive" ...
 721		 */
 722		put_ldev(device);
 723		return 0;
 724	}
 725
 726 requeue:
 727	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 728	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
 729	put_ldev(device);
 730	return 0;
 731}
 732
 733static int make_ov_request(struct drbd_device *device, int cancel)
 734{
 735	int number, i, size;
 736	sector_t sector;
 737	const sector_t capacity = drbd_get_capacity(device->this_bdev);
 738	bool stop_sector_reached = false;
 739
 740	if (unlikely(cancel))
 741		return 1;
 742
 743	number = drbd_rs_number_requests(device);
 744
 745	sector = device->ov_position;
 746	for (i = 0; i < number; i++) {
 747		if (sector >= capacity)
 748			return 1;
 749
 750		/* We check for "finished" only in the reply path:
 751		 * w_e_end_ov_reply().
 752		 * We need to send at least one request out. */
 753		stop_sector_reached = i > 0
 754			&& verify_can_do_stop_sector(device)
 755			&& sector >= device->ov_stop_sector;
 756		if (stop_sector_reached)
 757			break;
 758
 759		size = BM_BLOCK_SIZE;
 760
 761		if (drbd_rs_should_slow_down(device, sector) ||
 762		    drbd_try_rs_begin_io(device, sector)) {
 763			device->ov_position = sector;
 764			goto requeue;
 765		}
 766
 767		if (sector + (size>>9) > capacity)
 768			size = (capacity-sector)<<9;
 769
 770		inc_rs_pending(device);
 771		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
 772			dec_rs_pending(device);
 773			return 0;
 774		}
 775		sector += BM_SECT_PER_BIT;
 776	}
 777	device->ov_position = sector;
 778
 779 requeue:
 780	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 781	if (i == 0 || !stop_sector_reached)
 782		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
 783	return 1;
 784}
 785
 786int w_ov_finished(struct drbd_work *w, int cancel)
 787{
 788	struct drbd_device_work *dw =
 789		container_of(w, struct drbd_device_work, w);
 790	struct drbd_device *device = dw->device;
 791	kfree(dw);
 792	ov_out_of_sync_print(device);
 793	drbd_resync_finished(device);
 794
 795	return 0;
 796}
 797
 798static int w_resync_finished(struct drbd_work *w, int cancel)
 799{
 800	struct drbd_device_work *dw =
 801		container_of(w, struct drbd_device_work, w);
 802	struct drbd_device *device = dw->device;
 803	kfree(dw);
 804
 805	drbd_resync_finished(device);
 806
 807	return 0;
 808}
 809
 810static void ping_peer(struct drbd_device *device)
 811{
 812	struct drbd_connection *connection = first_peer_device(device)->connection;
 813
 814	clear_bit(GOT_PING_ACK, &connection->flags);
 815	request_ping(connection);
 816	wait_event(connection->ping_wait,
 817		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
 818}
 819
 820int drbd_resync_finished(struct drbd_device *device)
 821{
 822	unsigned long db, dt, dbdt;
 823	unsigned long n_oos;
 824	union drbd_state os, ns;
 825	struct drbd_device_work *dw;
 826	char *khelper_cmd = NULL;
 827	int verify_done = 0;
 828
 829	/* Remove all elements from the resync LRU. Since future actions
 830	 * might set bits in the (main) bitmap, then the entries in the
 831	 * resync LRU would be wrong. */
 832	if (drbd_rs_del_all(device)) {
 833		/* In case this is not possible now, most probably because
 834		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
 835		 * queue (or even the read operations for those packets
 836	 * are not finished by now).   Retry in 100ms. */
 837
 838		schedule_timeout_interruptible(HZ / 10);
 839		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
 840		if (dw) {
 841			dw->w.cb = w_resync_finished;
 842			dw->device = device;
 843			drbd_queue_work(&first_peer_device(device)->connection->sender_work,
 844					&dw->w);
 845			return 1;
 846		}
 847		drbd_err(device, "Warning: failed to drbd_rs_del_all() and to kmalloc(dw).\n");
 848	}
 849
 850	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
 851	if (dt <= 0)
 852		dt = 1;
 853
 854	db = device->rs_total;
 855	/* adjust for verify start and stop sectors, respectively the reached position */
 856	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
 857		db -= device->ov_left;
 858
 859	dbdt = Bit2KB(db/dt);
 860	device->rs_paused /= HZ;
 861
 862	if (!get_ldev(device))
 863		goto out;
 864
 865	ping_peer(device);
 866
 867	spin_lock_irq(&device->resource->req_lock);
 868	os = drbd_read_state(device);
 869
 870	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
 871
 872	/* This protects us against multiple calls (that can happen in the presence
 873	   of application IO), and against connectivity loss just before we arrive here. */
 874	if (os.conn <= C_CONNECTED)
 875		goto out_unlock;
 876
 877	ns = os;
 878	ns.conn = C_CONNECTED;
 879
 880	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
 881	     verify_done ? "Online verify" : "Resync",
 882	     dt + device->rs_paused, device->rs_paused, dbdt);
 883
 884	n_oos = drbd_bm_total_weight(device);
 885
 886	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
 887		if (n_oos) {
 888			drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
 889			      n_oos, Bit2KB(1));
 890			khelper_cmd = "out-of-sync";
 891		}
 892	} else {
 893		D_ASSERT(device, (n_oos - device->rs_failed) == 0);
 894
 895		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
 896			khelper_cmd = "after-resync-target";
 897
 898		if (first_peer_device(device)->connection->csums_tfm && device->rs_total) {
 899			const unsigned long s = device->rs_same_csum;
 900			const unsigned long t = device->rs_total;
 901			const int ratio =
 902				(t == 0)     ? 0 :
 903			(t < 100000) ? ((s*100)/t) : (s/(t/100));
 904			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
 905			     "transferred %luK total %luK\n",
 906			     ratio,
 907			     Bit2KB(device->rs_same_csum),
 908			     Bit2KB(device->rs_total - device->rs_same_csum),
 909			     Bit2KB(device->rs_total));
 910		}
 911	}
 912
 913	if (device->rs_failed) {
 914		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
 915
 916		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 917			ns.disk = D_INCONSISTENT;
 918			ns.pdsk = D_UP_TO_DATE;
 919		} else {
 920			ns.disk = D_UP_TO_DATE;
 921			ns.pdsk = D_INCONSISTENT;
 922		}
 923	} else {
 924		ns.disk = D_UP_TO_DATE;
 925		ns.pdsk = D_UP_TO_DATE;
 926
 927		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 928			if (device->p_uuid) {
 929				int i;
 930				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
 931					_drbd_uuid_set(device, i, device->p_uuid[i]);
 932				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
 933				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
 934			} else {
 935				drbd_err(device, "device->p_uuid is NULL! BUG\n");
 936			}
 937		}
 938
 939		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
 940			/* for verify runs, we don't update uuids here,
 941			 * so there would be nothing to report. */
 942			drbd_uuid_set_bm(device, 0UL);
 943			drbd_print_uuids(device, "updated UUIDs");
 944			if (device->p_uuid) {
 945				/* Now the two UUID sets are equal, update what we
 946				 * know of the peer. */
 947				int i;
 948				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
 949					device->p_uuid[i] = device->ldev->md.uuid[i];
 950			}
 951		}
 952	}
 953
 954	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
 955out_unlock:
 956	spin_unlock_irq(&device->resource->req_lock);
 957	put_ldev(device);
 958out:
 959	device->rs_total  = 0;
 960	device->rs_failed = 0;
 961	device->rs_paused = 0;
 962
 963	/* reset start sector, if we reached end of device */
 964	if (verify_done && device->ov_left == 0)
 965		device->ov_start_sector = 0;
 966
 967	drbd_md_sync(device);
 968
 969	if (khelper_cmd)
 970		drbd_khelper(device, khelper_cmd);
 971
 972	return 1;
 973}
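    /*
     * Editorial note on the checksum ratio above: both branches compute the
     * integer percentage s * 100 / t; for large resyncs t is divided by 100
     * first (presumably so that s * 100 cannot overflow an unsigned long on
     * 32 bit), at a small cost in precision.  Hypothetical numbers:
     * s = 300000 equal-checksum blocks out of t = 1200000 total gives
     * ratio = 300000 / (1200000 / 100) = 25, i.e. a quarter of the blocks
     * did not have to be transferred.
     */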
 974
 975/* helper */
 976static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
 977{
 978	if (drbd_peer_req_has_active_page(peer_req)) {
 979		/* This might happen if sendpage() has not finished */
 980		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
 981		atomic_add(i, &device->pp_in_use_by_net);
 982		atomic_sub(i, &device->pp_in_use);
 983		spin_lock_irq(&device->resource->req_lock);
 984		list_add_tail(&peer_req->w.list, &device->net_ee);
 985		spin_unlock_irq(&device->resource->req_lock);
 986		wake_up(&drbd_pp_wait);
 987	} else
 988		drbd_free_peer_req(device, peer_req);
 989}
 990
 991/**
 992 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 993 * @device:	DRBD device.
 994 * @w:		work object.
 995 * @cancel:	The connection will be closed anyways
 996 */
 997int w_e_end_data_req(struct drbd_work *w, int cancel)
 998{
 999	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1000	struct drbd_peer_device *peer_device = peer_req->peer_device;
1001	struct drbd_device *device = peer_device->device;
1002	int err;
1003
1004	if (unlikely(cancel)) {
1005		drbd_free_peer_req(device, peer_req);
1006		dec_unacked(device);
1007		return 0;
1008	}
1009
1010	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1011		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1012	} else {
1013		if (__ratelimit(&drbd_ratelimit_state))
1014			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1015			    (unsigned long long)peer_req->i.sector);
1016
1017		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1018	}
1019
1020	dec_unacked(device);
1021
1022	move_to_net_ee_or_free(device, peer_req);
1023
1024	if (unlikely(err))
1025		drbd_err(device, "drbd_send_block() failed\n");
1026	return err;
1027}
1028
1029/**
1030 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1031 * @w:		work object.
1032 * @cancel:	The connection will be closed anyways
1033 */
1034int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1035{
1036	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1037	struct drbd_peer_device *peer_device = peer_req->peer_device;
1038	struct drbd_device *device = peer_device->device;
1039	int err;
1040
1041	if (unlikely(cancel)) {
1042		drbd_free_peer_req(device, peer_req);
1043		dec_unacked(device);
1044		return 0;
1045	}
1046
1047	if (get_ldev_if_state(device, D_FAILED)) {
1048		drbd_rs_complete_io(device, peer_req->i.sector);
1049		put_ldev(device);
1050	}
1051
1052	if (device->state.conn == C_AHEAD) {
1053		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1054	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1055		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1056			inc_rs_pending(device);
1057			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1058		} else {
1059			if (__ratelimit(&drbd_ratelimit_state))
1060				drbd_err(device, "Not sending RSDataReply, "
1061				    "partner DISKLESS!\n");
1062			err = 0;
1063		}
1064	} else {
1065		if (__ratelimit(&drbd_ratelimit_state))
1066			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1067			    (unsigned long long)peer_req->i.sector);
1068
1069		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1070
1071		/* update resync data with failure */
1072		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1073	}
1074
1075	dec_unacked(device);
1076
1077	move_to_net_ee_or_free(device, peer_req);
1078
1079	if (unlikely(err))
1080		drbd_err(device, "drbd_send_block() failed\n");
1081	return err;
1082}
1083
1084int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1085{
1086	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1087	struct drbd_peer_device *peer_device = peer_req->peer_device;
1088	struct drbd_device *device = peer_device->device;
1089	struct digest_info *di;
1090	int digest_size;
1091	void *digest = NULL;
1092	int err, eq = 0;
1093
1094	if (unlikely(cancel)) {
1095		drbd_free_peer_req(device, peer_req);
1096		dec_unacked(device);
1097		return 0;
1098	}
1099
1100	if (get_ldev(device)) {
1101		drbd_rs_complete_io(device, peer_req->i.sector);
1102		put_ldev(device);
1103	}
1104
1105	di = peer_req->digest;
1106
1107	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1108		/* quick hack to try to avoid a race against reconfiguration.
1109		 * a real fix would be much more involved,
1110		 * introducing more locking mechanisms */
1111		if (peer_device->connection->csums_tfm) {
1112			digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
1113			D_ASSERT(device, digest_size == di->digest_size);
1114			digest = kmalloc(digest_size, GFP_NOIO);
1115		}
1116		if (digest) {
1117			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1118			eq = !memcmp(digest, di->digest, digest_size);
1119			kfree(digest);
1120		}
1121
1122		if (eq) {
1123			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1124			/* rs_same_csums unit is BM_BLOCK_SIZE */
1125			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1126			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1127		} else {
1128			inc_rs_pending(device);
1129			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1130			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1131			kfree(di);
1132			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1133		}
1134	} else {
1135		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1136		if (__ratelimit(&drbd_ratelimit_state))
1137			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1138	}
1139
1140	dec_unacked(device);
1141	move_to_net_ee_or_free(device, peer_req);
1142
1143	if (unlikely(err))
1144		drbd_err(device, "drbd_send_block/ack() failed\n");
1145	return err;
1146}
1147
1148int w_e_end_ov_req(struct drbd_work *w, int cancel)
1149{
1150	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1151	struct drbd_peer_device *peer_device = peer_req->peer_device;
1152	struct drbd_device *device = peer_device->device;
1153	sector_t sector = peer_req->i.sector;
1154	unsigned int size = peer_req->i.size;
1155	int digest_size;
1156	void *digest;
1157	int err = 0;
1158
1159	if (unlikely(cancel))
1160		goto out;
1161
1162	digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1163	digest = kmalloc(digest_size, GFP_NOIO);
1164	if (!digest) {
1165		err = 1;	/* terminate the connection in case the allocation failed */
1166		goto out;
1167	}
1168
1169	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1170		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1171	else
1172		memset(digest, 0, digest_size);
1173
1174	/* Free e and pages before send.
1175	 * In case we block on congestion, we could otherwise run into
1176	 * some distributed deadlock, if the other side blocks on
1177	 * congestion as well, because our receiver blocks in
1178	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1179	drbd_free_peer_req(device, peer_req);
1180	peer_req = NULL;
1181	inc_rs_pending(device);
1182	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1183	if (err)
1184		dec_rs_pending(device);
1185	kfree(digest);
1186
1187out:
1188	if (peer_req)
1189		drbd_free_peer_req(device, peer_req);
1190	dec_unacked(device);
1191	return err;
1192}
1193
1194void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1195{
1196	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1197		device->ov_last_oos_size += size>>9;
1198	} else {
1199		device->ov_last_oos_start = sector;
1200		device->ov_last_oos_size = size>>9;
1201	}
1202	drbd_set_out_of_sync(device, sector, size);
1203}
1204
1205int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1206{
1207	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1208	struct drbd_peer_device *peer_device = peer_req->peer_device;
1209	struct drbd_device *device = peer_device->device;
1210	struct digest_info *di;
1211	void *digest;
1212	sector_t sector = peer_req->i.sector;
1213	unsigned int size = peer_req->i.size;
1214	int digest_size;
1215	int err, eq = 0;
1216	bool stop_sector_reached = false;
1217
1218	if (unlikely(cancel)) {
1219		drbd_free_peer_req(device, peer_req);
1220		dec_unacked(device);
1221		return 0;
1222	}
1223
1224	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1225	 * the resync lru has been cleaned up already */
1226	if (get_ldev(device)) {
1227		drbd_rs_complete_io(device, peer_req->i.sector);
1228		put_ldev(device);
1229	}
1230
1231	di = peer_req->digest;
1232
1233	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1234		digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1235		digest = kmalloc(digest_size, GFP_NOIO);
1236		if (digest) {
1237			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1238
1239			D_ASSERT(device, digest_size == di->digest_size);
1240			eq = !memcmp(digest, di->digest, digest_size);
1241			kfree(digest);
1242		}
1243	}
1244
1245	/* Free peer_req and pages before send.
1246	 * In case we block on congestion, we could otherwise run into
1247	 * some distributed deadlock, if the other side blocks on
1248	 * congestion as well, because our receiver blocks in
1249	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1250	drbd_free_peer_req(device, peer_req);
1251	if (!eq)
1252		drbd_ov_out_of_sync_found(device, sector, size);
1253	else
1254		ov_out_of_sync_print(device);
1255
1256	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1257			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1258
1259	dec_unacked(device);
1260
1261	--device->ov_left;
1262
1263	/* let's advance progress step marks only for every other megabyte */
1264	if ((device->ov_left & 0x200) == 0x200)
1265		drbd_advance_rs_marks(device, device->ov_left);
1266
1267	stop_sector_reached = verify_can_do_stop_sector(device) &&
1268		(sector + (size>>9)) >= device->ov_stop_sector;
1269
1270	if (device->ov_left == 0 || stop_sector_reached) {
1271		ov_out_of_sync_print(device);
1272		drbd_resync_finished(device);
1273	}
1274
1275	return err;
1276}
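    /*
     * Editorial note on the "every other megabyte" trick above, assuming the
     * 4 KiB bitmap granularity: ov_left counts blocks still to be verified,
     * and "(ov_left & 0x200) == 0x200" holds exactly while bit 9 is set,
     * i.e. during alternating runs of 512 blocks (2 MiB) as ov_left counts
     * down.  The rs marks are therefore refreshed in bursts instead of on
     * every single reply.
     */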
1277
1278/* FIXME
1279 * We need to track the number of pending barrier acks,
1280 * and to be able to wait for them.
1281 * See also comment in drbd_adm_attach before drbd_suspend_io.
1282 */
1283static int drbd_send_barrier(struct drbd_connection *connection)
1284{
1285	struct p_barrier *p;
1286	struct drbd_socket *sock;
1287
1288	sock = &connection->data;
1289	p = conn_prepare_command(connection, sock);
1290	if (!p)
1291		return -EIO;
1292	p->barrier = connection->send.current_epoch_nr;
1293	p->pad = 0;
1294	connection->send.current_epoch_writes = 0;
1295
1296	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1297}
1298
1299int w_send_write_hint(struct drbd_work *w, int cancel)
1300{
1301	struct drbd_device *device =
1302		container_of(w, struct drbd_device, unplug_work);
1303	struct drbd_socket *sock;
1304
1305	if (cancel)
1306		return 0;
1307	sock = &first_peer_device(device)->connection->data;
1308	if (!drbd_prepare_command(first_peer_device(device), sock))
1309		return -EIO;
1310	return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1311}
1312
1313static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1314{
1315	if (!connection->send.seen_any_write_yet) {
1316		connection->send.seen_any_write_yet = true;
1317		connection->send.current_epoch_nr = epoch;
1318		connection->send.current_epoch_writes = 0;
1319	}
1320}
1321
1322static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1323{
1324	/* nothing to close yet before the first write on this connection */
1325	if (!connection->send.seen_any_write_yet)
1326		return;
1327	if (connection->send.current_epoch_nr != epoch) {
1328		if (connection->send.current_epoch_writes)
1329			drbd_send_barrier(connection);
1330		connection->send.current_epoch_nr = epoch;
1331	}
1332}
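    /*
     * Editorial sketch of the decision above (never called by the driver),
     * with plain parameters standing in for the connection->send fields:
     * a P_BARRIER is due only when we move to a new epoch and the epoch we
     * are leaving actually contained writes.
     */
    static bool __maybe_unused would_send_barrier(bool seen_any_write_yet,
    					      unsigned int current_epoch_nr,
    					      unsigned int current_epoch_writes,
    					      unsigned int req_epoch)
    {
    	if (!seen_any_write_yet)
    		return false;	/* nothing sent yet, so nothing to close */
    	if (current_epoch_nr == req_epoch)
    		return false;	/* still in the same epoch */
    	return current_epoch_writes != 0;	/* skip empty (AHEAD) epochs */
    }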
1333
1334int w_send_out_of_sync(struct drbd_work *w, int cancel)
1335{
1336	struct drbd_request *req = container_of(w, struct drbd_request, w);
1337	struct drbd_device *device = req->device;
1338	struct drbd_connection *connection = first_peer_device(device)->connection;
1339	int err;
1340
1341	if (unlikely(cancel)) {
1342		req_mod(req, SEND_CANCELED);
1343		return 0;
1344	}
1345
1346	/* this time, no connection->send.current_epoch_writes++;
1347	 * If it was sent, it was the closing barrier for the last
1348	 * replicated epoch, before we went into AHEAD mode.
1349	 * No more barriers will be sent, until we leave AHEAD mode again. */
1350	maybe_send_barrier(connection, req->epoch);
1351
1352	err = drbd_send_out_of_sync(first_peer_device(device), req);
1353	req_mod(req, OOS_HANDED_TO_NETWORK);
1354
1355	return err;
1356}
1357
1358/**
1359 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1360 * @w:		work object.
1361 * @cancel:	The connection will be closed anyways
1362 */
1363int w_send_dblock(struct drbd_work *w, int cancel)
1364{
1365	struct drbd_request *req = container_of(w, struct drbd_request, w);
1366	struct drbd_device *device = req->device;
1367	struct drbd_connection *connection = first_peer_device(device)->connection;
1368	int err;
1369
1370	if (unlikely(cancel)) {
1371		req_mod(req, SEND_CANCELED);
1372		return 0;
1373	}
1374
1375	re_init_if_first_write(connection, req->epoch);
1376	maybe_send_barrier(connection, req->epoch);
1377	connection->send.current_epoch_writes++;
1378
1379	err = drbd_send_dblock(first_peer_device(device), req);
1380	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1381
1382	return err;
1383}
1384
1385/**
1386 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1387 * @w:		work object.
1388 * @cancel:	The connection will be closed anyways
1389 */
1390int w_send_read_req(struct drbd_work *w, int cancel)
1391{
1392	struct drbd_request *req = container_of(w, struct drbd_request, w);
1393	struct drbd_device *device = req->device;
1394	struct drbd_connection *connection = first_peer_device(device)->connection;
1395	int err;
1396
1397	if (unlikely(cancel)) {
1398		req_mod(req, SEND_CANCELED);
1399		return 0;
1400	}
1401
1402	/* Even read requests may close a write epoch,
1403	 * if there was any yet. */
1404	maybe_send_barrier(connection, req->epoch);
1405
1406	err = drbd_send_drequest(first_peer_device(device), P_DATA_REQUEST, req->i.sector, req->i.size,
1407				 (unsigned long)req);
1408
1409	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1410
1411	return err;
1412}
1413
1414int w_restart_disk_io(struct drbd_work *w, int cancel)
1415{
1416	struct drbd_request *req = container_of(w, struct drbd_request, w);
1417	struct drbd_device *device = req->device;
1418
1419	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1420		drbd_al_begin_io(device, &req->i, false);
1421
1422	drbd_req_make_private_bio(req, req->master_bio);
1423	req->private_bio->bi_bdev = device->ldev->backing_bdev;
1424	generic_make_request(req->private_bio);
1425
1426	return 0;
1427}
1428
1429static int _drbd_may_sync_now(struct drbd_device *device)
1430{
1431	struct drbd_device *odev = device;
1432	int resync_after;
1433
1434	while (1) {
1435		if (!odev->ldev || odev->state.disk == D_DISKLESS)
1436			return 1;
1437		rcu_read_lock();
1438		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1439		rcu_read_unlock();
1440		if (resync_after == -1)
1441			return 1;
1442		odev = minor_to_device(resync_after);
1443		if (!odev)
1444			return 1;
1445		if ((odev->state.conn >= C_SYNC_SOURCE &&
1446		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1447		    odev->state.aftr_isp || odev->state.peer_isp ||
1448		    odev->state.user_isp)
1449			return 0;
1450	}
1451}
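    /*
     * Editorial walk-through with a hypothetical three-minor setup: say
     * drbd2 is configured with resync-after drbd1, and drbd1 with
     * resync-after drbd0.  _drbd_may_sync_now(drbd2) first looks at drbd1;
     * if drbd1 is anywhere between C_SYNC_SOURCE and C_PAUSED_SYNC_T or has
     * one of the *_isp flags set, it returns 0.  Otherwise it follows
     * drbd1's own resync-after setting to drbd0 and repeats the test.  The
     * walk returns 1 at the first device whose resync_after is -1, or which
     * is diskless or simply not there.
     */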
1452
1453/**
1454 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1455 * @device:	DRBD device.
1456 *
1457 * Called from process context only (admin command and after_state_ch).
1458 */
1459static int _drbd_pause_after(struct drbd_device *device)
1460{
1461	struct drbd_device *odev;
1462	int i, rv = 0;
1463
1464	rcu_read_lock();
1465	idr_for_each_entry(&drbd_devices, odev, i) {
1466		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1467			continue;
1468		if (!_drbd_may_sync_now(odev))
1469			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1470			       != SS_NOTHING_TO_DO);
1471	}
1472	rcu_read_unlock();
1473
1474	return rv;
1475}
1476
1477/**
1478 * _drbd_resume_next() - Resume resync on all devices that may resync now
1479 * @device:	DRBD device.
1480 *
1481 * Called from process context only (admin command and worker).
1482 */
1483static int _drbd_resume_next(struct drbd_device *device)
1484{
1485	struct drbd_device *odev;
1486	int i, rv = 0;
1487
1488	rcu_read_lock();
1489	idr_for_each_entry(&drbd_devices, odev, i) {
1490		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1491			continue;
1492		if (odev->state.aftr_isp) {
1493			if (_drbd_may_sync_now(odev))
1494				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1495							CS_HARD, NULL)
1496				       != SS_NOTHING_TO_DO) ;
1497		}
1498	}
1499	rcu_read_unlock();
1500	return rv;
1501}
1502
1503void resume_next_sg(struct drbd_device *device)
1504{
1505	write_lock_irq(&global_state_lock);
1506	_drbd_resume_next(device);
1507	write_unlock_irq(&global_state_lock);
1508}
1509
1510void suspend_other_sg(struct drbd_device *device)
1511{
1512	write_lock_irq(&global_state_lock);
1513	_drbd_pause_after(device);
1514	write_unlock_irq(&global_state_lock);
1515}
1516
1517/* caller must hold global_state_lock */
1518enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1519{
1520	struct drbd_device *odev;
1521	int resync_after;
1522
1523	if (o_minor == -1)
1524		return NO_ERROR;
1525	if (o_minor < -1 || o_minor > MINORMASK)
1526		return ERR_RESYNC_AFTER;
1527
1528	/* check for loops */
1529	odev = minor_to_device(o_minor);
1530	while (1) {
1531		if (odev == device)
1532			return ERR_RESYNC_AFTER_CYCLE;
1533
1534		/* You are free to depend on diskless, non-existing,
1535		 * or not yet/no longer existing minors.
1536		 * We only reject dependency loops.
1537		 * We cannot follow the dependency chain beyond a detached or
1538		 * missing minor.
1539		 */
1540		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1541			return NO_ERROR;
1542
1543		rcu_read_lock();
1544		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1545		rcu_read_unlock();
1546		/* dependency chain ends here, no cycles. */
1547		if (resync_after == -1)
1548			return NO_ERROR;
1549
1550		/* follow the dependency chain */
1551		odev = minor_to_device(resync_after);
1552	}
1553}
1554
1555/* caller must hold global_state_lock */
1556void drbd_resync_after_changed(struct drbd_device *device)
1557{
1558	int changes;
1559
1560	do {
1561		changes  = _drbd_pause_after(device);
1562		changes |= _drbd_resume_next(device);
1563	} while (changes);
1564}
1565
1566void drbd_rs_controller_reset(struct drbd_device *device)
1567{
1568	struct fifo_buffer *plan;
1569
1570	atomic_set(&device->rs_sect_in, 0);
1571	atomic_set(&device->rs_sect_ev, 0);
1572	device->rs_in_flight = 0;
1573
1574	/* Updating the RCU protected object in place is necessary since
1575	   this function gets called from atomic context.
1576	   It is valid since all other updates also lead to a completely
1577	   empty fifo */
1578	rcu_read_lock();
1579	plan = rcu_dereference(device->rs_plan_s);
1580	plan->total = 0;
1581	fifo_set(plan, 0);
1582	rcu_read_unlock();
1583}
1584
1585void start_resync_timer_fn(unsigned long data)
1586{
1587	struct drbd_device *device = (struct drbd_device *) data;
1588
1589	drbd_queue_work(&first_peer_device(device)->connection->sender_work,
1590			&device->start_resync_work);
1591}
1592
1593int w_start_resync(struct drbd_work *w, int cancel)
1594{
1595	struct drbd_device *device =
1596		container_of(w, struct drbd_device, start_resync_work);
1597
1598	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1599		drbd_warn(device, "w_start_resync later...\n");
1600		device->start_resync_timer.expires = jiffies + HZ/10;
1601		add_timer(&device->start_resync_timer);
1602		return 0;
1603	}
1604
1605	drbd_start_resync(device, C_SYNC_SOURCE);
1606	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1607	return 0;
1608}
1609
1610/**
1611 * drbd_start_resync() - Start the resync process
1612 * @device:	DRBD device.
1613 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1614 *
1615 * This function might bring you directly into one of the
1616 * C_PAUSED_SYNC_* states.
1617 */
1618void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1619{
1620	union drbd_state ns;
1621	int r;
1622
1623	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1624		drbd_err(device, "Resync already running!\n");
1625		return;
1626	}
1627
1628	if (!test_bit(B_RS_H_DONE, &device->flags)) {
1629		if (side == C_SYNC_TARGET) {
1630			/* Since application IO was locked out during C_WF_BITMAP_T and
1631			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1632			   (which makes the data inconsistent) we give the handler below a chance to veto. */
1633			r = drbd_khelper(device, "before-resync-target");
1634			r = (r >> 8) & 0xff;
1635			if (r > 0) {
1636				drbd_info(device, "before-resync-target handler returned %d, "
1637					 "dropping connection.\n", r);
1638				conn_request_state(first_peer_device(device)->connection, NS(conn, C_DISCONNECTING), CS_HARD);
1639				return;
1640			}
1641		} else /* C_SYNC_SOURCE */ {
1642			r = drbd_khelper(device, "before-resync-source");
1643			r = (r >> 8) & 0xff;
1644			if (r > 0) {
1645				if (r == 3) {
1646					drbd_info(device, "before-resync-source handler returned %d, "
1647						 "ignoring. Old userland tools?", r);
1648				} else {
1649					drbd_info(device, "before-resync-source handler returned %d, "
1650						 "dropping connection.\n", r);
1651					conn_request_state(first_peer_device(device)->connection,
1652							   NS(conn, C_DISCONNECTING), CS_HARD);
1653					return;
1654				}
1655			}
1656		}
1657	}
1658
1659	if (current == first_peer_device(device)->connection->worker.task) {
1660		/* The worker should not sleep waiting for state_mutex,
1661		   that can take long */
1662		if (!mutex_trylock(device->state_mutex)) {
1663			set_bit(B_RS_H_DONE, &device->flags);
1664			device->start_resync_timer.expires = jiffies + HZ/5;
1665			add_timer(&device->start_resync_timer);
1666			return;
1667		}
1668	} else {
1669		mutex_lock(device->state_mutex);
1670	}
1671	clear_bit(B_RS_H_DONE, &device->flags);
1672
1673	write_lock_irq(&global_state_lock);
1674	/* Did some connection breakage or IO error race with us? */
1675	if (device->state.conn < C_CONNECTED
1676	|| !get_ldev_if_state(device, D_NEGOTIATING)) {
1677		write_unlock_irq(&global_state_lock);
1678		mutex_unlock(device->state_mutex);
1679		return;
1680	}
1681
1682	ns = drbd_read_state(device);
1683
1684	ns.aftr_isp = !_drbd_may_sync_now(device);
1685
1686	ns.conn = side;
1687
1688	if (side == C_SYNC_TARGET)
1689		ns.disk = D_INCONSISTENT;
1690	else /* side == C_SYNC_SOURCE */
1691		ns.pdsk = D_INCONSISTENT;
1692
1693	r = __drbd_set_state(device, ns, CS_VERBOSE, NULL);
1694	ns = drbd_read_state(device);
1695
1696	if (ns.conn < C_CONNECTED)
1697		r = SS_UNKNOWN_ERROR;
1698
1699	if (r == SS_SUCCESS) {
1700		unsigned long tw = drbd_bm_total_weight(device);
1701		unsigned long now = jiffies;
1702		int i;
1703
1704		device->rs_failed    = 0;
1705		device->rs_paused    = 0;
1706		device->rs_same_csum = 0;
1707		device->rs_last_events = 0;
1708		device->rs_last_sect_ev = 0;
1709		device->rs_total     = tw;
1710		device->rs_start     = now;
1711		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1712			device->rs_mark_left[i] = tw;
1713			device->rs_mark_time[i] = now;
1714		}
1715		_drbd_pause_after(device);
1716	}
1717	write_unlock_irq(&global_state_lock);
1718
1719	if (r == SS_SUCCESS) {
1720		/* reset rs_last_bcast when a resync or verify is started,
1721		 * to deal with potential jiffies wrap. */
1722		device->rs_last_bcast = jiffies - HZ;
1723
1724		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1725		     drbd_conn_str(ns.conn),
1726		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1727		     (unsigned long) device->rs_total);
1728		if (side == C_SYNC_TARGET)
1729			device->bm_resync_fo = 0;
1730
1731		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1732		 * with w_send_oos, or the sync target will get confused as to
1733		 * how many bits to resync.  We cannot do that always, because for an
1734		 * empty resync and protocol < 95, we need to do it here, as we call
1735		 * drbd_resync_finished from here in that case.
1736		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1737		 * and from after_state_ch otherwise. */
1738		if (side == C_SYNC_SOURCE &&
1739		    first_peer_device(device)->connection->agreed_pro_version < 96)
1740			drbd_gen_and_send_sync_uuid(first_peer_device(device));
1741
1742		if (first_peer_device(device)->connection->agreed_pro_version < 95 &&
1743		    device->rs_total == 0) {
1744			/* This still has a race (about when exactly the peers
1745			 * detect connection loss) that can lead to a full sync
1746			 * on next handshake. In 8.3.9 we fixed this with explicit
1747			 * resync-finished notifications, but the fix
1748			 * introduces a protocol change.  Sleeping for some
1749			 * time longer than the ping interval + timeout on the
1750			 * SyncSource, to give the SyncTarget the chance to
1751			 * detect connection loss, then waiting for a ping
1752			 * response (implicit in drbd_resync_finished) reduces
1753			 * the race considerably, but does not solve it. */
1754			if (side == C_SYNC_SOURCE) {
1755				struct net_conf *nc;
1756				int timeo;
1757
1758				rcu_read_lock();
1759				nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
1760				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1761				rcu_read_unlock();
1762				schedule_timeout_interruptible(timeo);
1763			}
1764			drbd_resync_finished(device);
1765		}
1766
1767		drbd_rs_controller_reset(device);
1768		/* ns.conn may already be != device->state.conn,
1769		 * we may have been paused in between, or become paused until
1770		 * the timer triggers.
1771		 * Either way, that is handled in resync_timer_fn(). */
1772		if (ns.conn == C_SYNC_TARGET)
1773			mod_timer(&device->resync_timer, jiffies);
1774
1775		drbd_md_sync(device);
1776	}
1777	put_ldev(device);
1778	mutex_unlock(device->state_mutex);
1779}
1780
1781/* If the resource already closed the current epoch, but we did not
1782 * (because we have not yet seen new requests), we should send the
1783 * corresponding barrier now.  Must be checked within the same spinlock
1784 * that is used to check for new requests. */
1785static bool need_to_send_barrier(struct drbd_connection *connection)
1786{
1787	if (!connection->send.seen_any_write_yet)
1788		return false;
1789
1790	/* Skip barriers that do not contain any writes.
1791	 * This may happen during AHEAD mode. */
1792	if (!connection->send.current_epoch_writes)
1793		return false;
1794
1795	/* ->req_lock is held when requests are queued on
1796	 * connection->sender_work, and put into ->transfer_log.
1797	 * It is also held when ->current_tle_nr is increased.
1798	 * So either there are already new requests queued,
1799	 * and corresponding barriers will be sent there.
1800	 * Or nothing new is queued yet, so the difference will be 1.
1801	 */
1802	if (atomic_read(&connection->current_tle_nr) !=
1803	    connection->send.current_epoch_nr + 1)
1804		return false;
1805
1806	return true;
1807}
1808
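/* Splice all currently queued work items onto *work_list in one go.
 * Returns true if the resulting list is non-empty. */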
1809static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1810{
1811	spin_lock_irq(&queue->q_lock);
1812	list_splice_init(&queue->q, work_list);
1813	spin_unlock_irq(&queue->q_lock);
1814	return !list_empty(work_list);
1815}
1816
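/* Move at most one work item onto *work_list; single-item dequeue is
 * still needed because drbd_queue_work_front() is used in some places.
 * Returns true if the resulting list is non-empty. */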
1817static bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
1818{
1819	spin_lock_irq(&queue->q_lock);
1820	if (!list_empty(&queue->q))
1821		list_move(queue->q.next, work_list);
1822	spin_unlock_irq(&queue->q_lock);
1823	return !list_empty(work_list);
1824}
1825
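/* Try to dequeue a single work item; if there is none, uncork TCP and
 * sleep until new work is queued or a signal arrives, sending a barrier
 * whenever the current epoch needs to be closed in the meantime.
 * Before returning, restore the corking state according to net_conf. */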
1826static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
1827{
1828	DEFINE_WAIT(wait);
1829	struct net_conf *nc;
1830	int uncork, cork;
1831
1832	dequeue_work_item(&connection->sender_work, work_list);
1833	if (!list_empty(work_list))
1834		return;
1835
1836	/* Still nothing to do?
1837	 * Maybe we still need to close the current epoch,
1838	 * even if no new requests are queued yet.
1839	 *
1840	 * Also, poke TCP, just in case.
1841	 * Then wait for new work (or signal). */
1842	rcu_read_lock();
1843	nc = rcu_dereference(connection->net_conf);
1844	uncork = nc ? nc->tcp_cork : 0;
1845	rcu_read_unlock();
1846	if (uncork) {
1847		mutex_lock(&connection->data.mutex);
1848		if (connection->data.socket)
1849			drbd_tcp_uncork(connection->data.socket);
1850		mutex_unlock(&connection->data.mutex);
1851	}
1852
1853	for (;;) {
1854		int send_barrier;
1855		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
1856		spin_lock_irq(&connection->resource->req_lock);
1857		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
1858		/* dequeue single item only,
1859		 * we still use drbd_queue_work_front() in some places */
1860		if (!list_empty(&connection->sender_work.q))
1861			list_move(connection->sender_work.q.next, work_list);
1862		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
1863		if (!list_empty(work_list) || signal_pending(current)) {
1864			spin_unlock_irq(&connection->resource->req_lock);
1865			break;
1866		}
1867		send_barrier = need_to_send_barrier(connection);
1868		spin_unlock_irq(&connection->resource->req_lock);
1869		if (send_barrier) {
1870			drbd_send_barrier(connection);
1871			connection->send.current_epoch_nr++;
1872		}
1873		schedule();
1874		/* We may be woken up for things other than new work, too,
1875		 * e.g. if the current epoch got closed.
1876		 * In that case we send the barrier above. */
1877	}
1878	finish_wait(&connection->sender_work.q_wait, &wait);
1879
1880	/* Someone may have changed the config while we were waiting above. */
1881	rcu_read_lock();
1882	nc = rcu_dereference(connection->net_conf);
1883	cork = nc ? nc->tcp_cork : 0;
1884	rcu_read_unlock();
1885	mutex_lock(&connection->data.mutex);
1886	if (connection->data.socket) {
1887		if (cork)
1888			drbd_tcp_cork(connection->data.socket);
1889		else if (!uncork)
1890			drbd_tcp_uncork(connection->data.socket);
1891	}
1892	mutex_unlock(&connection->data.mutex);
1893}
1894
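/* Main loop of the per-connection worker thread: dequeue work items from
 * connection->sender_work and run their callbacks while the thread state is
 * RUNNING, then cancel the remaining work and clean up this connection's
 * devices on exit. */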
1895int drbd_worker(struct drbd_thread *thi)
1896{
1897	struct drbd_connection *connection = thi->connection;
1898	struct drbd_work *w = NULL;
1899	struct drbd_peer_device *peer_device;
1900	LIST_HEAD(work_list);
1901	int vnr;
1902
1903	while (get_t_state(thi) == RUNNING) {
1904		drbd_thread_current_set_cpu(thi);
1905
1906		/* as long as we use drbd_queue_work_front(),
1907		 * we may only dequeue single work items here, not batches. */
1908		if (list_empty(&work_list))
1909			wait_for_work(connection, &work_list);
1910
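		/* Signals are used to interrupt the wait above; getting one
		 * while still RUNNING is unexpected, so just log and flush it. */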
1911		if (signal_pending(current)) {
1912			flush_signals(current);
1913			if (get_t_state(thi) == RUNNING) {
1914				drbd_warn(connection, "Worker got an unexpected signal\n");
1915				continue;
1916			}
1917			break;
1918		}
1919
1920		if (get_t_state(thi) != RUNNING)
1921			break;
1922
1923		while (!list_empty(&work_list)) {
1924			w = list_first_entry(&work_list, struct drbd_work, list);
1925			list_del_init(&w->list);
1926			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
1927				continue;
1928			if (connection->cstate >= C_WF_REPORT_PARAMS)
1929				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1930		}
1931	}
1932
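	/* The thread is no longer RUNNING: run every remaining work item
	 * with the cancel flag set, until the sender queue is drained. */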
1933	do {
1934		while (!list_empty(&work_list)) {
1935			w = list_first_entry(&work_list, struct drbd_work, list);
1936			list_del_init(&w->list);
1937			w->cb(w, 1);
1938		}
1939		dequeue_work_batch(&connection->sender_work, &work_list);
1940	} while (!list_empty(&work_list));
1941
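	/* All work is done.  Every device of this connection should be
	 * diskless and standalone by now; release their remaining resources. */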
1942	rcu_read_lock();
1943	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1944		struct drbd_device *device = peer_device->device;
1945		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
1946		kref_get(&device->kref);
1947		rcu_read_unlock();
1948		drbd_device_cleanup(device);
1949		kref_put(&device->kref, drbd_destroy_device);
1950		rcu_read_lock();
1951	}
1952	rcu_read_unlock();
1953
1954	return 0;
1955}