Linux Audio

Check our new training course

Linux BSP development engineering services

Need help to port Linux and bootloaders to your hardware?
Loading...
v3.1
   1/*
   2   drbd_worker.c
   3
   4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   5
   6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   9
  10   drbd is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation; either version 2, or (at your option)
  13   any later version.
  14
  15   drbd is distributed in the hope that it will be useful,
  16   but WITHOUT ANY WARRANTY; without even the implied warranty of
  17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  18   GNU General Public License for more details.
  19
  20   You should have received a copy of the GNU General Public License
  21   along with drbd; see the file COPYING.  If not, write to
  22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  23
  24 */
  25
  26#include <linux/module.h>
  27#include <linux/drbd.h>
  28#include <linux/sched.h>
  29#include <linux/wait.h>
  30#include <linux/mm.h>
  31#include <linux/memcontrol.h>
  32#include <linux/mm_inline.h>
  33#include <linux/slab.h>
  34#include <linux/random.h>
  35#include <linux/string.h>
  36#include <linux/scatterlist.h>
  37
  38#include "drbd_int.h"
 
  39#include "drbd_req.h"
  40
  41static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
  42static int w_make_resync_request(struct drbd_conf *mdev,
  43				 struct drbd_work *w, int cancel);
  44
  45
  46
  47/* endio handlers:
  48 *   drbd_md_io_complete (defined here)
  49 *   drbd_endio_pri (defined here)
  50 *   drbd_endio_sec (defined here)
  51 *   bm_async_io_complete (defined in drbd_bitmap.c)
  52 *
  53 * For all these callbacks, note the following:
  54 * The callbacks will be called in irq context by the IDE drivers,
  55 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
  56 * Try to get the locking right :)
  57 *
  58 */
  59
  60
  61/* About the global_state_lock
  62   Each state transition on an device holds a read lock. In case we have
  63   to evaluate the sync after dependencies, we grab a write lock, because
  64   we need stable states on all devices for that.  */
  65rwlock_t global_state_lock;
  66
  67/* used for synchronous meta data and bitmap IO
  68 * submitted by drbd_md_sync_page_io()
  69 */
  70void drbd_md_io_complete(struct bio *bio, int error)
  71{
  72	struct drbd_md_io *md_io;
  73
  74	md_io = (struct drbd_md_io *)bio->bi_private;
  75	md_io->error = error;
  76
  77	complete(&md_io->event);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
  78}
  79
  80/* reads on behalf of the partner,
  81 * "submitted" by the receiver
  82 */
  83void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
  84{
  85	unsigned long flags = 0;
  86	struct drbd_conf *mdev = e->mdev;
  87
  88	D_ASSERT(e->block_id != ID_VACANT);
  89
  90	spin_lock_irqsave(&mdev->req_lock, flags);
  91	mdev->read_cnt += e->size >> 9;
  92	list_del(&e->w.list);
  93	if (list_empty(&mdev->read_ee))
  94		wake_up(&mdev->ee_wait);
  95	if (test_bit(__EE_WAS_ERROR, &e->flags))
  96		__drbd_chk_io_error(mdev, false);
  97	spin_unlock_irqrestore(&mdev->req_lock, flags);
  98
  99	drbd_queue_work(&mdev->data.work, &e->w);
 100	put_ldev(mdev);
 101}
 102
 103/* writes on behalf of the partner, or resync writes,
 104 * "submitted" by the receiver, final stage.  */
 105static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
 106{
 107	unsigned long flags = 0;
 108	struct drbd_conf *mdev = e->mdev;
 109	sector_t e_sector;
 
 
 110	int do_wake;
 111	int is_syncer_req;
 112	int do_al_complete_io;
 113
 114	D_ASSERT(e->block_id != ID_VACANT);
 115
 116	/* after we moved e to done_ee,
 117	 * we may no longer access it,
 118	 * it may be freed/reused already!
 119	 * (as soon as we release the req_lock) */
 120	e_sector = e->sector;
 121	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
 122	is_syncer_req = is_syncer_block_id(e->block_id);
 123
 124	spin_lock_irqsave(&mdev->req_lock, flags);
 125	mdev->writ_cnt += e->size >> 9;
 126	list_del(&e->w.list); /* has been on active_ee or sync_ee */
 127	list_add_tail(&e->w.list, &mdev->done_ee);
 128
 129	/* No hlist_del_init(&e->collision) here, we did not send the Ack yet,
 130	 * neither did we wake possibly waiting conflicting requests.
 131	 * done from "drbd_process_done_ee" within the appropriate w.cb
 132	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
 133
 134	do_wake = is_syncer_req
 135		? list_empty(&mdev->sync_ee)
 136		: list_empty(&mdev->active_ee);
 137
 138	if (test_bit(__EE_WAS_ERROR, &e->flags))
 139		__drbd_chk_io_error(mdev, false);
 140	spin_unlock_irqrestore(&mdev->req_lock, flags);
 141
 142	if (is_syncer_req)
 143		drbd_rs_complete_io(mdev, e_sector);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 144
 145	if (do_wake)
 146		wake_up(&mdev->ee_wait);
 147
 148	if (do_al_complete_io)
 149		drbd_al_complete_io(mdev, e_sector);
 150
 151	wake_asender(mdev);
 152	put_ldev(mdev);
 153}
 154
 155/* writes on behalf of the partner, or resync writes,
 156 * "submitted" by the receiver.
 157 */
 158void drbd_endio_sec(struct bio *bio, int error)
 159{
 160	struct drbd_epoch_entry *e = bio->bi_private;
 161	struct drbd_conf *mdev = e->mdev;
 162	int uptodate = bio_flagged(bio, BIO_UPTODATE);
 163	int is_write = bio_data_dir(bio) == WRITE;
 164
 165	if (error && __ratelimit(&drbd_ratelimit_state))
 166		dev_warn(DEV, "%s: error=%d s=%llus\n",
 167				is_write ? "write" : "read", error,
 168				(unsigned long long)e->sector);
 169	if (!error && !uptodate) {
 170		if (__ratelimit(&drbd_ratelimit_state))
 171			dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
 172					is_write ? "write" : "read",
 173					(unsigned long long)e->sector);
 174		/* strange behavior of some lower level drivers...
 175		 * fail the request by clearing the uptodate flag,
 176		 * but do not return any error?! */
 177		error = -EIO;
 178	}
 179
 180	if (error)
 181		set_bit(__EE_WAS_ERROR, &e->flags);
 182
 183	bio_put(bio); /* no need for the bio anymore */
 184	if (atomic_dec_and_test(&e->pending_bios)) {
 185		if (is_write)
 186			drbd_endio_write_sec_final(e);
 187		else
 188			drbd_endio_read_sec_final(e);
 189	}
 190}
 191
 
 
 
 
 
 
 
 192/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 193 */
 194void drbd_endio_pri(struct bio *bio, int error)
 195{
 196	unsigned long flags;
 197	struct drbd_request *req = bio->bi_private;
 198	struct drbd_conf *mdev = req->mdev;
 199	struct bio_and_error m;
 200	enum drbd_req_event what;
 201	int uptodate = bio_flagged(bio, BIO_UPTODATE);
 202
 203	if (!error && !uptodate) {
 204		dev_warn(DEV, "p %s: setting error to -EIO\n",
 205			 bio_data_dir(bio) == WRITE ? "write" : "read");
 206		/* strange behavior of some lower level drivers...
 207		 * fail the request by clearing the uptodate flag,
 208		 * but do not return any error?! */
 209		error = -EIO;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 210	}
 211
 212	/* to avoid recursion in __req_mod */
 213	if (unlikely(error)) {
 214		what = (bio_data_dir(bio) == WRITE)
 215			? write_completed_with_error
 216			: (bio_rw(bio) == READ)
 217			  ? read_completed_with_error
 218			  : read_ahead_completed_with_error;
 219	} else
 220		what = completed_ok;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 221
 222	bio_put(req->private_bio);
 223	req->private_bio = ERR_PTR(error);
 224
 225	/* not req_mod(), we need irqsave here! */
 226	spin_lock_irqsave(&mdev->req_lock, flags);
 227	__req_mod(req, what, &m);
 228	spin_unlock_irqrestore(&mdev->req_lock, flags);
 
 229
 230	if (m.bio)
 231		complete_master_bio(mdev, &m);
 232}
 233
 234int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 235{
 236	struct drbd_request *req = container_of(w, struct drbd_request, w);
 237
 238	/* We should not detach for read io-error,
 239	 * but try to WRITE the P_DATA_REPLY to the failed location,
 240	 * to give the disk the chance to relocate that block */
 241
 242	spin_lock_irq(&mdev->req_lock);
 243	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
 244		_req_mod(req, read_retry_remote_canceled);
 245		spin_unlock_irq(&mdev->req_lock);
 246		return 1;
 247	}
 248	spin_unlock_irq(&mdev->req_lock);
 249
 250	return w_send_read_req(mdev, w, 0);
 251}
 252
 253void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
 254{
 255	struct hash_desc desc;
 256	struct scatterlist sg;
 257	struct page *page = e->pages;
 258	struct page *tmp;
 259	unsigned len;
 260
 261	desc.tfm = tfm;
 262	desc.flags = 0;
 263
 264	sg_init_table(&sg, 1);
 265	crypto_hash_init(&desc);
 266
 267	while ((tmp = page_chain_next(page))) {
 268		/* all but the last page will be fully used */
 269		sg_set_page(&sg, page, PAGE_SIZE, 0);
 270		crypto_hash_update(&desc, &sg, sg.length);
 
 271		page = tmp;
 272	}
 273	/* and now the last, possibly only partially used page */
 274	len = e->size & (PAGE_SIZE - 1);
 275	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
 276	crypto_hash_update(&desc, &sg, sg.length);
 277	crypto_hash_final(&desc, digest);
 
 278}
 279
 280void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
 281{
 282	struct hash_desc desc;
 283	struct scatterlist sg;
 284	struct bio_vec *bvec;
 285	int i;
 286
 287	desc.tfm = tfm;
 288	desc.flags = 0;
 289
 290	sg_init_table(&sg, 1);
 291	crypto_hash_init(&desc);
 292
 293	__bio_for_each_segment(bvec, bio, i, 0) {
 294		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
 295		crypto_hash_update(&desc, &sg, sg.length);
 
 
 
 
 
 296	}
 297	crypto_hash_final(&desc, digest);
 
 
 298}
 299
 300/* TODO merge common code with w_e_end_ov_req */
 301int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 302{
 303	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
 
 
 304	int digest_size;
 305	void *digest;
 306	int ok = 1;
 307
 308	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
 309
 310	if (unlikely(cancel))
 311		goto out;
 312
 313	if (likely((e->flags & EE_WAS_ERROR) != 0))
 314		goto out;
 315
 316	digest_size = crypto_hash_digestsize(mdev->csums_tfm);
 317	digest = kmalloc(digest_size, GFP_NOIO);
 318	if (digest) {
 319		sector_t sector = e->sector;
 320		unsigned int size = e->size;
 321		drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
 322		/* Free e and pages before send.
 323		 * In case we block on congestion, we could otherwise run into
 324		 * some distributed deadlock, if the other side blocks on
 325		 * congestion as well, because our receiver blocks in
 326		 * drbd_pp_alloc due to pp_in_use > max_buffers. */
 327		drbd_free_ee(mdev, e);
 328		e = NULL;
 329		inc_rs_pending(mdev);
 330		ok = drbd_send_drequest_csum(mdev, sector, size,
 331					     digest, digest_size,
 332					     P_CSUM_RS_REQUEST);
 333		kfree(digest);
 334	} else {
 335		dev_err(DEV, "kmalloc() of digest failed.\n");
 336		ok = 0;
 337	}
 338
 339out:
 340	if (e)
 341		drbd_free_ee(mdev, e);
 342
 343	if (unlikely(!ok))
 344		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
 345	return ok;
 346}
 347
 348#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
 349
 350static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
 351{
 352	struct drbd_epoch_entry *e;
 
 353
 354	if (!get_ldev(mdev))
 355		return -EIO;
 356
 357	if (drbd_rs_should_slow_down(mdev, sector))
 358		goto defer;
 359
 360	/* GFP_TRY, because if there is no memory available right now, this may
 361	 * be rescheduled for later. It is "only" background resync, after all. */
 362	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
 363	if (!e)
 
 364		goto defer;
 365
 366	e->w.cb = w_e_send_csum;
 367	spin_lock_irq(&mdev->req_lock);
 368	list_add(&e->w.list, &mdev->read_ee);
 369	spin_unlock_irq(&mdev->req_lock);
 370
 371	atomic_add(size >> 9, &mdev->rs_sect_ev);
 372	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
 
 373		return 0;
 374
 375	/* If it failed because of ENOMEM, retry should help.  If it failed
 376	 * because bio_add_page failed (probably broken lower level driver),
 377	 * retry may or may not help.
 378	 * If it does not, you may need to force disconnect. */
 379	spin_lock_irq(&mdev->req_lock);
 380	list_del(&e->w.list);
 381	spin_unlock_irq(&mdev->req_lock);
 382
 383	drbd_free_ee(mdev, e);
 384defer:
 385	put_ldev(mdev);
 386	return -EAGAIN;
 387}
 388
 389int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 390{
 391	switch (mdev->state.conn) {
 
 
 
 392	case C_VERIFY_S:
 393		w_make_ov_request(mdev, w, cancel);
 394		break;
 395	case C_SYNC_TARGET:
 396		w_make_resync_request(mdev, w, cancel);
 397		break;
 398	}
 399
 400	return 1;
 401}
 402
 403void resync_timer_fn(unsigned long data)
 404{
 405	struct drbd_conf *mdev = (struct drbd_conf *) data;
 406
 407	if (list_empty(&mdev->resync_work.list))
 408		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
 
 409}
 410
 411static void fifo_set(struct fifo_buffer *fb, int value)
 412{
 413	int i;
 414
 415	for (i = 0; i < fb->size; i++)
 416		fb->values[i] = value;
 417}
 418
 419static int fifo_push(struct fifo_buffer *fb, int value)
 420{
 421	int ov;
 422
 423	ov = fb->values[fb->head_index];
 424	fb->values[fb->head_index++] = value;
 425
 426	if (fb->head_index >= fb->size)
 427		fb->head_index = 0;
 428
 429	return ov;
 430}
 431
 432static void fifo_add_val(struct fifo_buffer *fb, int value)
 433{
 434	int i;
 435
 436	for (i = 0; i < fb->size; i++)
 437		fb->values[i] += value;
 438}
 439
 440static int drbd_rs_controller(struct drbd_conf *mdev)
 441{
 442	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
 443	unsigned int want;     /* The number of sectors we want in the proxy */
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 444	int req_sect; /* Number of sectors to request in this turn */
 445	int correction; /* Number of sectors more we need in the proxy*/
 446	int cps; /* correction per invocation of drbd_rs_controller() */
 447	int steps; /* Number of time steps to plan ahead */
 448	int curr_corr;
 449	int max_sect;
 
 450
 451	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
 452	mdev->rs_in_flight -= sect_in;
 453
 454	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
 455
 456	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
 457
 458	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
 459		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
 460	} else { /* normal path */
 461		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
 462			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
 463	}
 464
 465	correction = want - mdev->rs_in_flight - mdev->rs_planed;
 466
 467	/* Plan ahead */
 468	cps = correction / steps;
 469	fifo_add_val(&mdev->rs_plan_s, cps);
 470	mdev->rs_planed += cps * steps;
 471
 472	/* What we do in this step */
 473	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
 474	spin_unlock(&mdev->peer_seq_lock);
 475	mdev->rs_planed -= curr_corr;
 476
 477	req_sect = sect_in + curr_corr;
 478	if (req_sect < 0)
 479		req_sect = 0;
 480
 481	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
 482	if (req_sect > max_sect)
 483		req_sect = max_sect;
 484
 485	/*
 486	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
 487		 sect_in, mdev->rs_in_flight, want, correction,
 488		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
 489	*/
 490
 491	return req_sect;
 492}
 493
 494static int drbd_rs_number_requests(struct drbd_conf *mdev)
 495{
 496	int number;
 497	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
 498		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
 499		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
 
 
 
 
 
 
 
 500	} else {
 501		mdev->c_sync_rate = mdev->sync_conf.rate;
 502		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
 503	}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 504
 505	/* ignore the amount of pending requests, the resync controller should
 506	 * throttle down to incoming reply rate soon enough anyways. */
 507	return number;
 508}
 509
 510static int w_make_resync_request(struct drbd_conf *mdev,
 511				 struct drbd_work *w, int cancel)
 512{
 
 
 513	unsigned long bit;
 514	sector_t sector;
 515	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
 516	int max_bio_size;
 517	int number, rollback_i, size;
 518	int align, queued, sndbuf;
 519	int i = 0;
 
 520
 521	if (unlikely(cancel))
 522		return 1;
 523
 524	if (mdev->rs_total == 0) {
 525		/* empty resync? */
 526		drbd_resync_finished(mdev);
 527		return 1;
 528	}
 529
 530	if (!get_ldev(mdev)) {
 531		/* Since we only need to access mdev->rsync a
 532		   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
 533		   to continue resync with a broken disk makes no sense at
 534		   all */
 535		dev_err(DEV, "Disk broke down during resync!\n");
 536		return 1;
 537	}
 538
 539	max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
 540	number = drbd_rs_number_requests(mdev);
 541	if (number == 0)
 
 
 
 
 
 
 542		goto requeue;
 543
 544	for (i = 0; i < number; i++) {
 545		/* Stop generating RS requests, when half of the send buffer is filled */
 546		mutex_lock(&mdev->data.mutex);
 547		if (mdev->data.socket) {
 548			queued = mdev->data.socket->sk->sk_wmem_queued;
 549			sndbuf = mdev->data.socket->sk->sk_sndbuf;
 550		} else {
 551			queued = 1;
 552			sndbuf = 0;
 553		}
 554		mutex_unlock(&mdev->data.mutex);
 555		if (queued > sndbuf / 2)
 
 
 
 
 
 556			goto requeue;
 557
 558next_sector:
 559		size = BM_BLOCK_SIZE;
 560		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
 561
 562		if (bit == DRBD_END_OF_BITMAP) {
 563			mdev->bm_resync_fo = drbd_bm_bits(mdev);
 564			put_ldev(mdev);
 565			return 1;
 566		}
 567
 568		sector = BM_BIT_TO_SECT(bit);
 569
 570		if (drbd_rs_should_slow_down(mdev, sector) ||
 571		    drbd_try_rs_begin_io(mdev, sector)) {
 572			mdev->bm_resync_fo = bit;
 573			goto requeue;
 574		}
 575		mdev->bm_resync_fo = bit + 1;
 576
 577		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
 578			drbd_rs_complete_io(mdev, sector);
 579			goto next_sector;
 580		}
 581
 582#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
 583		/* try to find some adjacent bits.
 584		 * we stop if we have already the maximum req size.
 585		 *
 586		 * Additionally always align bigger requests, in order to
 587		 * be prepared for all stripe sizes of software RAIDs.
 588		 */
 589		align = 1;
 590		rollback_i = i;
 591		for (;;) {
 592			if (size + BM_BLOCK_SIZE > max_bio_size)
 593				break;
 594
 595			/* Be always aligned */
 596			if (sector & ((1<<(align+3))-1))
 597				break;
 598
 
 
 
 599			/* do not cross extent boundaries */
 600			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
 601				break;
 602			/* now, is it actually dirty, after all?
 603			 * caution, drbd_bm_test_bit is tri-state for some
 604			 * obscure reason; ( b == 0 ) would get the out-of-band
 605			 * only accidentally right because of the "oddly sized"
 606			 * adjustment below */
 607			if (drbd_bm_test_bit(mdev, bit+1) != 1)
 608				break;
 609			bit++;
 610			size += BM_BLOCK_SIZE;
 611			if ((BM_BLOCK_SIZE << align) <= size)
 612				align++;
 613			i++;
 614		}
 615		/* if we merged some,
 616		 * reset the offset to start the next drbd_bm_find_next from */
 617		if (size > BM_BLOCK_SIZE)
 618			mdev->bm_resync_fo = bit + 1;
 619#endif
 620
 621		/* adjust very last sectors, in case we are oddly sized */
 622		if (sector + (size>>9) > capacity)
 623			size = (capacity-sector)<<9;
 624		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
 625			switch (read_for_csum(mdev, sector, size)) {
 
 626			case -EIO: /* Disk failure */
 627				put_ldev(mdev);
 628				return 0;
 629			case -EAGAIN: /* allocation failed, or ldev busy */
 630				drbd_rs_complete_io(mdev, sector);
 631				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
 632				i = rollback_i;
 633				goto requeue;
 634			case 0:
 635				/* everything ok */
 636				break;
 637			default:
 638				BUG();
 639			}
 640		} else {
 641			inc_rs_pending(mdev);
 642			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
 643					       sector, size, ID_SYNCER)) {
 644				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
 645				dec_rs_pending(mdev);
 646				put_ldev(mdev);
 647				return 0;
 
 
 
 
 648			}
 649		}
 650	}
 651
 652	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
 653		/* last syncer _request_ was sent,
 654		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
 655		 * next sync group will resume), as soon as we receive the last
 656		 * resync data block, and the last bit is cleared.
 657		 * until then resync "work" is "inactive" ...
 658		 */
 659		put_ldev(mdev);
 660		return 1;
 661	}
 662
 663 requeue:
 664	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 665	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
 666	put_ldev(mdev);
 667	return 1;
 668}
 669
 670static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 671{
 672	int number, i, size;
 673	sector_t sector;
 674	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
 
 675
 676	if (unlikely(cancel))
 677		return 1;
 678
 679	number = drbd_rs_number_requests(mdev);
 680
 681	sector = mdev->ov_position;
 682	for (i = 0; i < number; i++) {
 683		if (sector >= capacity) {
 684			return 1;
 685		}
 
 
 
 
 
 
 
 
 686
 687		size = BM_BLOCK_SIZE;
 688
 689		if (drbd_rs_should_slow_down(mdev, sector) ||
 690		    drbd_try_rs_begin_io(mdev, sector)) {
 691			mdev->ov_position = sector;
 692			goto requeue;
 693		}
 694
 695		if (sector + (size>>9) > capacity)
 696			size = (capacity-sector)<<9;
 697
 698		inc_rs_pending(mdev);
 699		if (!drbd_send_ov_request(mdev, sector, size)) {
 700			dec_rs_pending(mdev);
 701			return 0;
 702		}
 703		sector += BM_SECT_PER_BIT;
 704	}
 705	mdev->ov_position = sector;
 706
 707 requeue:
 708	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 709	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
 
 710	return 1;
 711}
 712
 713
 714void start_resync_timer_fn(unsigned long data)
 715{
 716	struct drbd_conf *mdev = (struct drbd_conf *) data;
 
 
 
 
 
 717
 718	drbd_queue_work(&mdev->data.work, &mdev->start_resync_work);
 719}
 720
 721int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 722{
 723	if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
 724		dev_warn(DEV, "w_start_resync later...\n");
 725		mdev->start_resync_timer.expires = jiffies + HZ/10;
 726		add_timer(&mdev->start_resync_timer);
 727		return 1;
 728	}
 729
 730	drbd_start_resync(mdev, C_SYNC_SOURCE);
 731	clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
 732	return 1;
 733}
 734
 735int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 736{
 737	kfree(w);
 738	ov_oos_print(mdev);
 739	drbd_resync_finished(mdev);
 740
 741	return 1;
 742}
 743
 744static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 745{
 746	kfree(w);
 747
 748	drbd_resync_finished(mdev);
 749
 750	return 1;
 751}
 752
 753static void ping_peer(struct drbd_conf *mdev)
 754{
 755	clear_bit(GOT_PING_ACK, &mdev->flags);
 756	request_ping(mdev);
 757	wait_event(mdev->misc_wait,
 758		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
 759}
 760
 761int drbd_resync_finished(struct drbd_conf *mdev)
 762{
 
 763	unsigned long db, dt, dbdt;
 764	unsigned long n_oos;
 765	union drbd_state os, ns;
 766	struct drbd_work *w;
 767	char *khelper_cmd = NULL;
 768	int verify_done = 0;
 769
 770	/* Remove all elements from the resync LRU. Since future actions
 771	 * might set bits in the (main) bitmap, then the entries in the
 772	 * resync LRU would be wrong. */
 773	if (drbd_rs_del_all(mdev)) {
 774		/* In case this is not possible now, most probably because
 775		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
 776		 * queue (or even the read operations for those packets
 777		 * is not finished by now).   Retry in 100ms. */
 778
 779		schedule_timeout_interruptible(HZ / 10);
 780		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
 781		if (w) {
 782			w->cb = w_resync_finished;
 783			drbd_queue_work(&mdev->data.work, w);
 
 784			return 1;
 785		}
 786		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
 787	}
 788
 789	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
 790	if (dt <= 0)
 791		dt = 1;
 792	db = mdev->rs_total;
 
 
 
 
 
 793	dbdt = Bit2KB(db/dt);
 794	mdev->rs_paused /= HZ;
 795
 796	if (!get_ldev(mdev))
 797		goto out;
 798
 799	ping_peer(mdev);
 800
 801	spin_lock_irq(&mdev->req_lock);
 802	os = mdev->state;
 803
 804	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
 805
 806	/* This protects us against multiple calls (that can happen in the presence
 807	   of application IO), and against connectivity loss just before we arrive here. */
 808	if (os.conn <= C_CONNECTED)
 809		goto out_unlock;
 810
 811	ns = os;
 812	ns.conn = C_CONNECTED;
 813
 814	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
 815	     verify_done ? "Online verify " : "Resync",
 816	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
 817
 818	n_oos = drbd_bm_total_weight(mdev);
 819
 820	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
 821		if (n_oos) {
 822			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
 823			      n_oos, Bit2KB(1));
 824			khelper_cmd = "out-of-sync";
 825		}
 826	} else {
 827		D_ASSERT((n_oos - mdev->rs_failed) == 0);
 828
 829		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
 830			khelper_cmd = "after-resync-target";
 831
 832		if (mdev->csums_tfm && mdev->rs_total) {
 833			const unsigned long s = mdev->rs_same_csum;
 834			const unsigned long t = mdev->rs_total;
 835			const int ratio =
 836				(t == 0)     ? 0 :
 837			(t < 100000) ? ((s*100)/t) : (s/(t/100));
 838			dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
 839			     "transferred %luK total %luK\n",
 840			     ratio,
 841			     Bit2KB(mdev->rs_same_csum),
 842			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
 843			     Bit2KB(mdev->rs_total));
 844		}
 845	}
 846
 847	if (mdev->rs_failed) {
 848		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
 849
 850		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 851			ns.disk = D_INCONSISTENT;
 852			ns.pdsk = D_UP_TO_DATE;
 853		} else {
 854			ns.disk = D_UP_TO_DATE;
 855			ns.pdsk = D_INCONSISTENT;
 856		}
 857	} else {
 858		ns.disk = D_UP_TO_DATE;
 859		ns.pdsk = D_UP_TO_DATE;
 860
 861		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 862			if (mdev->p_uuid) {
 863				int i;
 864				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
 865					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
 866				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
 867				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
 868			} else {
 869				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
 870			}
 871		}
 872
 873		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
 874			/* for verify runs, we don't update uuids here,
 875			 * so there would be nothing to report. */
 876			drbd_uuid_set_bm(mdev, 0UL);
 877			drbd_print_uuids(mdev, "updated UUIDs");
 878			if (mdev->p_uuid) {
 879				/* Now the two UUID sets are equal, update what we
 880				 * know of the peer. */
 881				int i;
 882				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
 883					mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
 884			}
 885		}
 886	}
 887
 888	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
 889out_unlock:
 890	spin_unlock_irq(&mdev->req_lock);
 891	put_ldev(mdev);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 892out:
 893	mdev->rs_total  = 0;
 894	mdev->rs_failed = 0;
 895	mdev->rs_paused = 0;
 896	if (verify_done)
 897		mdev->ov_start_sector = 0;
 
 
 898
 899	drbd_md_sync(mdev);
 900
 901	if (khelper_cmd)
 902		drbd_khelper(mdev, khelper_cmd);
 903
 904	return 1;
 905}
 906
 907/* helper */
 908static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
 909{
 910	if (drbd_ee_has_active_page(e)) {
 911		/* This might happen if sendpage() has not finished */
 912		int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
 913		atomic_add(i, &mdev->pp_in_use_by_net);
 914		atomic_sub(i, &mdev->pp_in_use);
 915		spin_lock_irq(&mdev->req_lock);
 916		list_add_tail(&e->w.list, &mdev->net_ee);
 917		spin_unlock_irq(&mdev->req_lock);
 918		wake_up(&drbd_pp_wait);
 919	} else
 920		drbd_free_ee(mdev, e);
 921}
 922
 923/**
 924 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 925 * @mdev:	DRBD device.
 926 * @w:		work object.
 927 * @cancel:	The connection will be closed anyways
 928 */
 929int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 930{
 931	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
 932	int ok;
 
 
 933
 934	if (unlikely(cancel)) {
 935		drbd_free_ee(mdev, e);
 936		dec_unacked(mdev);
 937		return 1;
 938	}
 939
 940	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 941		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
 942	} else {
 943		if (__ratelimit(&drbd_ratelimit_state))
 944			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
 945			    (unsigned long long)e->sector);
 946
 947		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
 948	}
 949
 950	dec_unacked(mdev);
 951
 952	move_to_net_ee_or_free(mdev, e);
 953
 954	if (unlikely(!ok))
 955		dev_err(DEV, "drbd_send_block() failed\n");
 956	return ok;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 957}
 958
 959/**
 960 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
 961 * @mdev:	DRBD device.
 962 * @w:		work object.
 963 * @cancel:	The connection will be closed anyways
 964 */
 965int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 966{
 967	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
 968	int ok;
 
 
 969
 970	if (unlikely(cancel)) {
 971		drbd_free_ee(mdev, e);
 972		dec_unacked(mdev);
 973		return 1;
 974	}
 975
 976	if (get_ldev_if_state(mdev, D_FAILED)) {
 977		drbd_rs_complete_io(mdev, e->sector);
 978		put_ldev(mdev);
 979	}
 980
 981	if (mdev->state.conn == C_AHEAD) {
 982		ok = drbd_send_ack(mdev, P_RS_CANCEL, e);
 983	} else if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 984		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
 985			inc_rs_pending(mdev);
 986			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
 
 
 
 987		} else {
 988			if (__ratelimit(&drbd_ratelimit_state))
 989				dev_err(DEV, "Not sending RSDataReply, "
 990				    "partner DISKLESS!\n");
 991			ok = 1;
 992		}
 993	} else {
 994		if (__ratelimit(&drbd_ratelimit_state))
 995			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
 996			    (unsigned long long)e->sector);
 997
 998		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
 999
1000		/* update resync data with failure */
1001		drbd_rs_failed_io(mdev, e->sector, e->size);
1002	}
1003
1004	dec_unacked(mdev);
1005
1006	move_to_net_ee_or_free(mdev, e);
1007
1008	if (unlikely(!ok))
1009		dev_err(DEV, "drbd_send_block() failed\n");
1010	return ok;
1011}
1012
1013int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1014{
1015	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
 
 
1016	struct digest_info *di;
1017	int digest_size;
1018	void *digest = NULL;
1019	int ok, eq = 0;
1020
1021	if (unlikely(cancel)) {
1022		drbd_free_ee(mdev, e);
1023		dec_unacked(mdev);
1024		return 1;
1025	}
1026
1027	if (get_ldev(mdev)) {
1028		drbd_rs_complete_io(mdev, e->sector);
1029		put_ldev(mdev);
1030	}
1031
1032	di = e->digest;
1033
1034	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1035		/* quick hack to try to avoid a race against reconfiguration.
1036		 * a real fix would be much more involved,
1037		 * introducing more locking mechanisms */
1038		if (mdev->csums_tfm) {
1039			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1040			D_ASSERT(digest_size == di->digest_size);
1041			digest = kmalloc(digest_size, GFP_NOIO);
1042		}
1043		if (digest) {
1044			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1045			eq = !memcmp(digest, di->digest, digest_size);
1046			kfree(digest);
1047		}
1048
1049		if (eq) {
1050			drbd_set_in_sync(mdev, e->sector, e->size);
1051			/* rs_same_csums unit is BM_BLOCK_SIZE */
1052			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1053			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1054		} else {
1055			inc_rs_pending(mdev);
1056			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1057			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1058			kfree(di);
1059			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1060		}
1061	} else {
1062		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1063		if (__ratelimit(&drbd_ratelimit_state))
1064			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1065	}
1066
1067	dec_unacked(mdev);
1068	move_to_net_ee_or_free(mdev, e);
1069
1070	if (unlikely(!ok))
1071		dev_err(DEV, "drbd_send_block/ack() failed\n");
1072	return ok;
1073}
1074
1075/* TODO merge common code with w_e_send_csum */
1076int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1077{
1078	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1079	sector_t sector = e->sector;
1080	unsigned int size = e->size;
 
 
1081	int digest_size;
1082	void *digest;
1083	int ok = 1;
1084
1085	if (unlikely(cancel))
1086		goto out;
1087
1088	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1089	digest = kmalloc(digest_size, GFP_NOIO);
1090	if (!digest) {
1091		ok = 0;	/* terminate the connection in case the allocation failed */
1092		goto out;
1093	}
1094
1095	if (likely(!(e->flags & EE_WAS_ERROR)))
1096		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1097	else
1098		memset(digest, 0, digest_size);
1099
1100	/* Free e and pages before send.
1101	 * In case we block on congestion, we could otherwise run into
1102	 * some distributed deadlock, if the other side blocks on
1103	 * congestion as well, because our receiver blocks in
1104	 * drbd_pp_alloc due to pp_in_use > max_buffers. */
1105	drbd_free_ee(mdev, e);
1106	e = NULL;
1107	inc_rs_pending(mdev);
1108	ok = drbd_send_drequest_csum(mdev, sector, size,
1109				     digest, digest_size,
1110				     P_OV_REPLY);
1111	if (!ok)
1112		dec_rs_pending(mdev);
1113	kfree(digest);
1114
1115out:
1116	if (e)
1117		drbd_free_ee(mdev, e);
1118	dec_unacked(mdev);
1119	return ok;
1120}
1121
1122void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1123{
1124	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1125		mdev->ov_last_oos_size += size>>9;
1126	} else {
1127		mdev->ov_last_oos_start = sector;
1128		mdev->ov_last_oos_size = size>>9;
1129	}
1130	drbd_set_out_of_sync(mdev, sector, size);
1131}
1132
1133int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1134{
1135	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
 
 
1136	struct digest_info *di;
1137	void *digest;
1138	sector_t sector = e->sector;
1139	unsigned int size = e->size;
1140	int digest_size;
1141	int ok, eq = 0;
 
1142
1143	if (unlikely(cancel)) {
1144		drbd_free_ee(mdev, e);
1145		dec_unacked(mdev);
1146		return 1;
1147	}
1148
1149	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1150	 * the resync lru has been cleaned up already */
1151	if (get_ldev(mdev)) {
1152		drbd_rs_complete_io(mdev, e->sector);
1153		put_ldev(mdev);
1154	}
1155
1156	di = e->digest;
1157
1158	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1159		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1160		digest = kmalloc(digest_size, GFP_NOIO);
1161		if (digest) {
1162			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1163
1164			D_ASSERT(digest_size == di->digest_size);
1165			eq = !memcmp(digest, di->digest, digest_size);
1166			kfree(digest);
1167		}
1168	}
1169
1170		/* Free e and pages before send.
1171		 * In case we block on congestion, we could otherwise run into
1172		 * some distributed deadlock, if the other side blocks on
1173		 * congestion as well, because our receiver blocks in
1174		 * drbd_pp_alloc due to pp_in_use > max_buffers. */
1175	drbd_free_ee(mdev, e);
1176	if (!eq)
1177		drbd_ov_oos_found(mdev, sector, size);
1178	else
1179		ov_oos_print(mdev);
1180
1181	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1182			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1183
1184	dec_unacked(mdev);
1185
1186	--mdev->ov_left;
1187
1188	/* let's advance progress step marks only for every other megabyte */
1189	if ((mdev->ov_left & 0x200) == 0x200)
1190		drbd_advance_rs_marks(mdev, mdev->ov_left);
1191
1192	if (mdev->ov_left == 0) {
1193		ov_oos_print(mdev);
1194		drbd_resync_finished(mdev);
 
 
 
1195	}
1196
1197	return ok;
1198}
1199
1200int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 
 
 
 
 
1201{
1202	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1203	complete(&b->done);
1204	return 1;
 
 
 
 
 
 
 
 
 
 
1205}
1206
1207int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1208{
1209	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1210	struct p_barrier *p = &mdev->data.sbuf.barrier;
1211	int ok = 1;
1212
1213	/* really avoid racing with tl_clear.  w.cb may have been referenced
1214	 * just before it was reassigned and re-queued, so double check that.
1215	 * actually, this race was harmless, since we only try to send the
1216	 * barrier packet here, and otherwise do nothing with the object.
1217	 * but compare with the head of w_clear_epoch */
1218	spin_lock_irq(&mdev->req_lock);
1219	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1220		cancel = 1;
1221	spin_unlock_irq(&mdev->req_lock);
1222	if (cancel)
1223		return 1;
1224
1225	if (!drbd_get_data_sock(mdev))
 
 
 
 
 
1226		return 0;
1227	p->barrier = b->br_number;
1228	/* inc_ap_pending was done where this was queued.
1229	 * dec_ap_pending will be done in got_BarrierAck
1230	 * or (on connection loss) in w_clear_epoch.  */
1231	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1232				(struct p_header80 *)p, sizeof(*p), 0);
1233	drbd_put_data_sock(mdev);
1234
1235	return ok;
 
 
 
 
 
 
 
1236}
1237
1238int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1239{
1240	if (cancel)
1241		return 1;
1242	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
 
 
 
 
 
1243}
1244
1245int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1246{
1247	struct drbd_request *req = container_of(w, struct drbd_request, w);
1248	int ok;
 
 
 
1249
1250	if (unlikely(cancel)) {
1251		req_mod(req, send_canceled);
1252		return 1;
1253	}
 
 
 
 
 
 
 
1254
1255	ok = drbd_send_oos(mdev, req);
1256	req_mod(req, oos_handed_to_network);
1257
1258	return ok;
1259}
1260
1261/**
1262 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1263 * @mdev:	DRBD device.
1264 * @w:		work object.
1265 * @cancel:	The connection will be closed anyways
1266 */
1267int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1268{
1269	struct drbd_request *req = container_of(w, struct drbd_request, w);
1270	int ok;
 
 
 
 
1271
1272	if (unlikely(cancel)) {
1273		req_mod(req, send_canceled);
1274		return 1;
1275	}
 
 
 
 
 
 
 
 
1276
1277	ok = drbd_send_dblock(mdev, req);
1278	req_mod(req, ok ? handed_over_to_network : send_failed);
1279
1280	return ok;
1281}
1282
1283/**
1284 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1285 * @mdev:	DRBD device.
1286 * @w:		work object.
1287 * @cancel:	The connection will be closed anyways
1288 */
1289int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1290{
1291	struct drbd_request *req = container_of(w, struct drbd_request, w);
1292	int ok;
 
 
 
 
1293
1294	if (unlikely(cancel)) {
1295		req_mod(req, send_canceled);
1296		return 1;
1297	}
 
1298
1299	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1300				(unsigned long)req);
 
1301
1302	if (!ok) {
1303		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1304		 * so this is probably redundant */
1305		if (mdev->state.conn >= C_CONNECTED)
1306			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1307	}
1308	req_mod(req, ok ? handed_over_to_network : send_failed);
1309
1310	return ok;
 
 
 
1311}
1312
1313int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1314{
1315	struct drbd_request *req = container_of(w, struct drbd_request, w);
 
1316
1317	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1318		drbd_al_begin_io(mdev, req->sector);
1319	/* Calling drbd_al_begin_io() out of the worker might deadlocks
1320	   theoretically. Practically it can not deadlock, since this is
1321	   only used when unfreezing IOs. All the extents of the requests
1322	   that made it into the TL are already active */
1323
1324	drbd_req_make_private_bio(req, req->master_bio);
1325	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1326	generic_make_request(req->private_bio);
1327
1328	return 1;
1329}
1330
1331static int _drbd_may_sync_now(struct drbd_conf *mdev)
1332{
1333	struct drbd_conf *odev = mdev;
 
1334
1335	while (1) {
1336		if (odev->sync_conf.after == -1)
 
 
 
 
 
 
 
 
1337			return 1;
1338		odev = minor_to_mdev(odev->sync_conf.after);
1339		ERR_IF(!odev) return 1;
1340		if ((odev->state.conn >= C_SYNC_SOURCE &&
1341		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1342		    odev->state.aftr_isp || odev->state.peer_isp ||
1343		    odev->state.user_isp)
1344			return 0;
1345	}
1346}
1347
1348/**
1349 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1350 * @mdev:	DRBD device.
1351 *
1352 * Called from process context only (admin command and after_state_ch).
1353 */
1354static int _drbd_pause_after(struct drbd_conf *mdev)
1355{
1356	struct drbd_conf *odev;
1357	int i, rv = 0;
 
1358
1359	for (i = 0; i < minor_count; i++) {
1360		odev = minor_to_mdev(i);
1361		if (!odev)
1362			continue;
1363		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1364			continue;
1365		if (!_drbd_may_sync_now(odev))
1366			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1367			       != SS_NOTHING_TO_DO);
 
1368	}
 
1369
1370	return rv;
1371}
1372
1373/**
1374 * _drbd_resume_next() - Resume resync on all devices that may resync now
1375 * @mdev:	DRBD device.
1376 *
1377 * Called from process context only (admin command and worker).
1378 */
1379static int _drbd_resume_next(struct drbd_conf *mdev)
1380{
1381	struct drbd_conf *odev;
1382	int i, rv = 0;
 
1383
1384	for (i = 0; i < minor_count; i++) {
1385		odev = minor_to_mdev(i);
1386		if (!odev)
1387			continue;
1388		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1389			continue;
1390		if (odev->state.aftr_isp) {
1391			if (_drbd_may_sync_now(odev))
1392				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1393							CS_HARD, NULL)
1394				       != SS_NOTHING_TO_DO) ;
1395		}
1396	}
1397	return rv;
 
1398}
1399
1400void resume_next_sg(struct drbd_conf *mdev)
1401{
1402	write_lock_irq(&global_state_lock);
1403	_drbd_resume_next(mdev);
1404	write_unlock_irq(&global_state_lock);
1405}
1406
1407void suspend_other_sg(struct drbd_conf *mdev)
1408{
1409	write_lock_irq(&global_state_lock);
1410	_drbd_pause_after(mdev);
1411	write_unlock_irq(&global_state_lock);
1412}
1413
1414static int sync_after_error(struct drbd_conf *mdev, int o_minor)
 
1415{
1416	struct drbd_conf *odev;
 
1417
1418	if (o_minor == -1)
1419		return NO_ERROR;
1420	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1421		return ERR_SYNC_AFTER;
1422
1423	/* check for loops */
1424	odev = minor_to_mdev(o_minor);
1425	while (1) {
1426		if (odev == mdev)
1427			return ERR_SYNC_AFTER_CYCLE;
 
 
 
 
 
 
 
 
 
1428
 
 
 
1429		/* dependency chain ends here, no cycles. */
1430		if (odev->sync_conf.after == -1)
1431			return NO_ERROR;
1432
1433		/* follow the dependency chain */
1434		odev = minor_to_mdev(odev->sync_conf.after);
1435	}
1436}
1437
1438int drbd_alter_sa(struct drbd_conf *mdev, int na)
 
1439{
1440	int changes;
1441	int retcode;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1442
1443	write_lock_irq(&global_state_lock);
1444	retcode = sync_after_error(mdev, na);
1445	if (retcode == NO_ERROR) {
1446		mdev->sync_conf.after = na;
1447		do {
1448			changes  = _drbd_pause_after(mdev);
1449			changes |= _drbd_resume_next(mdev);
1450		} while (changes);
 
 
 
 
 
1451	}
1452	write_unlock_irq(&global_state_lock);
1453	return retcode;
 
1454}
1455
1456void drbd_rs_controller_reset(struct drbd_conf *mdev)
1457{
1458	atomic_set(&mdev->rs_sect_in, 0);
1459	atomic_set(&mdev->rs_sect_ev, 0);
1460	mdev->rs_in_flight = 0;
1461	mdev->rs_planed = 0;
1462	spin_lock(&mdev->peer_seq_lock);
1463	fifo_set(&mdev->rs_plan_s, 0);
1464	spin_unlock(&mdev->peer_seq_lock);
 
1465}
1466
1467/**
1468 * drbd_start_resync() - Start the resync process
1469 * @mdev:	DRBD device.
1470 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1471 *
1472 * This function might bring you directly into one of the
1473 * C_PAUSED_SYNC_* states.
1474 */
1475void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1476{
 
 
1477	union drbd_state ns;
1478	int r;
1479
1480	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1481		dev_err(DEV, "Resync already running!\n");
1482		return;
1483	}
1484
1485	if (mdev->state.conn < C_AHEAD) {
1486		/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1487		drbd_rs_cancel_all(mdev);
1488		/* This should be done when we abort the resync. We definitely do not
1489		   want to have this for connections going back and forth between
1490		   Ahead/Behind and SyncSource/SyncTarget */
1491	}
1492
1493	if (side == C_SYNC_TARGET) {
1494		/* Since application IO was locked out during C_WF_BITMAP_T and
1495		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1496		   we check that we might make the data inconsistent. */
1497		r = drbd_khelper(mdev, "before-resync-target");
1498		r = (r >> 8) & 0xff;
1499		if (r > 0) {
1500			dev_info(DEV, "before-resync-target handler returned %d, "
1501			     "dropping connection.\n", r);
1502			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1503			return;
1504		}
1505	} else /* C_SYNC_SOURCE */ {
1506		r = drbd_khelper(mdev, "before-resync-source");
1507		r = (r >> 8) & 0xff;
1508		if (r > 0) {
1509			if (r == 3) {
1510				dev_info(DEV, "before-resync-source handler returned %d, "
1511					 "ignoring. Old userland tools?", r);
1512			} else {
1513				dev_info(DEV, "before-resync-source handler returned %d, "
1514					 "dropping connection.\n", r);
1515				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1516				return;
1517			}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1518		}
1519	}
1520
1521	drbd_state_lock(mdev);
 
 
 
 
 
 
 
 
 
 
 
1522
1523	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1524		drbd_state_unlock(mdev);
1525		return;
 
 
 
 
1526	}
1527
1528	write_lock_irq(&global_state_lock);
1529	ns = mdev->state;
1530
1531	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1532
1533	ns.conn = side;
1534
1535	if (side == C_SYNC_TARGET)
1536		ns.disk = D_INCONSISTENT;
1537	else /* side == C_SYNC_SOURCE */
1538		ns.pdsk = D_INCONSISTENT;
1539
1540	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1541	ns = mdev->state;
1542
1543	if (ns.conn < C_CONNECTED)
1544		r = SS_UNKNOWN_ERROR;
1545
1546	if (r == SS_SUCCESS) {
1547		unsigned long tw = drbd_bm_total_weight(mdev);
1548		unsigned long now = jiffies;
1549		int i;
1550
1551		mdev->rs_failed    = 0;
1552		mdev->rs_paused    = 0;
1553		mdev->rs_same_csum = 0;
1554		mdev->rs_last_events = 0;
1555		mdev->rs_last_sect_ev = 0;
1556		mdev->rs_total     = tw;
1557		mdev->rs_start     = now;
1558		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1559			mdev->rs_mark_left[i] = tw;
1560			mdev->rs_mark_time[i] = now;
1561		}
1562		_drbd_pause_after(mdev);
 
 
 
 
 
 
 
 
1563	}
1564	write_unlock_irq(&global_state_lock);
1565
1566	if (r == SS_SUCCESS) {
1567		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
 
 
 
 
 
1568		     drbd_conn_str(ns.conn),
1569		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1570		     (unsigned long) mdev->rs_total);
1571		if (side == C_SYNC_TARGET)
1572			mdev->bm_resync_fo = 0;
 
 
 
 
1573
1574		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1575		 * with w_send_oos, or the sync target will get confused as to
1576		 * how much bits to resync.  We cannot do that always, because for an
1577		 * empty resync and protocol < 95, we need to do it here, as we call
1578		 * drbd_resync_finished from here in that case.
1579		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1580		 * and from after_state_ch otherwise. */
1581		if (side == C_SYNC_SOURCE && mdev->agreed_pro_version < 96)
1582			drbd_gen_and_send_sync_uuid(mdev);
1583
1584		if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1585			/* This still has a race (about when exactly the peers
1586			 * detect connection loss) that can lead to a full sync
1587			 * on next handshake. In 8.3.9 we fixed this with explicit
1588			 * resync-finished notifications, but the fix
1589			 * introduces a protocol change.  Sleeping for some
1590			 * time longer than the ping interval + timeout on the
1591			 * SyncSource, to give the SyncTarget the chance to
1592			 * detect connection loss, then waiting for a ping
1593			 * response (implicit in drbd_resync_finished) reduces
1594			 * the race considerably, but does not solve it. */
1595			if (side == C_SYNC_SOURCE)
1596				schedule_timeout_interruptible(
1597					mdev->net_conf->ping_int * HZ +
1598					mdev->net_conf->ping_timeo*HZ/9);
1599			drbd_resync_finished(mdev);
 
 
 
 
 
 
1600		}
1601
1602		drbd_rs_controller_reset(mdev);
1603		/* ns.conn may already be != mdev->state.conn,
1604		 * we may have been paused in between, or become paused until
1605		 * the timer triggers.
1606		 * No matter, that is handled in resync_timer_fn() */
1607		if (ns.conn == C_SYNC_TARGET)
1608			mod_timer(&mdev->resync_timer, jiffies);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1609
1610		drbd_md_sync(mdev);
 
 
 
 
 
 
 
 
 
 
 
1611	}
1612	put_ldev(mdev);
1613	drbd_state_unlock(mdev);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1614}
1615
1616int drbd_worker(struct drbd_thread *thi)
1617{
1618	struct drbd_conf *mdev = thi->mdev;
1619	struct drbd_work *w = NULL;
 
1620	LIST_HEAD(work_list);
1621	int intr = 0, i;
1622
1623	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
 
1624
1625	while (get_t_state(thi) == Running) {
1626		drbd_thread_current_set_cpu(mdev);
1627
1628		if (down_trylock(&mdev->data.work.s)) {
1629			mutex_lock(&mdev->data.mutex);
1630			if (mdev->data.socket && !mdev->net_conf->no_cork)
1631				drbd_tcp_uncork(mdev->data.socket);
1632			mutex_unlock(&mdev->data.mutex);
1633
1634			intr = down_interruptible(&mdev->data.work.s);
1635
1636			mutex_lock(&mdev->data.mutex);
1637			if (mdev->data.socket  && !mdev->net_conf->no_cork)
1638				drbd_tcp_cork(mdev->data.socket);
1639			mutex_unlock(&mdev->data.mutex);
1640		}
1641
1642		if (intr) {
1643			D_ASSERT(intr == -EINTR);
1644			flush_signals(current);
1645			ERR_IF (get_t_state(thi) == Running)
 
1646				continue;
 
1647			break;
1648		}
1649
1650		if (get_t_state(thi) != Running)
1651			break;
1652		/* With this break, we have done a down() but not consumed
1653		   the entry from the list. The cleanup code takes care of
1654		   this...   */
1655
1656		w = NULL;
1657		spin_lock_irq(&mdev->data.work.q_lock);
1658		ERR_IF(list_empty(&mdev->data.work.q)) {
1659			/* something terribly wrong in our logic.
1660			 * we were able to down() the semaphore,
1661			 * but the list is empty... doh.
1662			 *
1663			 * what is the best thing to do now?
1664			 * try again from scratch, restarting the receiver,
1665			 * asender, whatnot? could break even more ugly,
1666			 * e.g. when we are primary, but no good local data.
1667			 *
1668			 * I'll try to get away just starting over this loop.
1669			 */
1670			spin_unlock_irq(&mdev->data.work.q_lock);
1671			continue;
1672		}
1673		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1674		list_del_init(&w->list);
1675		spin_unlock_irq(&mdev->data.work.q_lock);
1676
1677		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1678			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1679			if (mdev->state.conn >= C_CONNECTED)
1680				drbd_force_state(mdev,
1681						NS(conn, C_NETWORK_FAILURE));
1682		}
1683	}
1684	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1685	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1686
1687	spin_lock_irq(&mdev->data.work.q_lock);
1688	i = 0;
1689	while (!list_empty(&mdev->data.work.q)) {
1690		list_splice_init(&mdev->data.work.q, &work_list);
1691		spin_unlock_irq(&mdev->data.work.q_lock);
1692
1693		while (!list_empty(&work_list)) {
1694			w = list_entry(work_list.next, struct drbd_work, list);
1695			list_del_init(&w->list);
1696			w->cb(mdev, w, 1);
1697			i++; /* dead debugging code */
 
 
 
1698		}
1699
1700		spin_lock_irq(&mdev->data.work.q_lock);
1701	}
1702	sema_init(&mdev->data.work.s, 0);
1703	/* DANGEROUS race: if someone did queue his work within the spinlock,
1704	 * but up() ed outside the spinlock, we could get an up() on the
1705	 * semaphore without corresponding list entry.
1706	 * So don't do that.
1707	 */
1708	spin_unlock_irq(&mdev->data.work.q_lock);
1709
1710	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1711	/* _drbd_set_state only uses stop_nowait.
1712	 * wait here for the Exiting receiver. */
1713	drbd_thread_stop(&mdev->receiver);
1714	drbd_mdev_cleanup(mdev);
1715
1716	dev_info(DEV, "worker terminated\n");
1717
1718	clear_bit(DEVICE_DYING, &mdev->flags);
1719	clear_bit(CONFIG_PENDING, &mdev->flags);
1720	wake_up(&mdev->state_wait);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1721
1722	return 0;
1723}
v4.17
   1/*
   2   drbd_worker.c
   3
   4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   5
   6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   9
  10   drbd is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation; either version 2, or (at your option)
  13   any later version.
  14
  15   drbd is distributed in the hope that it will be useful,
  16   but WITHOUT ANY WARRANTY; without even the implied warranty of
  17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  18   GNU General Public License for more details.
  19
  20   You should have received a copy of the GNU General Public License
  21   along with drbd; see the file COPYING.  If not, write to
  22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  23
  24*/
  25
  26#include <linux/module.h>
  27#include <linux/drbd.h>
  28#include <linux/sched/signal.h>
  29#include <linux/wait.h>
  30#include <linux/mm.h>
  31#include <linux/memcontrol.h>
  32#include <linux/mm_inline.h>
  33#include <linux/slab.h>
  34#include <linux/random.h>
  35#include <linux/string.h>
  36#include <linux/scatterlist.h>
  37
  38#include "drbd_int.h"
  39#include "drbd_protocol.h"
  40#include "drbd_req.h"
  41
  42static int make_ov_request(struct drbd_device *, int);
  43static int make_resync_request(struct drbd_device *, int);
 
 
 
  44
  45/* endio handlers:
  46 *   drbd_md_endio (defined here)
  47 *   drbd_request_endio (defined here)
  48 *   drbd_peer_request_endio (defined here)
  49 *   drbd_bm_endio (defined in drbd_bitmap.c)
  50 *
  51 * For all these callbacks, note the following:
  52 * The callbacks will be called in irq context by the IDE drivers,
  53 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
  54 * Try to get the locking right :)
  55 *
  56 */
  57
 
 
 
 
 
 
 
  58/* used for synchronous meta data and bitmap IO
  59 * submitted by drbd_md_sync_page_io()
  60 */
  61void drbd_md_endio(struct bio *bio)
  62{
  63	struct drbd_device *device;
  64
  65	device = bio->bi_private;
  66	device->md_io.error = blk_status_to_errno(bio->bi_status);
  67
  68	/* special case: drbd_md_read() during drbd_adm_attach() */
  69	if (device->ldev)
  70		put_ldev(device);
  71	bio_put(bio);
  72
  73	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
  74	 * to timeout on the lower level device, and eventually detach from it.
  75	 * If this io completion runs after that timeout expired, this
  76	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
  77	 * During normal operation, this only puts that extra reference
  78	 * down to 1 again.
  79	 * Make sure we first drop the reference, and only then signal
  80	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
  81	 * next drbd_md_sync_page_io(), that we trigger the
  82	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
  83	 */
  84	drbd_md_put_buffer(device);
  85	device->md_io.done = 1;
  86	wake_up(&device->misc_wait);
  87}
  88
  89/* reads on behalf of the partner,
  90 * "submitted" by the receiver
  91 */
  92static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
  93{
  94	unsigned long flags = 0;
  95	struct drbd_peer_device *peer_device = peer_req->peer_device;
  96	struct drbd_device *device = peer_device->device;
 
  97
  98	spin_lock_irqsave(&device->resource->req_lock, flags);
  99	device->read_cnt += peer_req->i.size >> 9;
 100	list_del(&peer_req->w.list);
 101	if (list_empty(&device->read_ee))
 102		wake_up(&device->ee_wait);
 103	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
 104		__drbd_chk_io_error(device, DRBD_READ_ERROR);
 105	spin_unlock_irqrestore(&device->resource->req_lock, flags);
 106
 107	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
 108	put_ldev(device);
 109}
 110
 111/* writes on behalf of the partner, or resync writes,
 112 * "submitted" by the receiver, final stage.  */
 113void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
 114{
 115	unsigned long flags = 0;
 116	struct drbd_peer_device *peer_device = peer_req->peer_device;
 117	struct drbd_device *device = peer_device->device;
 118	struct drbd_connection *connection = peer_device->connection;
 119	struct drbd_interval i;
 120	int do_wake;
 121	u64 block_id;
 122	int do_al_complete_io;
 123
 124	/* after we moved peer_req to done_ee,
 
 
 125	 * we may no longer access it,
 126	 * it may be freed/reused already!
 127	 * (as soon as we release the req_lock) */
 128	i = peer_req->i;
 129	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
 130	block_id = peer_req->block_id;
 131	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
 132
 133	if (peer_req->flags & EE_WAS_ERROR) {
 134		/* In protocol != C, we usually do not send write acks.
 135		 * In case of a write error, send the neg ack anyways. */
 136		if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
 137			inc_unacked(device);
 138		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
 139	}
 140
 141	spin_lock_irqsave(&device->resource->req_lock, flags);
 142	device->writ_cnt += peer_req->i.size >> 9;
 143	list_move_tail(&peer_req->w.list, &device->done_ee);
 
 
 
 
 
 144
 145	/*
 146	 * Do not remove from the write_requests tree here: we did not send the
 147	 * Ack yet and did not wake possibly waiting conflicting requests.
 148	 * Removed from the tree from "drbd_process_done_ee" within the
 149	 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
 150	 * _drbd_clear_done_ee.
 151	 */
 152
 153	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
 154
 155	/* FIXME do we want to detach for failed REQ_DISCARD?
 156	 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
 157	if (peer_req->flags & EE_WAS_ERROR)
 158		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
 159
 160	if (connection->cstate >= C_WF_REPORT_PARAMS) {
 161		kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
 162		if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
 163			kref_put(&device->kref, drbd_destroy_device);
 164	}
 165	spin_unlock_irqrestore(&device->resource->req_lock, flags);
 166
 167	if (block_id == ID_SYNCER)
 168		drbd_rs_complete_io(device, i.sector);
 169
 170	if (do_wake)
 171		wake_up(&device->ee_wait);
 172
 173	if (do_al_complete_io)
 174		drbd_al_complete_io(device, &i);
 175
 176	put_ldev(device);
 
 177}
 178
 179/* writes on behalf of the partner, or resync writes,
 180 * "submitted" by the receiver.
 181 */
 182void drbd_peer_request_endio(struct bio *bio)
 183{
 184	struct drbd_peer_request *peer_req = bio->bi_private;
 185	struct drbd_device *device = peer_req->peer_device->device;
 186	bool is_write = bio_data_dir(bio) == WRITE;
 187	bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
 188			  bio_op(bio) == REQ_OP_DISCARD;
 189
 190	if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
 191		drbd_warn(device, "%s: error=%d s=%llus\n",
 192				is_write ? (is_discard ? "discard" : "write")
 193					: "read", bio->bi_status,
 194				(unsigned long long)peer_req->i.sector);
 
 
 
 
 
 
 
 
 195
 196	if (bio->bi_status)
 197		set_bit(__EE_WAS_ERROR, &peer_req->flags);
 198
 199	bio_put(bio); /* no need for the bio anymore */
 200	if (atomic_dec_and_test(&peer_req->pending_bios)) {
 201		if (is_write)
 202			drbd_endio_write_sec_final(peer_req);
 203		else
 204			drbd_endio_read_sec_final(peer_req);
 205	}
 206}
 207
 208static void
 209drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
 210{
 211	panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
 212		device->minor, device->resource->name, device->vnr);
 213}
 214
 215/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 216 */
 217void drbd_request_endio(struct bio *bio)
 218{
 219	unsigned long flags;
 220	struct drbd_request *req = bio->bi_private;
 221	struct drbd_device *device = req->device;
 222	struct bio_and_error m;
 223	enum drbd_req_event what;
 
 224
 225	/* If this request was aborted locally before,
 226	 * but now was completed "successfully",
 227	 * chances are that this caused arbitrary data corruption.
 228	 *
 229	 * "aborting" requests, or force-detaching the disk, is intended for
 230	 * completely blocked/hung local backing devices which do no longer
 231	 * complete requests at all, not even do error completions.  In this
 232	 * situation, usually a hard-reset and failover is the only way out.
 233	 *
 234	 * By "aborting", basically faking a local error-completion,
 235	 * we allow for a more graceful swichover by cleanly migrating services.
 236	 * Still the affected node has to be rebooted "soon".
 237	 *
 238	 * By completing these requests, we allow the upper layers to re-use
 239	 * the associated data pages.
 240	 *
 241	 * If later the local backing device "recovers", and now DMAs some data
 242	 * from disk into the original request pages, in the best case it will
 243	 * just put random data into unused pages; but typically it will corrupt
 244	 * meanwhile completely unrelated data, causing all sorts of damage.
 245	 *
 246	 * Which means delayed successful completion,
 247	 * especially for READ requests,
 248	 * is a reason to panic().
 249	 *
 250	 * We assume that a delayed *error* completion is OK,
 251	 * though we still will complain noisily about it.
 252	 */
 253	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
 254		if (__ratelimit(&drbd_ratelimit_state))
 255			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
 256
 257		if (!bio->bi_status)
 258			drbd_panic_after_delayed_completion_of_aborted_request(device);
 259	}
 260
 261	/* to avoid recursion in __req_mod */
 262	if (unlikely(bio->bi_status)) {
 263		switch (bio_op(bio)) {
 264		case REQ_OP_WRITE_ZEROES:
 265		case REQ_OP_DISCARD:
 266			if (bio->bi_status == BLK_STS_NOTSUPP)
 267				what = DISCARD_COMPLETED_NOTSUPP;
 268			else
 269				what = DISCARD_COMPLETED_WITH_ERROR;
 270			break;
 271		case REQ_OP_READ:
 272			if (bio->bi_opf & REQ_RAHEAD)
 273				what = READ_AHEAD_COMPLETED_WITH_ERROR;
 274			else
 275				what = READ_COMPLETED_WITH_ERROR;
 276			break;
 277		default:
 278			what = WRITE_COMPLETED_WITH_ERROR;
 279			break;
 280		}
 281	} else {
 282		what = COMPLETED_OK;
 283	}
 284
 285	bio_put(req->private_bio);
 286	req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
 287
 288	/* not req_mod(), we need irqsave here! */
 289	spin_lock_irqsave(&device->resource->req_lock, flags);
 290	__req_mod(req, what, &m);
 291	spin_unlock_irqrestore(&device->resource->req_lock, flags);
 292	put_ldev(device);
 293
 294	if (m.bio)
 295		complete_master_bio(device, &m);
 296}
 297
 298void drbd_csum_ee(struct crypto_ahash *tfm, struct drbd_peer_request *peer_req, void *digest)
 299{
 300	AHASH_REQUEST_ON_STACK(req, tfm);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 301	struct scatterlist sg;
 302	struct page *page = peer_req->pages;
 303	struct page *tmp;
 304	unsigned len;
 305
 306	ahash_request_set_tfm(req, tfm);
 307	ahash_request_set_callback(req, 0, NULL, NULL);
 308
 309	sg_init_table(&sg, 1);
 310	crypto_ahash_init(req);
 311
 312	while ((tmp = page_chain_next(page))) {
 313		/* all but the last page will be fully used */
 314		sg_set_page(&sg, page, PAGE_SIZE, 0);
 315		ahash_request_set_crypt(req, &sg, NULL, sg.length);
 316		crypto_ahash_update(req);
 317		page = tmp;
 318	}
 319	/* and now the last, possibly only partially used page */
 320	len = peer_req->i.size & (PAGE_SIZE - 1);
 321	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
 322	ahash_request_set_crypt(req, &sg, digest, sg.length);
 323	crypto_ahash_finup(req);
 324	ahash_request_zero(req);
 325}
 326
 327void drbd_csum_bio(struct crypto_ahash *tfm, struct bio *bio, void *digest)
 328{
 329	AHASH_REQUEST_ON_STACK(req, tfm);
 330	struct scatterlist sg;
 331	struct bio_vec bvec;
 332	struct bvec_iter iter;
 333
 334	ahash_request_set_tfm(req, tfm);
 335	ahash_request_set_callback(req, 0, NULL, NULL);
 336
 337	sg_init_table(&sg, 1);
 338	crypto_ahash_init(req);
 339
 340	bio_for_each_segment(bvec, bio, iter) {
 341		sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
 342		ahash_request_set_crypt(req, &sg, NULL, sg.length);
 343		crypto_ahash_update(req);
 344		/* REQ_OP_WRITE_SAME has only one segment,
 345		 * checksum the payload only once. */
 346		if (bio_op(bio) == REQ_OP_WRITE_SAME)
 347			break;
 348	}
 349	ahash_request_set_crypt(req, NULL, digest, 0);
 350	crypto_ahash_final(req);
 351	ahash_request_zero(req);
 352}
 353
 354/* MAYBE merge common code with w_e_end_ov_req */
 355static int w_e_send_csum(struct drbd_work *w, int cancel)
 356{
 357	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
 358	struct drbd_peer_device *peer_device = peer_req->peer_device;
 359	struct drbd_device *device = peer_device->device;
 360	int digest_size;
 361	void *digest;
 362	int err = 0;
 
 
 363
 364	if (unlikely(cancel))
 365		goto out;
 366
 367	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
 368		goto out;
 369
 370	digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
 371	digest = kmalloc(digest_size, GFP_NOIO);
 372	if (digest) {
 373		sector_t sector = peer_req->i.sector;
 374		unsigned int size = peer_req->i.size;
 375		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
 376		/* Free peer_req and pages before send.
 377		 * In case we block on congestion, we could otherwise run into
 378		 * some distributed deadlock, if the other side blocks on
 379		 * congestion as well, because our receiver blocks in
 380		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
 381		drbd_free_peer_req(device, peer_req);
 382		peer_req = NULL;
 383		inc_rs_pending(device);
 384		err = drbd_send_drequest_csum(peer_device, sector, size,
 385					      digest, digest_size,
 386					      P_CSUM_RS_REQUEST);
 387		kfree(digest);
 388	} else {
 389		drbd_err(device, "kmalloc() of digest failed.\n");
 390		err = -ENOMEM;
 391	}
 392
 393out:
 394	if (peer_req)
 395		drbd_free_peer_req(device, peer_req);
 396
 397	if (unlikely(err))
 398		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
 399	return err;
 400}
 401
 402#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
 403
 404static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
 405{
 406	struct drbd_device *device = peer_device->device;
 407	struct drbd_peer_request *peer_req;
 408
 409	if (!get_ldev(device))
 410		return -EIO;
 411
 
 
 
 412	/* GFP_TRY, because if there is no memory available right now, this may
 413	 * be rescheduled for later. It is "only" background resync, after all. */
 414	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
 415				       size, size, GFP_TRY);
 416	if (!peer_req)
 417		goto defer;
 418
 419	peer_req->w.cb = w_e_send_csum;
 420	spin_lock_irq(&device->resource->req_lock);
 421	list_add_tail(&peer_req->w.list, &device->read_ee);
 422	spin_unlock_irq(&device->resource->req_lock);
 423
 424	atomic_add(size >> 9, &device->rs_sect_ev);
 425	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
 426				     DRBD_FAULT_RS_RD) == 0)
 427		return 0;
 428
 429	/* If it failed because of ENOMEM, retry should help.  If it failed
 430	 * because bio_add_page failed (probably broken lower level driver),
 431	 * retry may or may not help.
 432	 * If it does not, you may need to force disconnect. */
 433	spin_lock_irq(&device->resource->req_lock);
 434	list_del(&peer_req->w.list);
 435	spin_unlock_irq(&device->resource->req_lock);
 436
 437	drbd_free_peer_req(device, peer_req);
 438defer:
 439	put_ldev(device);
 440	return -EAGAIN;
 441}
 442
 443int w_resync_timer(struct drbd_work *w, int cancel)
 444{
 445	struct drbd_device *device =
 446		container_of(w, struct drbd_device, resync_work);
 447
 448	switch (device->state.conn) {
 449	case C_VERIFY_S:
 450		make_ov_request(device, cancel);
 451		break;
 452	case C_SYNC_TARGET:
 453		make_resync_request(device, cancel);
 454		break;
 455	}
 456
 457	return 0;
 458}
 459
 460void resync_timer_fn(struct timer_list *t)
 461{
 462	struct drbd_device *device = from_timer(device, t, resync_timer);
 463
 464	drbd_queue_work_if_unqueued(
 465		&first_peer_device(device)->connection->sender_work,
 466		&device->resync_work);
 467}
 468
 469static void fifo_set(struct fifo_buffer *fb, int value)
 470{
 471	int i;
 472
 473	for (i = 0; i < fb->size; i++)
 474		fb->values[i] = value;
 475}
 476
 477static int fifo_push(struct fifo_buffer *fb, int value)
 478{
 479	int ov;
 480
 481	ov = fb->values[fb->head_index];
 482	fb->values[fb->head_index++] = value;
 483
 484	if (fb->head_index >= fb->size)
 485		fb->head_index = 0;
 486
 487	return ov;
 488}
 489
 490static void fifo_add_val(struct fifo_buffer *fb, int value)
 491{
 492	int i;
 493
 494	for (i = 0; i < fb->size; i++)
 495		fb->values[i] += value;
 496}
 497
 498struct fifo_buffer *fifo_alloc(int fifo_size)
 499{
 500	struct fifo_buffer *fb;
 501
 502	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
 503	if (!fb)
 504		return NULL;
 505
 506	fb->head_index = 0;
 507	fb->size = fifo_size;
 508	fb->total = 0;
 509
 510	return fb;
 511}
 512
 513static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
 514{
 515	struct disk_conf *dc;
 516	unsigned int want;     /* The number of sectors we want in-flight */
 517	int req_sect; /* Number of sectors to request in this turn */
 518	int correction; /* Number of sectors more we need in-flight */
 519	int cps; /* correction per invocation of drbd_rs_controller() */
 520	int steps; /* Number of time steps to plan ahead */
 521	int curr_corr;
 522	int max_sect;
 523	struct fifo_buffer *plan;
 524
 525	dc = rcu_dereference(device->ldev->disk_conf);
 526	plan = rcu_dereference(device->rs_plan_s);
 
 
 527
 528	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
 529
 530	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
 531		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
 532	} else { /* normal path */
 533		want = dc->c_fill_target ? dc->c_fill_target :
 534			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
 535	}
 536
 537	correction = want - device->rs_in_flight - plan->total;
 538
 539	/* Plan ahead */
 540	cps = correction / steps;
 541	fifo_add_val(plan, cps);
 542	plan->total += cps * steps;
 543
 544	/* What we do in this step */
 545	curr_corr = fifo_push(plan, 0);
 546	plan->total -= curr_corr;
 
 547
 548	req_sect = sect_in + curr_corr;
 549	if (req_sect < 0)
 550		req_sect = 0;
 551
 552	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
 553	if (req_sect > max_sect)
 554		req_sect = max_sect;
 555
 556	/*
 557	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
 558		 sect_in, device->rs_in_flight, want, correction,
 559		 steps, cps, device->rs_planed, curr_corr, req_sect);
 560	*/
 561
 562	return req_sect;
 563}
 564
 565static int drbd_rs_number_requests(struct drbd_device *device)
 566{
 567	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
 568	int number, mxb;
 569
 570	sect_in = atomic_xchg(&device->rs_sect_in, 0);
 571	device->rs_in_flight -= sect_in;
 572
 573	rcu_read_lock();
 574	mxb = drbd_get_max_buffers(device) / 2;
 575	if (rcu_dereference(device->rs_plan_s)->size) {
 576		number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
 577		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
 578	} else {
 579		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
 580		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
 581	}
 582	rcu_read_unlock();
 583
 584	/* Don't have more than "max-buffers"/2 in-flight.
 585	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
 586	 * potentially causing a distributed deadlock on congestion during
 587	 * online-verify or (checksum-based) resync, if max-buffers,
 588	 * socket buffer sizes and resync rate settings are mis-configured. */
 589
 590	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
 591	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
 592	 * "number of pages" (typically also 4k),
 593	 * but "rs_in_flight" is in "sectors" (512 Byte). */
 594	if (mxb - device->rs_in_flight/8 < number)
 595		number = mxb - device->rs_in_flight/8;
 596
 
 
 597	return number;
 598}
 599
 600static int make_resync_request(struct drbd_device *const device, int cancel)
 
 601{
 602	struct drbd_peer_device *const peer_device = first_peer_device(device);
 603	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
 604	unsigned long bit;
 605	sector_t sector;
 606	const sector_t capacity = drbd_get_capacity(device->this_bdev);
 607	int max_bio_size;
 608	int number, rollback_i, size;
 609	int align, requeue = 0;
 610	int i = 0;
 611	int discard_granularity = 0;
 612
 613	if (unlikely(cancel))
 614		return 0;
 615
 616	if (device->rs_total == 0) {
 617		/* empty resync? */
 618		drbd_resync_finished(device);
 619		return 0;
 620	}
 621
 622	if (!get_ldev(device)) {
 623		/* Since we only need to access device->rsync a
 624		   get_ldev_if_state(device,D_FAILED) would be sufficient, but
 625		   to continue resync with a broken disk makes no sense at
 626		   all */
 627		drbd_err(device, "Disk broke down during resync!\n");
 628		return 0;
 629	}
 630
 631	if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
 632		rcu_read_lock();
 633		discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
 634		rcu_read_unlock();
 635	}
 636
 637	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
 638	number = drbd_rs_number_requests(device);
 639	if (number <= 0)
 640		goto requeue;
 641
 642	for (i = 0; i < number; i++) {
 643		/* Stop generating RS requests when half of the send buffer is filled,
 644		 * but notify TCP that we'd like to have more space. */
 645		mutex_lock(&connection->data.mutex);
 646		if (connection->data.socket) {
 647			struct sock *sk = connection->data.socket->sk;
 648			int queued = sk->sk_wmem_queued;
 649			int sndbuf = sk->sk_sndbuf;
 650			if (queued > sndbuf / 2) {
 651				requeue = 1;
 652				if (sk->sk_socket)
 653					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
 654			}
 655		} else
 656			requeue = 1;
 657		mutex_unlock(&connection->data.mutex);
 658		if (requeue)
 659			goto requeue;
 660
 661next_sector:
 662		size = BM_BLOCK_SIZE;
 663		bit  = drbd_bm_find_next(device, device->bm_resync_fo);
 664
 665		if (bit == DRBD_END_OF_BITMAP) {
 666			device->bm_resync_fo = drbd_bm_bits(device);
 667			put_ldev(device);
 668			return 0;
 669		}
 670
 671		sector = BM_BIT_TO_SECT(bit);
 672
 673		if (drbd_try_rs_begin_io(device, sector)) {
 674			device->bm_resync_fo = bit;
 
 675			goto requeue;
 676		}
 677		device->bm_resync_fo = bit + 1;
 678
 679		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
 680			drbd_rs_complete_io(device, sector);
 681			goto next_sector;
 682		}
 683
 684#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
 685		/* try to find some adjacent bits.
 686		 * we stop if we have already the maximum req size.
 687		 *
 688		 * Additionally always align bigger requests, in order to
 689		 * be prepared for all stripe sizes of software RAIDs.
 690		 */
 691		align = 1;
 692		rollback_i = i;
 693		while (i < number) {
 694			if (size + BM_BLOCK_SIZE > max_bio_size)
 695				break;
 696
 697			/* Be always aligned */
 698			if (sector & ((1<<(align+3))-1))
 699				break;
 700
 701			if (discard_granularity && size == discard_granularity)
 702				break;
 703
 704			/* do not cross extent boundaries */
 705			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
 706				break;
 707			/* now, is it actually dirty, after all?
 708			 * caution, drbd_bm_test_bit is tri-state for some
 709			 * obscure reason; ( b == 0 ) would get the out-of-band
 710			 * only accidentally right because of the "oddly sized"
 711			 * adjustment below */
 712			if (drbd_bm_test_bit(device, bit+1) != 1)
 713				break;
 714			bit++;
 715			size += BM_BLOCK_SIZE;
 716			if ((BM_BLOCK_SIZE << align) <= size)
 717				align++;
 718			i++;
 719		}
 720		/* if we merged some,
 721		 * reset the offset to start the next drbd_bm_find_next from */
 722		if (size > BM_BLOCK_SIZE)
 723			device->bm_resync_fo = bit + 1;
 724#endif
 725
 726		/* adjust very last sectors, in case we are oddly sized */
 727		if (sector + (size>>9) > capacity)
 728			size = (capacity-sector)<<9;
 729
 730		if (device->use_csums) {
 731			switch (read_for_csum(peer_device, sector, size)) {
 732			case -EIO: /* Disk failure */
 733				put_ldev(device);
 734				return -EIO;
 735			case -EAGAIN: /* allocation failed, or ldev busy */
 736				drbd_rs_complete_io(device, sector);
 737				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
 738				i = rollback_i;
 739				goto requeue;
 740			case 0:
 741				/* everything ok */
 742				break;
 743			default:
 744				BUG();
 745			}
 746		} else {
 747			int err;
 748
 749			inc_rs_pending(device);
 750			err = drbd_send_drequest(peer_device,
 751						 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
 752						 sector, size, ID_SYNCER);
 753			if (err) {
 754				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
 755				dec_rs_pending(device);
 756				put_ldev(device);
 757				return err;
 758			}
 759		}
 760	}
 761
 762	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
 763		/* last syncer _request_ was sent,
 764		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
 765		 * next sync group will resume), as soon as we receive the last
 766		 * resync data block, and the last bit is cleared.
 767		 * until then resync "work" is "inactive" ...
 768		 */
 769		put_ldev(device);
 770		return 0;
 771	}
 772
 773 requeue:
 774	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 775	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
 776	put_ldev(device);
 777	return 0;
 778}
 779
 780static int make_ov_request(struct drbd_device *device, int cancel)
 781{
 782	int number, i, size;
 783	sector_t sector;
 784	const sector_t capacity = drbd_get_capacity(device->this_bdev);
 785	bool stop_sector_reached = false;
 786
 787	if (unlikely(cancel))
 788		return 1;
 789
 790	number = drbd_rs_number_requests(device);
 791
 792	sector = device->ov_position;
 793	for (i = 0; i < number; i++) {
 794		if (sector >= capacity)
 795			return 1;
 796
 797		/* We check for "finished" only in the reply path:
 798		 * w_e_end_ov_reply().
 799		 * We need to send at least one request out. */
 800		stop_sector_reached = i > 0
 801			&& verify_can_do_stop_sector(device)
 802			&& sector >= device->ov_stop_sector;
 803		if (stop_sector_reached)
 804			break;
 805
 806		size = BM_BLOCK_SIZE;
 807
 808		if (drbd_try_rs_begin_io(device, sector)) {
 809			device->ov_position = sector;
 
 810			goto requeue;
 811		}
 812
 813		if (sector + (size>>9) > capacity)
 814			size = (capacity-sector)<<9;
 815
 816		inc_rs_pending(device);
 817		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
 818			dec_rs_pending(device);
 819			return 0;
 820		}
 821		sector += BM_SECT_PER_BIT;
 822	}
 823	device->ov_position = sector;
 824
 825 requeue:
 826	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 827	if (i == 0 || !stop_sector_reached)
 828		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
 829	return 1;
 830}
 831
 832int w_ov_finished(struct drbd_work *w, int cancel)
 
 833{
 834	struct drbd_device_work *dw =
 835		container_of(w, struct drbd_device_work, w);
 836	struct drbd_device *device = dw->device;
 837	kfree(dw);
 838	ov_out_of_sync_print(device);
 839	drbd_resync_finished(device);
 840
 841	return 0;
 842}
 843
 844static int w_resync_finished(struct drbd_work *w, int cancel)
 845{
 846	struct drbd_device_work *dw =
 847		container_of(w, struct drbd_device_work, w);
 848	struct drbd_device *device = dw->device;
 849	kfree(dw);
 
 
 850
 851	drbd_resync_finished(device);
 
 
 
 
 
 
 
 
 
 852
 853	return 0;
 854}
 855
 856static void ping_peer(struct drbd_device *device)
 857{
 858	struct drbd_connection *connection = first_peer_device(device)->connection;
 
 
 
 
 
 859
 860	clear_bit(GOT_PING_ACK, &connection->flags);
 861	request_ping(connection);
 862	wait_event(connection->ping_wait,
 863		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
 
 
 864}
 865
 866int drbd_resync_finished(struct drbd_device *device)
 867{
 868	struct drbd_connection *connection = first_peer_device(device)->connection;
 869	unsigned long db, dt, dbdt;
 870	unsigned long n_oos;
 871	union drbd_state os, ns;
 872	struct drbd_device_work *dw;
 873	char *khelper_cmd = NULL;
 874	int verify_done = 0;
 875
 876	/* Remove all elements from the resync LRU. Since future actions
 877	 * might set bits in the (main) bitmap, then the entries in the
 878	 * resync LRU would be wrong. */
 879	if (drbd_rs_del_all(device)) {
 880		/* In case this is not possible now, most probably because
 881		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
 882		 * queue (or even the read operations for those packets
 883		 * is not finished by now).   Retry in 100ms. */
 884
 885		schedule_timeout_interruptible(HZ / 10);
 886		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
 887		if (dw) {
 888			dw->w.cb = w_resync_finished;
 889			dw->device = device;
 890			drbd_queue_work(&connection->sender_work, &dw->w);
 891			return 1;
 892		}
 893		drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
 894	}
 895
 896	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
 897	if (dt <= 0)
 898		dt = 1;
 899
 900	db = device->rs_total;
 901	/* adjust for verify start and stop sectors, respective reached position */
 902	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
 903		db -= device->ov_left;
 904
 905	dbdt = Bit2KB(db/dt);
 906	device->rs_paused /= HZ;
 907
 908	if (!get_ldev(device))
 909		goto out;
 910
 911	ping_peer(device);
 912
 913	spin_lock_irq(&device->resource->req_lock);
 914	os = drbd_read_state(device);
 915
 916	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
 917
 918	/* This protects us against multiple calls (that can happen in the presence
 919	   of application IO), and against connectivity loss just before we arrive here. */
 920	if (os.conn <= C_CONNECTED)
 921		goto out_unlock;
 922
 923	ns = os;
 924	ns.conn = C_CONNECTED;
 925
 926	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
 927	     verify_done ? "Online verify" : "Resync",
 928	     dt + device->rs_paused, device->rs_paused, dbdt);
 929
 930	n_oos = drbd_bm_total_weight(device);
 931
 932	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
 933		if (n_oos) {
 934			drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
 935			      n_oos, Bit2KB(1));
 936			khelper_cmd = "out-of-sync";
 937		}
 938	} else {
 939		D_ASSERT(device, (n_oos - device->rs_failed) == 0);
 940
 941		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
 942			khelper_cmd = "after-resync-target";
 943
 944		if (device->use_csums && device->rs_total) {
 945			const unsigned long s = device->rs_same_csum;
 946			const unsigned long t = device->rs_total;
 947			const int ratio =
 948				(t == 0)     ? 0 :
 949			(t < 100000) ? ((s*100)/t) : (s/(t/100));
 950			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
 951			     "transferred %luK total %luK\n",
 952			     ratio,
 953			     Bit2KB(device->rs_same_csum),
 954			     Bit2KB(device->rs_total - device->rs_same_csum),
 955			     Bit2KB(device->rs_total));
 956		}
 957	}
 958
 959	if (device->rs_failed) {
 960		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
 961
 962		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 963			ns.disk = D_INCONSISTENT;
 964			ns.pdsk = D_UP_TO_DATE;
 965		} else {
 966			ns.disk = D_UP_TO_DATE;
 967			ns.pdsk = D_INCONSISTENT;
 968		}
 969	} else {
 970		ns.disk = D_UP_TO_DATE;
 971		ns.pdsk = D_UP_TO_DATE;
 972
 973		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 974			if (device->p_uuid) {
 975				int i;
 976				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
 977					_drbd_uuid_set(device, i, device->p_uuid[i]);
 978				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
 979				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
 980			} else {
 981				drbd_err(device, "device->p_uuid is NULL! BUG\n");
 982			}
 983		}
 984
 985		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
 986			/* for verify runs, we don't update uuids here,
 987			 * so there would be nothing to report. */
 988			drbd_uuid_set_bm(device, 0UL);
 989			drbd_print_uuids(device, "updated UUIDs");
 990			if (device->p_uuid) {
 991				/* Now the two UUID sets are equal, update what we
 992				 * know of the peer. */
 993				int i;
 994				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
 995					device->p_uuid[i] = device->ldev->md.uuid[i];
 996			}
 997		}
 998	}
 999
1000	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
1001out_unlock:
1002	spin_unlock_irq(&device->resource->req_lock);
1003
1004	/* If we have been sync source, and have an effective fencing-policy,
1005	 * once *all* volumes are back in sync, call "unfence". */
1006	if (os.conn == C_SYNC_SOURCE) {
1007		enum drbd_disk_state disk_state = D_MASK;
1008		enum drbd_disk_state pdsk_state = D_MASK;
1009		enum drbd_fencing_p fp = FP_DONT_CARE;
1010
1011		rcu_read_lock();
1012		fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1013		if (fp != FP_DONT_CARE) {
1014			struct drbd_peer_device *peer_device;
1015			int vnr;
1016			idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1017				struct drbd_device *device = peer_device->device;
1018				disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1019				pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1020			}
1021		}
1022		rcu_read_unlock();
1023		if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1024			conn_khelper(connection, "unfence-peer");
1025	}
1026
1027	put_ldev(device);
1028out:
1029	device->rs_total  = 0;
1030	device->rs_failed = 0;
1031	device->rs_paused = 0;
1032
1033	/* reset start sector, if we reached end of device */
1034	if (verify_done && device->ov_left == 0)
1035		device->ov_start_sector = 0;
1036
1037	drbd_md_sync(device);
1038
1039	if (khelper_cmd)
1040		drbd_khelper(device, khelper_cmd);
1041
1042	return 1;
1043}
1044
1045/* helper */
1046static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1047{
1048	if (drbd_peer_req_has_active_page(peer_req)) {
1049		/* This might happen if sendpage() has not finished */
1050		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1051		atomic_add(i, &device->pp_in_use_by_net);
1052		atomic_sub(i, &device->pp_in_use);
1053		spin_lock_irq(&device->resource->req_lock);
1054		list_add_tail(&peer_req->w.list, &device->net_ee);
1055		spin_unlock_irq(&device->resource->req_lock);
1056		wake_up(&drbd_pp_wait);
1057	} else
1058		drbd_free_peer_req(device, peer_req);
1059}
1060
1061/**
1062 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 
1063 * @w:		work object.
1064 * @cancel:	The connection will be closed anyways
1065 */
1066int w_e_end_data_req(struct drbd_work *w, int cancel)
1067{
1068	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1069	struct drbd_peer_device *peer_device = peer_req->peer_device;
1070	struct drbd_device *device = peer_device->device;
1071	int err;
1072
1073	if (unlikely(cancel)) {
1074		drbd_free_peer_req(device, peer_req);
1075		dec_unacked(device);
1076		return 0;
1077	}
1078
1079	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1080		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1081	} else {
1082		if (__ratelimit(&drbd_ratelimit_state))
1083			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1084			    (unsigned long long)peer_req->i.sector);
1085
1086		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1087	}
1088
1089	dec_unacked(device);
1090
1091	move_to_net_ee_or_free(device, peer_req);
1092
1093	if (unlikely(err))
1094		drbd_err(device, "drbd_send_block() failed\n");
1095	return err;
1096}
1097
1098static bool all_zero(struct drbd_peer_request *peer_req)
1099{
1100	struct page *page = peer_req->pages;
1101	unsigned int len = peer_req->i.size;
1102
1103	page_chain_for_each(page) {
1104		unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1105		unsigned int i, words = l / sizeof(long);
1106		unsigned long *d;
1107
1108		d = kmap_atomic(page);
1109		for (i = 0; i < words; i++) {
1110			if (d[i]) {
1111				kunmap_atomic(d);
1112				return false;
1113			}
1114		}
1115		kunmap_atomic(d);
1116		len -= l;
1117	}
1118
1119	return true;
1120}
1121
1122/**
1123 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
 
1124 * @w:		work object.
1125 * @cancel:	The connection will be closed anyways
1126 */
1127int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1128{
1129	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1130	struct drbd_peer_device *peer_device = peer_req->peer_device;
1131	struct drbd_device *device = peer_device->device;
1132	int err;
1133
1134	if (unlikely(cancel)) {
1135		drbd_free_peer_req(device, peer_req);
1136		dec_unacked(device);
1137		return 0;
1138	}
1139
1140	if (get_ldev_if_state(device, D_FAILED)) {
1141		drbd_rs_complete_io(device, peer_req->i.sector);
1142		put_ldev(device);
1143	}
1144
1145	if (device->state.conn == C_AHEAD) {
1146		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1147	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1148		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1149			inc_rs_pending(device);
1150			if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1151				err = drbd_send_rs_deallocated(peer_device, peer_req);
1152			else
1153				err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1154		} else {
1155			if (__ratelimit(&drbd_ratelimit_state))
1156				drbd_err(device, "Not sending RSDataReply, "
1157				    "partner DISKLESS!\n");
1158			err = 0;
1159		}
1160	} else {
1161		if (__ratelimit(&drbd_ratelimit_state))
1162			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1163			    (unsigned long long)peer_req->i.sector);
1164
1165		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1166
1167		/* update resync data with failure */
1168		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1169	}
1170
1171	dec_unacked(device);
1172
1173	move_to_net_ee_or_free(device, peer_req);
1174
1175	if (unlikely(err))
1176		drbd_err(device, "drbd_send_block() failed\n");
1177	return err;
1178}
1179
1180int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1181{
1182	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1183	struct drbd_peer_device *peer_device = peer_req->peer_device;
1184	struct drbd_device *device = peer_device->device;
1185	struct digest_info *di;
1186	int digest_size;
1187	void *digest = NULL;
1188	int err, eq = 0;
1189
1190	if (unlikely(cancel)) {
1191		drbd_free_peer_req(device, peer_req);
1192		dec_unacked(device);
1193		return 0;
1194	}
1195
1196	if (get_ldev(device)) {
1197		drbd_rs_complete_io(device, peer_req->i.sector);
1198		put_ldev(device);
1199	}
1200
1201	di = peer_req->digest;
1202
1203	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1204		/* quick hack to try to avoid a race against reconfiguration.
1205		 * a real fix would be much more involved,
1206		 * introducing more locking mechanisms */
1207		if (peer_device->connection->csums_tfm) {
1208			digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
1209			D_ASSERT(device, digest_size == di->digest_size);
1210			digest = kmalloc(digest_size, GFP_NOIO);
1211		}
1212		if (digest) {
1213			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1214			eq = !memcmp(digest, di->digest, digest_size);
1215			kfree(digest);
1216		}
1217
1218		if (eq) {
1219			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1220			/* rs_same_csums unit is BM_BLOCK_SIZE */
1221			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1222			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1223		} else {
1224			inc_rs_pending(device);
1225			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1226			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1227			kfree(di);
1228			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1229		}
1230	} else {
1231		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1232		if (__ratelimit(&drbd_ratelimit_state))
1233			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1234	}
1235
1236	dec_unacked(device);
1237	move_to_net_ee_or_free(device, peer_req);
1238
1239	if (unlikely(err))
1240		drbd_err(device, "drbd_send_block/ack() failed\n");
1241	return err;
1242}
1243
1244int w_e_end_ov_req(struct drbd_work *w, int cancel)
 
1245{
1246	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1247	struct drbd_peer_device *peer_device = peer_req->peer_device;
1248	struct drbd_device *device = peer_device->device;
1249	sector_t sector = peer_req->i.sector;
1250	unsigned int size = peer_req->i.size;
1251	int digest_size;
1252	void *digest;
1253	int err = 0;
1254
1255	if (unlikely(cancel))
1256		goto out;
1257
1258	digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1259	digest = kmalloc(digest_size, GFP_NOIO);
1260	if (!digest) {
1261		err = 1;	/* terminate the connection in case the allocation failed */
1262		goto out;
1263	}
1264
1265	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1266		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1267	else
1268		memset(digest, 0, digest_size);
1269
1270	/* Free e and pages before send.
1271	 * In case we block on congestion, we could otherwise run into
1272	 * some distributed deadlock, if the other side blocks on
1273	 * congestion as well, because our receiver blocks in
1274	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1275	drbd_free_peer_req(device, peer_req);
1276	peer_req = NULL;
1277	inc_rs_pending(device);
1278	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1279	if (err)
1280		dec_rs_pending(device);
 
 
1281	kfree(digest);
1282
1283out:
1284	if (peer_req)
1285		drbd_free_peer_req(device, peer_req);
1286	dec_unacked(device);
1287	return err;
1288}
1289
1290void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1291{
1292	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1293		device->ov_last_oos_size += size>>9;
1294	} else {
1295		device->ov_last_oos_start = sector;
1296		device->ov_last_oos_size = size>>9;
1297	}
1298	drbd_set_out_of_sync(device, sector, size);
1299}
1300
1301int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1302{
1303	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1304	struct drbd_peer_device *peer_device = peer_req->peer_device;
1305	struct drbd_device *device = peer_device->device;
1306	struct digest_info *di;
1307	void *digest;
1308	sector_t sector = peer_req->i.sector;
1309	unsigned int size = peer_req->i.size;
1310	int digest_size;
1311	int err, eq = 0;
1312	bool stop_sector_reached = false;
1313
1314	if (unlikely(cancel)) {
1315		drbd_free_peer_req(device, peer_req);
1316		dec_unacked(device);
1317		return 0;
1318	}
1319
1320	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1321	 * the resync lru has been cleaned up already */
1322	if (get_ldev(device)) {
1323		drbd_rs_complete_io(device, peer_req->i.sector);
1324		put_ldev(device);
1325	}
1326
1327	di = peer_req->digest;
1328
1329	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1330		digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1331		digest = kmalloc(digest_size, GFP_NOIO);
1332		if (digest) {
1333			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1334
1335			D_ASSERT(device, digest_size == di->digest_size);
1336			eq = !memcmp(digest, di->digest, digest_size);
1337			kfree(digest);
1338		}
1339	}
1340
1341	/* Free peer_req and pages before send.
1342	 * In case we block on congestion, we could otherwise run into
1343	 * some distributed deadlock, if the other side blocks on
1344	 * congestion as well, because our receiver blocks in
1345	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1346	drbd_free_peer_req(device, peer_req);
1347	if (!eq)
1348		drbd_ov_out_of_sync_found(device, sector, size);
1349	else
1350		ov_out_of_sync_print(device);
1351
1352	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1353			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1354
1355	dec_unacked(device);
1356
1357	--device->ov_left;
1358
1359	/* let's advance progress step marks only for every other megabyte */
1360	if ((device->ov_left & 0x200) == 0x200)
1361		drbd_advance_rs_marks(device, device->ov_left);
1362
1363	stop_sector_reached = verify_can_do_stop_sector(device) &&
1364		(sector + (size>>9)) >= device->ov_stop_sector;
1365
1366	if (device->ov_left == 0 || stop_sector_reached) {
1367		ov_out_of_sync_print(device);
1368		drbd_resync_finished(device);
1369	}
1370
1371	return err;
1372}
1373
1374/* FIXME
1375 * We need to track the number of pending barrier acks,
1376 * and to be able to wait for them.
1377 * See also comment in drbd_adm_attach before drbd_suspend_io.
1378 */
1379static int drbd_send_barrier(struct drbd_connection *connection)
1380{
1381	struct p_barrier *p;
1382	struct drbd_socket *sock;
1383
1384	sock = &connection->data;
1385	p = conn_prepare_command(connection, sock);
1386	if (!p)
1387		return -EIO;
1388	p->barrier = connection->send.current_epoch_nr;
1389	p->pad = 0;
1390	connection->send.current_epoch_writes = 0;
1391	connection->send.last_sent_barrier_jif = jiffies;
1392
1393	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1394}
1395
1396static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1397{
1398	struct drbd_socket *sock = &pd->connection->data;
1399	if (!drbd_prepare_command(pd, sock))
1400		return -EIO;
1401	return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1402}
 
 
 
 
 
 
 
 
 
 
1403
1404int w_send_write_hint(struct drbd_work *w, int cancel)
1405{
1406	struct drbd_device *device =
1407		container_of(w, struct drbd_device, unplug_work);
1408
1409	if (cancel)
1410		return 0;
1411	return pd_send_unplug_remote(first_peer_device(device));
1412}
 
 
 
 
 
1413
1414static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1415{
1416	if (!connection->send.seen_any_write_yet) {
1417		connection->send.seen_any_write_yet = true;
1418		connection->send.current_epoch_nr = epoch;
1419		connection->send.current_epoch_writes = 0;
1420		connection->send.last_sent_barrier_jif = jiffies;
1421	}
1422}
1423
1424static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1425{
1426	/* re-init if first write on this connection */
1427	if (!connection->send.seen_any_write_yet)
1428		return;
1429	if (connection->send.current_epoch_nr != epoch) {
1430		if (connection->send.current_epoch_writes)
1431			drbd_send_barrier(connection);
1432		connection->send.current_epoch_nr = epoch;
1433	}
1434}
1435
1436int w_send_out_of_sync(struct drbd_work *w, int cancel)
1437{
1438	struct drbd_request *req = container_of(w, struct drbd_request, w);
1439	struct drbd_device *device = req->device;
1440	struct drbd_peer_device *const peer_device = first_peer_device(device);
1441	struct drbd_connection *const connection = peer_device->connection;
1442	int err;
1443
1444	if (unlikely(cancel)) {
1445		req_mod(req, SEND_CANCELED);
1446		return 0;
1447	}
1448	req->pre_send_jif = jiffies;
1449
1450	/* this time, no connection->send.current_epoch_writes++;
1451	 * If it was sent, it was the closing barrier for the last
1452	 * replicated epoch, before we went into AHEAD mode.
1453	 * No more barriers will be sent, until we leave AHEAD mode again. */
1454	maybe_send_barrier(connection, req->epoch);
1455
1456	err = drbd_send_out_of_sync(peer_device, req);
1457	req_mod(req, OOS_HANDED_TO_NETWORK);
1458
1459	return err;
1460}
1461
1462/**
1463 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
 
1464 * @w:		work object.
1465 * @cancel:	The connection will be closed anyways
1466 */
1467int w_send_dblock(struct drbd_work *w, int cancel)
1468{
1469	struct drbd_request *req = container_of(w, struct drbd_request, w);
1470	struct drbd_device *device = req->device;
1471	struct drbd_peer_device *const peer_device = first_peer_device(device);
1472	struct drbd_connection *connection = peer_device->connection;
1473	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1474	int err;
1475
1476	if (unlikely(cancel)) {
1477		req_mod(req, SEND_CANCELED);
1478		return 0;
1479	}
1480	req->pre_send_jif = jiffies;
1481
1482	re_init_if_first_write(connection, req->epoch);
1483	maybe_send_barrier(connection, req->epoch);
1484	connection->send.current_epoch_writes++;
1485
1486	err = drbd_send_dblock(peer_device, req);
1487	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1488
1489	if (do_send_unplug && !err)
1490		pd_send_unplug_remote(peer_device);
1491
1492	return err;
1493}
1494
1495/**
1496 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
 
1497 * @w:		work object.
1498 * @cancel:	The connection will be closed anyways
1499 */
1500int w_send_read_req(struct drbd_work *w, int cancel)
1501{
1502	struct drbd_request *req = container_of(w, struct drbd_request, w);
1503	struct drbd_device *device = req->device;
1504	struct drbd_peer_device *const peer_device = first_peer_device(device);
1505	struct drbd_connection *connection = peer_device->connection;
1506	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1507	int err;
1508
1509	if (unlikely(cancel)) {
1510		req_mod(req, SEND_CANCELED);
1511		return 0;
1512	}
1513	req->pre_send_jif = jiffies;
1514
1515	/* Even read requests may close a write epoch,
1516	 * if there was any yet. */
1517	maybe_send_barrier(connection, req->epoch);
1518
1519	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1520				 (unsigned long)req);
1521
1522	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
 
 
 
1523
1524	if (do_send_unplug && !err)
1525		pd_send_unplug_remote(peer_device);
1526
1527	return err;
1528}
1529
1530int w_restart_disk_io(struct drbd_work *w, int cancel)
1531{
1532	struct drbd_request *req = container_of(w, struct drbd_request, w);
1533	struct drbd_device *device = req->device;
1534
1535	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1536		drbd_al_begin_io(device, &req->i);
 
 
 
 
1537
1538	drbd_req_make_private_bio(req, req->master_bio);
1539	bio_set_dev(req->private_bio, device->ldev->backing_bdev);
1540	generic_make_request(req->private_bio);
1541
1542	return 0;
1543}
1544
1545static int _drbd_may_sync_now(struct drbd_device *device)
1546{
1547	struct drbd_device *odev = device;
1548	int resync_after;
1549
1550	while (1) {
1551		if (!odev->ldev || odev->state.disk == D_DISKLESS)
1552			return 1;
1553		rcu_read_lock();
1554		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1555		rcu_read_unlock();
1556		if (resync_after == -1)
1557			return 1;
1558		odev = minor_to_device(resync_after);
1559		if (!odev)
1560			return 1;
 
 
1561		if ((odev->state.conn >= C_SYNC_SOURCE &&
1562		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1563		    odev->state.aftr_isp || odev->state.peer_isp ||
1564		    odev->state.user_isp)
1565			return 0;
1566	}
1567}
1568
1569/**
1570 * drbd_pause_after() - Pause resync on all devices that may not resync now
1571 * @device:	DRBD device.
1572 *
1573 * Called from process context only (admin command and after_state_ch).
1574 */
1575static bool drbd_pause_after(struct drbd_device *device)
1576{
1577	bool changed = false;
1578	struct drbd_device *odev;
1579	int i;
1580
1581	rcu_read_lock();
1582	idr_for_each_entry(&drbd_devices, odev, i) {
 
 
1583		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1584			continue;
1585		if (!_drbd_may_sync_now(odev) &&
1586		    _drbd_set_state(_NS(odev, aftr_isp, 1),
1587				    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1588			changed = true;
1589	}
1590	rcu_read_unlock();
1591
1592	return changed;
1593}
1594
1595/**
1596 * drbd_resume_next() - Resume resync on all devices that may resync now
1597 * @device:	DRBD device.
1598 *
1599 * Called from process context only (admin command and worker).
1600 */
1601static bool drbd_resume_next(struct drbd_device *device)
1602{
1603	bool changed = false;
1604	struct drbd_device *odev;
1605	int i;
1606
1607	rcu_read_lock();
1608	idr_for_each_entry(&drbd_devices, odev, i) {
 
 
1609		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1610			continue;
1611		if (odev->state.aftr_isp) {
1612			if (_drbd_may_sync_now(odev) &&
1613			    _drbd_set_state(_NS(odev, aftr_isp, 0),
1614					    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1615				changed = true;
1616		}
1617	}
1618	rcu_read_unlock();
1619	return changed;
1620}
1621
1622void resume_next_sg(struct drbd_device *device)
1623{
1624	lock_all_resources();
1625	drbd_resume_next(device);
1626	unlock_all_resources();
1627}
1628
1629void suspend_other_sg(struct drbd_device *device)
1630{
1631	lock_all_resources();
1632	drbd_pause_after(device);
1633	unlock_all_resources();
1634}
1635
1636/* caller must lock_all_resources() */
1637enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1638{
1639	struct drbd_device *odev;
1640	int resync_after;
1641
1642	if (o_minor == -1)
1643		return NO_ERROR;
1644	if (o_minor < -1 || o_minor > MINORMASK)
1645		return ERR_RESYNC_AFTER;
1646
1647	/* check for loops */
1648	odev = minor_to_device(o_minor);
1649	while (1) {
1650		if (odev == device)
1651			return ERR_RESYNC_AFTER_CYCLE;
1652
1653		/* You are free to depend on diskless, non-existing,
1654		 * or not yet/no longer existing minors.
1655		 * We only reject dependency loops.
1656		 * We cannot follow the dependency chain beyond a detached or
1657		 * missing minor.
1658		 */
1659		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1660			return NO_ERROR;
1661
1662		rcu_read_lock();
1663		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1664		rcu_read_unlock();
1665		/* dependency chain ends here, no cycles. */
1666		if (resync_after == -1)
1667			return NO_ERROR;
1668
1669		/* follow the dependency chain */
1670		odev = minor_to_device(resync_after);
1671	}
1672}
1673
1674/* caller must lock_all_resources() */
1675void drbd_resync_after_changed(struct drbd_device *device)
1676{
1677	int changed;
1678
1679	do {
1680		changed  = drbd_pause_after(device);
1681		changed |= drbd_resume_next(device);
1682	} while (changed);
1683}
1684
1685void drbd_rs_controller_reset(struct drbd_device *device)
1686{
1687	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1688	struct fifo_buffer *plan;
1689
1690	atomic_set(&device->rs_sect_in, 0);
1691	atomic_set(&device->rs_sect_ev, 0);
1692	device->rs_in_flight = 0;
1693	device->rs_last_events =
1694		(int)part_stat_read(&disk->part0, sectors[0]) +
1695		(int)part_stat_read(&disk->part0, sectors[1]);
1696
1697	/* Updating the RCU protected object in place is necessary since
1698	   this function gets called from atomic context.
1699	   It is valid since all other updates also lead to an completely
1700	   empty fifo */
1701	rcu_read_lock();
1702	plan = rcu_dereference(device->rs_plan_s);
1703	plan->total = 0;
1704	fifo_set(plan, 0);
1705	rcu_read_unlock();
1706}
1707
1708void start_resync_timer_fn(struct timer_list *t)
1709{
1710	struct drbd_device *device = from_timer(device, t, start_resync_timer);
1711	drbd_device_post_work(device, RS_START);
1712}
1713
1714static void do_start_resync(struct drbd_device *device)
1715{
1716	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1717		drbd_warn(device, "postponing start_resync ...\n");
1718		device->start_resync_timer.expires = jiffies + HZ/10;
1719		add_timer(&device->start_resync_timer);
1720		return;
1721	}
1722
1723	drbd_start_resync(device, C_SYNC_SOURCE);
1724	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1725}
1726
1727static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1728{
1729	bool csums_after_crash_only;
1730	rcu_read_lock();
1731	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1732	rcu_read_unlock();
1733	return connection->agreed_pro_version >= 89 &&		/* supported? */
1734		connection->csums_tfm &&			/* configured? */
1735		(csums_after_crash_only == false		/* use for each resync? */
1736		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
1737}
1738
1739/**
1740 * drbd_start_resync() - Start the resync process
1741 * @device:	DRBD device.
1742 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1743 *
1744 * This function might bring you directly into one of the
1745 * C_PAUSED_SYNC_* states.
1746 */
1747void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1748{
1749	struct drbd_peer_device *peer_device = first_peer_device(device);
1750	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1751	union drbd_state ns;
1752	int r;
1753
1754	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1755		drbd_err(device, "Resync already running!\n");
1756		return;
1757	}
1758
1759	if (!connection) {
1760		drbd_err(device, "No connection to peer, aborting!\n");
1761		return;
1762	}
1763
1764	if (!test_bit(B_RS_H_DONE, &device->flags)) {
1765		if (side == C_SYNC_TARGET) {
1766			/* Since application IO was locked out during C_WF_BITMAP_T and
1767			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1768			   we check that we might make the data inconsistent. */
1769			r = drbd_khelper(device, "before-resync-target");
1770			r = (r >> 8) & 0xff;
1771			if (r > 0) {
1772				drbd_info(device, "before-resync-target handler returned %d, "
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1773					 "dropping connection.\n", r);
1774				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1775				return;
1776			}
1777		} else /* C_SYNC_SOURCE */ {
1778			r = drbd_khelper(device, "before-resync-source");
1779			r = (r >> 8) & 0xff;
1780			if (r > 0) {
1781				if (r == 3) {
1782					drbd_info(device, "before-resync-source handler returned %d, "
1783						 "ignoring. Old userland tools?", r);
1784				} else {
1785					drbd_info(device, "before-resync-source handler returned %d, "
1786						 "dropping connection.\n", r);
1787					conn_request_state(connection,
1788							   NS(conn, C_DISCONNECTING), CS_HARD);
1789					return;
1790				}
1791			}
1792		}
1793	}
1794
1795	if (current == connection->worker.task) {
1796		/* The worker should not sleep waiting for state_mutex,
1797		   that can take long */
1798		if (!mutex_trylock(device->state_mutex)) {
1799			set_bit(B_RS_H_DONE, &device->flags);
1800			device->start_resync_timer.expires = jiffies + HZ/5;
1801			add_timer(&device->start_resync_timer);
1802			return;
1803		}
1804	} else {
1805		mutex_lock(device->state_mutex);
1806	}
1807
1808	lock_all_resources();
1809	clear_bit(B_RS_H_DONE, &device->flags);
1810	/* Did some connection breakage or IO error race with us? */
1811	if (device->state.conn < C_CONNECTED
1812	|| !get_ldev_if_state(device, D_NEGOTIATING)) {
1813		unlock_all_resources();
1814		goto out;
1815	}
1816
1817	ns = drbd_read_state(device);
 
1818
1819	ns.aftr_isp = !_drbd_may_sync_now(device);
1820
1821	ns.conn = side;
1822
1823	if (side == C_SYNC_TARGET)
1824		ns.disk = D_INCONSISTENT;
1825	else /* side == C_SYNC_SOURCE */
1826		ns.pdsk = D_INCONSISTENT;
1827
1828	r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1829	ns = drbd_read_state(device);
1830
1831	if (ns.conn < C_CONNECTED)
1832		r = SS_UNKNOWN_ERROR;
1833
1834	if (r == SS_SUCCESS) {
1835		unsigned long tw = drbd_bm_total_weight(device);
1836		unsigned long now = jiffies;
1837		int i;
1838
1839		device->rs_failed    = 0;
1840		device->rs_paused    = 0;
1841		device->rs_same_csum = 0;
1842		device->rs_last_sect_ev = 0;
1843		device->rs_total     = tw;
1844		device->rs_start     = now;
 
1845		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1846			device->rs_mark_left[i] = tw;
1847			device->rs_mark_time[i] = now;
1848		}
1849		drbd_pause_after(device);
1850		/* Forget potentially stale cached per resync extent bit-counts.
1851		 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1852		 * disabled, and know the disk state is ok. */
1853		spin_lock(&device->al_lock);
1854		lc_reset(device->resync);
1855		device->resync_locked = 0;
1856		device->resync_wenr = LC_FREE;
1857		spin_unlock(&device->al_lock);
1858	}
1859	unlock_all_resources();
1860
1861	if (r == SS_SUCCESS) {
1862		wake_up(&device->al_wait); /* for lc_reset() above */
1863		/* reset rs_last_bcast when a resync or verify is started,
1864		 * to deal with potential jiffies wrap. */
1865		device->rs_last_bcast = jiffies - HZ;
1866
1867		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1868		     drbd_conn_str(ns.conn),
1869		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1870		     (unsigned long) device->rs_total);
1871		if (side == C_SYNC_TARGET) {
1872			device->bm_resync_fo = 0;
1873			device->use_csums = use_checksum_based_resync(connection, device);
1874		} else {
1875			device->use_csums = false;
1876		}
1877
1878		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1879		 * with w_send_oos, or the sync target will get confused as to
1880		 * how much bits to resync.  We cannot do that always, because for an
1881		 * empty resync and protocol < 95, we need to do it here, as we call
1882		 * drbd_resync_finished from here in that case.
1883		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1884		 * and from after_state_ch otherwise. */
1885		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1886			drbd_gen_and_send_sync_uuid(peer_device);
1887
1888		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1889			/* This still has a race (about when exactly the peers
1890			 * detect connection loss) that can lead to a full sync
1891			 * on next handshake. In 8.3.9 we fixed this with explicit
1892			 * resync-finished notifications, but the fix
1893			 * introduces a protocol change.  Sleeping for some
1894			 * time longer than the ping interval + timeout on the
1895			 * SyncSource, to give the SyncTarget the chance to
1896			 * detect connection loss, then waiting for a ping
1897			 * response (implicit in drbd_resync_finished) reduces
1898			 * the race considerably, but does not solve it. */
1899			if (side == C_SYNC_SOURCE) {
1900				struct net_conf *nc;
1901				int timeo;
1902
1903				rcu_read_lock();
1904				nc = rcu_dereference(connection->net_conf);
1905				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1906				rcu_read_unlock();
1907				schedule_timeout_interruptible(timeo);
1908			}
1909			drbd_resync_finished(device);
1910		}
1911
1912		drbd_rs_controller_reset(device);
1913		/* ns.conn may already be != device->state.conn,
1914		 * we may have been paused in between, or become paused until
1915		 * the timer triggers.
1916		 * No matter, that is handled in resync_timer_fn() */
1917		if (ns.conn == C_SYNC_TARGET)
1918			mod_timer(&device->resync_timer, jiffies);
1919
1920		drbd_md_sync(device);
1921	}
1922	put_ldev(device);
1923out:
1924	mutex_unlock(device->state_mutex);
1925}
1926
1927static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1928{
1929	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1930	device->rs_last_bcast = jiffies;
1931
1932	if (!get_ldev(device))
1933		return;
1934
1935	drbd_bm_write_lazy(device, 0);
1936	if (resync_done && is_sync_state(device->state.conn))
1937		drbd_resync_finished(device);
1938
1939	drbd_bcast_event(device, &sib);
1940	/* update timestamp, in case it took a while to write out stuff */
1941	device->rs_last_bcast = jiffies;
1942	put_ldev(device);
1943}
1944
1945static void drbd_ldev_destroy(struct drbd_device *device)
1946{
1947	lc_destroy(device->resync);
1948	device->resync = NULL;
1949	lc_destroy(device->act_log);
1950	device->act_log = NULL;
1951
1952	__acquire(local);
1953	drbd_backing_dev_free(device, device->ldev);
1954	device->ldev = NULL;
1955	__release(local);
1956
1957	clear_bit(GOING_DISKLESS, &device->flags);
1958	wake_up(&device->misc_wait);
1959}
1960
1961static void go_diskless(struct drbd_device *device)
1962{
1963	D_ASSERT(device, device->state.disk == D_FAILED);
1964	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1965	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1966	 * the protected members anymore, though, so once put_ldev reaches zero
1967	 * again, it will be safe to free them. */
1968
1969	/* Try to write changed bitmap pages, read errors may have just
1970	 * set some bits outside the area covered by the activity log.
1971	 *
1972	 * If we have an IO error during the bitmap writeout,
1973	 * we will want a full sync next time, just in case.
1974	 * (Do we want a specific meta data flag for this?)
1975	 *
1976	 * If that does not make it to stable storage either,
1977	 * we cannot do anything about that anymore.
1978	 *
1979	 * We still need to check if both bitmap and ldev are present, we may
1980	 * end up here after a failed attach, before ldev was even assigned.
1981	 */
1982	if (device->bitmap && device->ldev) {
1983		/* An interrupted resync or similar is allowed to recounts bits
1984		 * while we detach.
1985		 * Any modifications would not be expected anymore, though.
1986		 */
1987		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1988					"detach", BM_LOCKED_TEST_ALLOWED)) {
1989			if (test_bit(WAS_READ_ERROR, &device->flags)) {
1990				drbd_md_set_flag(device, MDF_FULL_SYNC);
1991				drbd_md_sync(device);
1992			}
1993		}
1994	}
1995
1996	drbd_force_state(device, NS(disk, D_DISKLESS));
1997}
1998
1999static int do_md_sync(struct drbd_device *device)
2000{
2001	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
2002	drbd_md_sync(device);
2003	return 0;
2004}
2005
2006/* only called from drbd_worker thread, no locking */
2007void __update_timing_details(
2008		struct drbd_thread_timing_details *tdp,
2009		unsigned int *cb_nr,
2010		void *cb,
2011		const char *fn, const unsigned int line)
2012{
2013	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
2014	struct drbd_thread_timing_details *td = tdp + i;
2015
2016	td->start_jif = jiffies;
2017	td->cb_addr = cb;
2018	td->caller_fn = fn;
2019	td->line = line;
2020	td->cb_nr = *cb_nr;
2021
2022	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2023	td = tdp + i;
2024	memset(td, 0, sizeof(*td));
2025
2026	++(*cb_nr);
2027}
2028
2029static void do_device_work(struct drbd_device *device, const unsigned long todo)
2030{
2031	if (test_bit(MD_SYNC, &todo))
2032		do_md_sync(device);
2033	if (test_bit(RS_DONE, &todo) ||
2034	    test_bit(RS_PROGRESS, &todo))
2035		update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2036	if (test_bit(GO_DISKLESS, &todo))
2037		go_diskless(device);
2038	if (test_bit(DESTROY_DISK, &todo))
2039		drbd_ldev_destroy(device);
2040	if (test_bit(RS_START, &todo))
2041		do_start_resync(device);
2042}
2043
2044#define DRBD_DEVICE_WORK_MASK	\
2045	((1UL << GO_DISKLESS)	\
2046	|(1UL << DESTROY_DISK)	\
2047	|(1UL << MD_SYNC)	\
2048	|(1UL << RS_START)	\
2049	|(1UL << RS_PROGRESS)	\
2050	|(1UL << RS_DONE)	\
2051	)
2052
2053static unsigned long get_work_bits(unsigned long *flags)
2054{
2055	unsigned long old, new;
2056	do {
2057		old = *flags;
2058		new = old & ~DRBD_DEVICE_WORK_MASK;
2059	} while (cmpxchg(flags, old, new) != old);
2060	return old & DRBD_DEVICE_WORK_MASK;
2061}
2062
2063static void do_unqueued_work(struct drbd_connection *connection)
2064{
2065	struct drbd_peer_device *peer_device;
2066	int vnr;
2067
2068	rcu_read_lock();
2069	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2070		struct drbd_device *device = peer_device->device;
2071		unsigned long todo = get_work_bits(&device->flags);
2072		if (!todo)
2073			continue;
2074
2075		kref_get(&device->kref);
2076		rcu_read_unlock();
2077		do_device_work(device, todo);
2078		kref_put(&device->kref, drbd_destroy_device);
2079		rcu_read_lock();
2080	}
2081	rcu_read_unlock();
2082}
2083
2084static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2085{
2086	spin_lock_irq(&queue->q_lock);
2087	list_splice_tail_init(&queue->q, work_list);
2088	spin_unlock_irq(&queue->q_lock);
2089	return !list_empty(work_list);
2090}
2091
2092static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2093{
2094	DEFINE_WAIT(wait);
2095	struct net_conf *nc;
2096	int uncork, cork;
2097
2098	dequeue_work_batch(&connection->sender_work, work_list);
2099	if (!list_empty(work_list))
2100		return;
2101
2102	/* Still nothing to do?
2103	 * Maybe we still need to close the current epoch,
2104	 * even if no new requests are queued yet.
2105	 *
2106	 * Also, poke TCP, just in case.
2107	 * Then wait for new work (or signal). */
2108	rcu_read_lock();
2109	nc = rcu_dereference(connection->net_conf);
2110	uncork = nc ? nc->tcp_cork : 0;
2111	rcu_read_unlock();
2112	if (uncork) {
2113		mutex_lock(&connection->data.mutex);
2114		if (connection->data.socket)
2115			drbd_tcp_uncork(connection->data.socket);
2116		mutex_unlock(&connection->data.mutex);
2117	}
2118
2119	for (;;) {
2120		int send_barrier;
2121		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2122		spin_lock_irq(&connection->resource->req_lock);
2123		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2124		if (!list_empty(&connection->sender_work.q))
2125			list_splice_tail_init(&connection->sender_work.q, work_list);
2126		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2127		if (!list_empty(work_list) || signal_pending(current)) {
2128			spin_unlock_irq(&connection->resource->req_lock);
2129			break;
2130		}
2131
2132		/* We found nothing new to do, no to-be-communicated request,
2133		 * no other work item.  We may still need to close the last
2134		 * epoch.  Next incoming request epoch will be connection ->
2135		 * current transfer log epoch number.  If that is different
2136		 * from the epoch of the last request we communicated, it is
2137		 * safe to send the epoch separating barrier now.
2138		 */
2139		send_barrier =
2140			atomic_read(&connection->current_tle_nr) !=
2141			connection->send.current_epoch_nr;
2142		spin_unlock_irq(&connection->resource->req_lock);
2143
2144		if (send_barrier)
2145			maybe_send_barrier(connection,
2146					connection->send.current_epoch_nr + 1);
2147
2148		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2149			break;
2150
2151		/* drbd_send() may have called flush_signals() */
2152		if (get_t_state(&connection->worker) != RUNNING)
2153			break;
2154
2155		schedule();
2156		/* may be woken up for other things but new work, too,
2157		 * e.g. if the current epoch got closed.
2158		 * In which case we send the barrier above. */
2159	}
2160	finish_wait(&connection->sender_work.q_wait, &wait);
2161
2162	/* someone may have changed the config while we have been waiting above. */
2163	rcu_read_lock();
2164	nc = rcu_dereference(connection->net_conf);
2165	cork = nc ? nc->tcp_cork : 0;
2166	rcu_read_unlock();
2167	mutex_lock(&connection->data.mutex);
2168	if (connection->data.socket) {
2169		if (cork)
2170			drbd_tcp_cork(connection->data.socket);
2171		else if (!uncork)
2172			drbd_tcp_uncork(connection->data.socket);
2173	}
2174	mutex_unlock(&connection->data.mutex);
2175}
2176
2177int drbd_worker(struct drbd_thread *thi)
2178{
2179	struct drbd_connection *connection = thi->connection;
2180	struct drbd_work *w = NULL;
2181	struct drbd_peer_device *peer_device;
2182	LIST_HEAD(work_list);
2183	int vnr;
2184
2185	while (get_t_state(thi) == RUNNING) {
2186		drbd_thread_current_set_cpu(thi);
2187
2188		if (list_empty(&work_list)) {
2189			update_worker_timing_details(connection, wait_for_work);
2190			wait_for_work(connection, &work_list);
2191		}
 
 
 
 
 
 
2192
2193		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2194			update_worker_timing_details(connection, do_unqueued_work);
2195			do_unqueued_work(connection);
 
2196		}
2197
2198		if (signal_pending(current)) {
 
2199			flush_signals(current);
2200			if (get_t_state(thi) == RUNNING) {
2201				drbd_warn(connection, "Worker got an unexpected signal\n");
2202				continue;
2203			}
2204			break;
2205		}
2206
2207		if (get_t_state(thi) != RUNNING)
2208			break;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2209
2210		if (!list_empty(&work_list)) {
2211			w = list_first_entry(&work_list, struct drbd_work, list);
2212			list_del_init(&w->list);
2213			update_worker_timing_details(connection, w->cb);
2214			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2215				continue;
2216			if (connection->cstate >= C_WF_REPORT_PARAMS)
2217				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2218		}
 
 
2219	}
 
 
 
 
 
 
 
2220
2221	do {
2222		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2223			update_worker_timing_details(connection, do_unqueued_work);
2224			do_unqueued_work(connection);
2225		}
2226		if (!list_empty(&work_list)) {
2227			w = list_first_entry(&work_list, struct drbd_work, list);
2228			list_del_init(&w->list);
2229			update_worker_timing_details(connection, w->cb);
2230			w->cb(w, 1);
2231		} else
2232			dequeue_work_batch(&connection->sender_work, &work_list);
2233	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2234
2235	rcu_read_lock();
2236	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2237		struct drbd_device *device = peer_device->device;
2238		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2239		kref_get(&device->kref);
2240		rcu_read_unlock();
2241		drbd_device_cleanup(device);
2242		kref_put(&device->kref, drbd_destroy_device);
2243		rcu_read_lock();
2244	}
2245	rcu_read_unlock();
2246
2247	return 0;
2248}