   1// SPDX-License-Identifier: GPL-2.0-only
   2/*
   3 * Copyright (C) Sistina Software, Inc.  1997-2003 All rights reserved.
   4 * Copyright 2004-2011 Red Hat, Inc.
   5 */
   6
   7#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
   8
   9#include <linux/fs.h>
  10#include <linux/dlm.h>
  11#include <linux/slab.h>
  12#include <linux/types.h>
  13#include <linux/delay.h>
  14#include <linux/gfs2_ondisk.h>
  15#include <linux/sched/signal.h>
  16
  17#include "incore.h"
  18#include "glock.h"
  19#include "glops.h"
  20#include "recovery.h"
  21#include "util.h"
  22#include "sys.h"
  23#include "trace_gfs2.h"
  24
  25/**
  26 * gfs2_update_stats - Update time based stats
  27 * @s: The stats to update (local or global)
  28 * @index: The index inside @s
  29 * @sample: New data to include
  30 */
  31static inline void gfs2_update_stats(struct gfs2_lkstats *s, unsigned index,
  32				     s64 sample)
  33{
  34	/*
  35	 * @delta is the difference between the current rtt sample and the
  36	 * running average srtt. We add 1/8 of that to the srtt in order to
  37	 * update the current srtt estimate. The variance estimate is a bit
  38	 * more complicated. We subtract the current variance estimate from
  39	 * the abs value of the @delta and add 1/4 of that to the running
  40	 * total.  That's equivalent to 3/4 of the current variance
  41	 * estimate plus 1/4 of the abs of @delta.
  42	 *
  43	 * Note that the index points at the array entry containing the
  44	 * smoothed mean value, and the variance is always in the following
  45	 * entry
  46	 *
  47	 * Reference: TCP/IP Illustrated, vol 2, p. 831,832
  48	 * All times are in units of integer nanoseconds. Unlike the TCP/IP
  49	 * case, they are not scaled fixed point.
  50	 */
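	/*
	 * A worked example with arbitrary values (illustration only): if the
	 * smoothed rtt is 1000ns, the variance estimate is 200ns, and a new
	 * sample of 1800ns arrives, then delta = 800ns.  The smoothed rtt
	 * becomes 1000 + (800 >> 3) = 1100ns and the variance estimate
	 * becomes 200 + ((800 - 200) >> 2) = 350ns, i.e. 3/4 * 200 + 1/4 * 800.
	 */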
  51
  52	s64 delta = sample - s->stats[index];
  53	s->stats[index] += (delta >> 3);
  54	index++;
  55	s->stats[index] += (s64)(abs(delta) - s->stats[index]) >> 2;
  56}
  57
  58/**
  59 * gfs2_update_reply_times - Update locking statistics
  60 * @gl: The glock to update
  61 *
  62 * This assumes that gl->gl_dstamp has been set earlier.
  63 *
  64 * The rtt (lock round trip time) is an estimate of the time
  65 * taken to perform a dlm lock request. We update it on each
  66 * reply from the dlm.
  67 *
  68 * The blocking flag is set on the glock for all dlm requests
  69 * which may potentially block due to lock requests from other nodes.
  70 * DLM requests where the current lock state is exclusive, the
  71 * requested state is null (or unlocked) or where the TRY or
  72 * TRY_1CB flags are set are classified as non-blocking. All
  73 * other DLM requests are counted as (potentially) blocking.
  74 */
  75static inline void gfs2_update_reply_times(struct gfs2_glock *gl)
  76{
  77	struct gfs2_pcpu_lkstats *lks;
  78	const unsigned gltype = gl->gl_name.ln_type;
  79	unsigned index = test_bit(GLF_BLOCKING, &gl->gl_flags) ?
  80			 GFS2_LKS_SRTTB : GFS2_LKS_SRTT;
  81	s64 rtt;
  82
  83	preempt_disable();
  84	rtt = ktime_to_ns(ktime_sub(ktime_get_real(), gl->gl_dstamp));
  85	lks = this_cpu_ptr(gl->gl_name.ln_sbd->sd_lkstats);
  86	gfs2_update_stats(&gl->gl_stats, index, rtt);		/* Local */
  87	gfs2_update_stats(&lks->lkstats[gltype], index, rtt);	/* Global */
  88	preempt_enable();
  89
  90	trace_gfs2_glock_lock_time(gl, rtt);
  91}
  92
  93/**
  94 * gfs2_update_request_times - Update locking statistics
  95 * @gl: The glock to update
  96 *
  97 * The irt (lock inter-request times) measures the average time
  98 * between requests to the dlm. It is updated immediately before
  99 * each dlm call.
 100 */
 101
 102static inline void gfs2_update_request_times(struct gfs2_glock *gl)
 103{
 104	struct gfs2_pcpu_lkstats *lks;
 105	const unsigned gltype = gl->gl_name.ln_type;
 106	ktime_t dstamp;
 107	s64 irt;
 108
 109	preempt_disable();
 110	dstamp = gl->gl_dstamp;
 111	gl->gl_dstamp = ktime_get_real();
 112	irt = ktime_to_ns(ktime_sub(gl->gl_dstamp, dstamp));
 113	lks = this_cpu_ptr(gl->gl_name.ln_sbd->sd_lkstats);
 114	gfs2_update_stats(&gl->gl_stats, GFS2_LKS_SIRT, irt);		/* Local */
 115	gfs2_update_stats(&lks->lkstats[gltype], GFS2_LKS_SIRT, irt);	/* Global */
 116	preempt_enable();
 117}
 118 
 119static void gdlm_ast(void *arg)
 120{
 121	struct gfs2_glock *gl = arg;
 122	unsigned ret = gl->gl_state;
 123
 124	/* If the glock is dead, we only react to a dlm_unlock() reply. */
 125	if (__lockref_is_dead(&gl->gl_lockref) &&
 126	    gl->gl_lksb.sb_status != -DLM_EUNLOCK)
 127		return;
 128
 129	gfs2_update_reply_times(gl);
 130	BUG_ON(gl->gl_lksb.sb_flags & DLM_SBF_DEMOTED);
 131
 132	if ((gl->gl_lksb.sb_flags & DLM_SBF_VALNOTVALID) && gl->gl_lksb.sb_lvbptr)
 133		memset(gl->gl_lksb.sb_lvbptr, 0, GDLM_LVB_SIZE);
 134
 135	switch (gl->gl_lksb.sb_status) {
 136	case -DLM_EUNLOCK: /* Unlocked, so glock can be freed */
 137		if (gl->gl_ops->go_free)
 138			gl->gl_ops->go_free(gl);
 139		gfs2_glock_free(gl);
 140		return;
 141	case -DLM_ECANCEL: /* Cancel while getting lock */
 142		ret |= LM_OUT_CANCELED;
 143		goto out;
 144	case -EAGAIN: /* Try lock fails */
 145	case -EDEADLK: /* Deadlock detected */
 146		goto out;
 147	case -ETIMEDOUT: /* Canceled due to timeout */
 148		ret |= LM_OUT_ERROR;
 149		goto out;
 150	case 0: /* Success */
 151		break;
 152	default: /* Something unexpected */
 153		BUG();
 154	}
 155
 156	ret = gl->gl_req;
 157	if (gl->gl_lksb.sb_flags & DLM_SBF_ALTMODE) {
 158		if (gl->gl_req == LM_ST_SHARED)
 159			ret = LM_ST_DEFERRED;
 160		else if (gl->gl_req == LM_ST_DEFERRED)
 161			ret = LM_ST_SHARED;
 162		else
 163			BUG();
 164	}
 165
 166	set_bit(GLF_INITIAL, &gl->gl_flags);
 167	gfs2_glock_complete(gl, ret);
 168	return;
 169out:
 170	if (!test_bit(GLF_INITIAL, &gl->gl_flags))
 171		gl->gl_lksb.sb_lkid = 0;
 172	gfs2_glock_complete(gl, ret);
 173}
 174
 175static void gdlm_bast(void *arg, int mode)
 176{
 177	struct gfs2_glock *gl = arg;
 178
 179	if (__lockref_is_dead(&gl->gl_lockref))
 180		return;
 181
 182	switch (mode) {
 183	case DLM_LOCK_EX:
 184		gfs2_glock_cb(gl, LM_ST_UNLOCKED);
 185		break;
 186	case DLM_LOCK_CW:
 187		gfs2_glock_cb(gl, LM_ST_DEFERRED);
 188		break;
 189	case DLM_LOCK_PR:
 190		gfs2_glock_cb(gl, LM_ST_SHARED);
 191		break;
 192	default:
 193		fs_err(gl->gl_name.ln_sbd, "unknown bast mode %d\n", mode);
 194		BUG();
 195	}
 196}
 197
 198/* convert gfs lock-state to dlm lock-mode */
 199
 200static int make_mode(struct gfs2_sbd *sdp, const unsigned int lmstate)
 201{
 202	switch (lmstate) {
 203	case LM_ST_UNLOCKED:
 204		return DLM_LOCK_NL;
 205	case LM_ST_EXCLUSIVE:
 206		return DLM_LOCK_EX;
 207	case LM_ST_DEFERRED:
 208		return DLM_LOCK_CW;
 209	case LM_ST_SHARED:
 210		return DLM_LOCK_PR;
 211	}
 212	fs_err(sdp, "unknown LM state %d\n", lmstate);
 213	BUG();
 214	return -1;
 215}
 216
 217static u32 make_flags(struct gfs2_glock *gl, const unsigned int gfs_flags,
 218		      const int req)
 219{
 220	u32 lkf = 0;
 221
 222	if (gl->gl_lksb.sb_lvbptr)
 223		lkf |= DLM_LKF_VALBLK;
 224
 225	if (gfs_flags & LM_FLAG_TRY)
 226		lkf |= DLM_LKF_NOQUEUE;
 227
 228	if (gfs_flags & LM_FLAG_TRY_1CB) {
 229		lkf |= DLM_LKF_NOQUEUE;
 230		lkf |= DLM_LKF_NOQUEUEBAST;
 231	}
 232
 233	if (gfs_flags & LM_FLAG_ANY) {
 234		if (req == DLM_LOCK_PR)
 235			lkf |= DLM_LKF_ALTCW;
 236		else if (req == DLM_LOCK_CW)
 237			lkf |= DLM_LKF_ALTPR;
 238		else
 239			BUG();
 240	}
 241
 242	if (gl->gl_lksb.sb_lkid != 0) {
 243		lkf |= DLM_LKF_CONVERT;
 244		if (test_bit(GLF_BLOCKING, &gl->gl_flags))
 245			lkf |= DLM_LKF_QUECVT;
 246	}
 247
 248	return lkf;
 249}
 250
 251static void gfs2_reverse_hex(char *c, u64 value)
 252{
 253	*c = '0';
 254	while (value) {
 255		*c-- = hex_asc[value & 0x0f];
 256		value >>= 4;
 257	}
 258}
 259
 260static int gdlm_lock(struct gfs2_glock *gl, unsigned int req_state,
 261		     unsigned int flags)
 262{
 263	struct lm_lockstruct *ls = &gl->gl_name.ln_sbd->sd_lockstruct;
 264	int req;
 265	u32 lkf;
 266	char strname[GDLM_STRNAME_BYTES] = "";
 267	int error;
 268
 269	req = make_mode(gl->gl_name.ln_sbd, req_state);
 270	lkf = make_flags(gl, flags, req);
 271	gfs2_glstats_inc(gl, GFS2_LKS_DCOUNT);
 272	gfs2_sbstats_inc(gl, GFS2_LKS_DCOUNT);
 273	if (gl->gl_lksb.sb_lkid) {
 274		gfs2_update_request_times(gl);
 275	} else {
 276		memset(strname, ' ', GDLM_STRNAME_BYTES - 1);
 277		strname[GDLM_STRNAME_BYTES - 1] = '\0';
 278		gfs2_reverse_hex(strname + 7, gl->gl_name.ln_type);
 279		gfs2_reverse_hex(strname + 23, gl->gl_name.ln_number);
 280		gl->gl_dstamp = ktime_get_real();
 281	}
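	/*
	 * For illustration (made-up values): a glock with ln_type 2 and
	 * ln_number 0x2345 gets the 24-character resource name
	 * "       2            2345", i.e. an 8-character hex type field
	 * followed by a 16-character hex number field, space padded - the
	 * same "%8x%16x" layout that sync_lock() uses for its lock names.
	 */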
 282	/*
 283	 * Submit the actual lock request.
 284	 */
 285
 286again:
 287	error = dlm_lock(ls->ls_dlm, req, &gl->gl_lksb, lkf, strname,
 288			GDLM_STRNAME_BYTES - 1, 0, gdlm_ast, gl, gdlm_bast);
 289	if (error == -EBUSY) {
 290		msleep(20);
 291		goto again;
 292	}
 293	return error;
 294}
 295
 296static void gdlm_put_lock(struct gfs2_glock *gl)
 297{
 298	struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
 299	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
 300	int error;
 301
 302	BUG_ON(!__lockref_is_dead(&gl->gl_lockref));
 303
 304	if (gl->gl_lksb.sb_lkid == 0) {
 305		gfs2_glock_free(gl);
 306		return;
 307	}
 308
 309	clear_bit(GLF_BLOCKING, &gl->gl_flags);
 310	gfs2_glstats_inc(gl, GFS2_LKS_DCOUNT);
 311	gfs2_sbstats_inc(gl, GFS2_LKS_DCOUNT);
 312	gfs2_update_request_times(gl);
 313
 314	/* don't want to call dlm if we've unmounted the lock protocol */
 315	if (test_bit(DFL_UNMOUNT, &ls->ls_recover_flags)) {
 316		gfs2_glock_free(gl);
 317		return;
 318	}
 319	/* don't want to skip dlm_unlock writing the lvb when lock has one */
 320
 321	if (test_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags) &&
 322	    !gl->gl_lksb.sb_lvbptr) {
 323		gfs2_glock_free_later(gl);
 324		return;
 325	}
 326
 327again:
 328	error = dlm_unlock(ls->ls_dlm, gl->gl_lksb.sb_lkid, DLM_LKF_VALBLK,
 329			   NULL, gl);
 330	if (error == -EBUSY) {
 331		msleep(20);
 332		goto again;
 333	}
 334
 335	if (error) {
 336		fs_err(sdp, "gdlm_unlock %x,%llx err=%d\n",
 337		       gl->gl_name.ln_type,
 338		       (unsigned long long)gl->gl_name.ln_number, error);
 339	}
 340}
 341
 342static void gdlm_cancel(struct gfs2_glock *gl)
 343{
 344	struct lm_lockstruct *ls = &gl->gl_name.ln_sbd->sd_lockstruct;
 345	dlm_unlock(ls->ls_dlm, gl->gl_lksb.sb_lkid, DLM_LKF_CANCEL, NULL, gl);
 346}
 347
 348/*
 349 * dlm/gfs2 recovery coordination using dlm_recover callbacks
 350 *
 351 *  0. gfs2 checks for another cluster node withdraw, needing journal replay
 352 *  1. dlm_controld sees lockspace members change
 353 *  2. dlm_controld blocks dlm-kernel locking activity
 354 *  3. dlm_controld within dlm-kernel notifies gfs2 (recover_prep)
 355 *  4. dlm_controld starts and finishes its own user level recovery
 356 *  5. dlm_controld starts dlm-kernel dlm_recoverd to do kernel recovery
 357 *  6. dlm_recoverd notifies gfs2 of failed nodes (recover_slot)
 358 *  7. dlm_recoverd does its own lock recovery
 359 *  8. dlm_recoverd unblocks dlm-kernel locking activity
 360 *  9. dlm_recoverd notifies gfs2 when done (recover_done with new generation)
 361 * 10. gfs2_control updates control_lock lvb with new generation and jid bits
 362 * 11. gfs2_control enqueues journals for gfs2_recover to recover (maybe none)
 363 * 12. gfs2_recover dequeues and recovers journals of failed nodes
 364 * 13. gfs2_recover provides recovery results to gfs2_control (recovery_result)
 365 * 14. gfs2_control updates control_lock lvb jid bits for recovered journals
 366 * 15. gfs2_control unblocks normal locking when all journals are recovered
 367 *
 368 * - failures during recovery
 369 *
 370 * recover_prep() may set BLOCK_LOCKS (step 3) again before gfs2_control
 371 * clears BLOCK_LOCKS (step 15), e.g. another node fails while still
 372 * recovering for a prior failure.  gfs2_control needs a way to detect
 373 * this so it can leave BLOCK_LOCKS set in step 15.  This is managed using
 374 * the recover_block and recover_start values.
 375 *
 376 * recover_done() provides a new lockspace generation number each time it
 377 * is called (step 9).  This generation number is saved as recover_start.
 378 * When recover_prep() is called, it sets BLOCK_LOCKS and sets
 379 * recover_block = recover_start.  So, while recover_block is equal to
 380 * recover_start, BLOCK_LOCKS should remain set.  (recover_spin must
 381 * be held around the BLOCK_LOCKS/recover_block/recover_start logic.)
 382 *
 383 * - more specific gfs2 steps in sequence above
 384 *
 385 *  3. recover_prep sets BLOCK_LOCKS and sets recover_block = recover_start
 386 *  6. recover_slot records any failed jids (maybe none)
 387 *  9. recover_done sets recover_start = new generation number
 388 * 10. gfs2_control sets control_lock lvb = new gen + bits for failed jids
 389 * 12. gfs2_recover does journal recoveries for failed jids identified above
 390 * 14. gfs2_control clears control_lock lvb bits for recovered jids
 391 * 15. gfs2_control checks if recover_block == recover_start (step 3 occurred
 392 *     again) then do nothing, otherwise if recover_start > recover_block
 393 *     then clear BLOCK_LOCKS.
 394 *
 395 * - parallel recovery steps across all nodes
 396 *
 397 * All nodes attempt to update the control_lock lvb with the new generation
 398 * number and jid bits, but only the first to get the control_lock EX will
 399 * do so; others will see that it's already done (lvb already contains new
 400 * generation number.)
 401 *
 402 * . All nodes get the same recover_prep/recover_slot/recover_done callbacks
 403 * . All nodes attempt to set control_lock lvb gen + bits for the new gen
 404 * . One node gets control_lock first and writes the lvb, others see it's done
 405 * . All nodes attempt to recover jids for which they see control_lock bits set
 406 * . One node succeeds for a jid, and that one clears the jid bit in the lvb
 407 * . All nodes will eventually see all lvb bits clear and unblock locks
 408 *
 409 * - is there a problem with clearing an lvb bit that should be set
 410 *   and missing a journal recovery?
 411 *
 412 * 1. jid fails
 413 * 2. lvb bit set for step 1
 414 * 3. jid recovered for step 1
 415 * 4. jid taken again (new mount)
 416 * 5. jid fails (for step 4)
 417 * 6. lvb bit set for step 5 (will already be set)
 418 * 7. lvb bit cleared for step 3
 419 *
 420 * This is not a problem because the failure in step 5 does not
 421 * require recovery, because the mount in step 4 could not have
 422 * progressed far enough to unblock locks and access the fs.  The
 423 * control_mount() function waits for all recoveries to be complete
 424 * for the latest lockspace generation before ever unblocking locks
 425 * and returning.  The mount in step 4 waits until the recovery in
 426 * step 1 is done.
 427 *
 428 * - special case of first mounter: first node to mount the fs
 429 *
 430 * The first node to mount a gfs2 fs needs to check all the journals
 431 * and recover any that need recovery before other nodes are allowed
 432 * to mount the fs.  (Others may begin mounting, but they must wait
 433 * for the first mounter to be done before taking locks on the fs
 434 * or accessing the fs.)  This has two parts:
 435 *
 436 * 1. The mounted_lock tells a node it's the first to mount the fs.
 437 * Each node holds the mounted_lock in PR while it's mounted.
 438 * Each node tries to acquire the mounted_lock in EX when it mounts.
 439 * If a node is granted the mounted_lock EX it means there are no
 440 * other mounted nodes (no PR locks exist), and it is the first mounter.
 441 * The mounted_lock is demoted to PR when first recovery is done, so
 442 * others will fail to get an EX lock, but will get a PR lock.
 443 *
 444 * 2. The control_lock blocks others in control_mount() while the first
 445 * mounter is doing first mount recovery of all journals.
 446 * A mounting node needs to acquire control_lock in EX mode before
 447 * it can proceed.  The first mounter holds control_lock in EX while doing
 448 * the first mount recovery, blocking mounts from other nodes, then demotes
 449 * control_lock to NL when it's done (others_may_mount/first_done),
 450 * allowing other nodes to continue mounting.
 451 *
 452 * first mounter:
 453 * control_lock EX/NOQUEUE success
 454 * mounted_lock EX/NOQUEUE success (no other PR, so no other mounters)
 455 * set first=1
 456 * do first mounter recovery
 457 * mounted_lock EX->PR
 458 * control_lock EX->NL, write lvb generation
 459 *
 460 * other mounter:
 461 * control_lock EX/NOQUEUE success (if fail -EAGAIN, retry)
 462 * mounted_lock EX/NOQUEUE fail -EAGAIN (expected due to other mounters PR)
 463 * mounted_lock PR/NOQUEUE success
 464 * read lvb generation
 465 * control_lock EX->NL
 466 * set first=0
 467 *
 468 * - mount during recovery
 469 *
 470 * If a node mounts while others are doing recovery (not first mounter),
 471 * the mounting node will get its initial recover_done() callback without
 472 * having seen any previous failures/callbacks.
 473 *
 474 * It must wait for all recoveries preceding its mount to be finished
 475 * before it unblocks locks.  It does this by repeating the "other mounter"
 476 * steps above until the lvb generation number is >= its mount generation
 477 * number (from initial recover_done) and all lvb bits are clear.
 478 *
 479 * - control_lock lvb format
 480 *
 481 * 4 bytes generation number: the latest dlm lockspace generation number
 482 * from recover_done callback.  Indicates the jid bitmap has been updated
 483 * to reflect all slot failures through that generation.
 484 * 4 bytes unused.
 485 * GDLM_LVB_SIZE-8 bytes of jid bit map. If bit N is set, it indicates
 486 * that jid N needs recovery.
 487 */
 488
 489#define JID_BITMAP_OFFSET 8 /* 4 byte generation number + 4 byte unused */
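/*
 * Endianness note (illustrative values): control_lvb_write() stores the
 * generation as a little-endian u32, so a generation of 5 appears as the
 * bytes 05 00 00 00 at the start of the lvb, and control_lvb_read() returns
 * 5 on any node regardless of host byte order.
 */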
 490
 491static void control_lvb_read(struct lm_lockstruct *ls, uint32_t *lvb_gen,
 492			     char *lvb_bits)
 493{
 494	__le32 gen;
 495	memcpy(lvb_bits, ls->ls_control_lvb, GDLM_LVB_SIZE);
 496	memcpy(&gen, lvb_bits, sizeof(__le32));
 497	*lvb_gen = le32_to_cpu(gen);
 498}
 499
 500static void control_lvb_write(struct lm_lockstruct *ls, uint32_t lvb_gen,
 501			      char *lvb_bits)
 502{
 503	__le32 gen;
 504	memcpy(ls->ls_control_lvb, lvb_bits, GDLM_LVB_SIZE);
 505	gen = cpu_to_le32(lvb_gen);
 506	memcpy(ls->ls_control_lvb, &gen, sizeof(__le32));
 507}
 508
 509static int all_jid_bits_clear(char *lvb)
 510{
 511	return !memchr_inv(lvb + JID_BITMAP_OFFSET, 0,
 512			GDLM_LVB_SIZE - JID_BITMAP_OFFSET);
 513}
 514
 515static void sync_wait_cb(void *arg)
 516{
 517	struct lm_lockstruct *ls = arg;
 518	complete(&ls->ls_sync_wait);
 519}
 520
 521static int sync_unlock(struct gfs2_sbd *sdp, struct dlm_lksb *lksb, char *name)
 522{
 523	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
 524	int error;
 525
 526	error = dlm_unlock(ls->ls_dlm, lksb->sb_lkid, 0, lksb, ls);
 527	if (error) {
 528		fs_err(sdp, "%s lkid %x error %d\n",
 529		       name, lksb->sb_lkid, error);
 530		return error;
 531	}
 532
 533	wait_for_completion(&ls->ls_sync_wait);
 534
 535	if (lksb->sb_status != -DLM_EUNLOCK) {
 536		fs_err(sdp, "%s lkid %x status %d\n",
 537		       name, lksb->sb_lkid, lksb->sb_status);
 538		return -1;
 539	}
 540	return 0;
 541}
 542
 543static int sync_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags,
 544		     unsigned int num, struct dlm_lksb *lksb, char *name)
 545{
 546	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
 547	char strname[GDLM_STRNAME_BYTES];
 548	int error, status;
 549
 550	memset(strname, 0, GDLM_STRNAME_BYTES);
 551	snprintf(strname, GDLM_STRNAME_BYTES, "%8x%16x", LM_TYPE_NONDISK, num);
 552
 553	error = dlm_lock(ls->ls_dlm, mode, lksb, flags,
 554			 strname, GDLM_STRNAME_BYTES - 1,
 555			 0, sync_wait_cb, ls, NULL);
 556	if (error) {
 557		fs_err(sdp, "%s lkid %x flags %x mode %d error %d\n",
 558		       name, lksb->sb_lkid, flags, mode, error);
 559		return error;
 560	}
 561
 562	wait_for_completion(&ls->ls_sync_wait);
 563
 564	status = lksb->sb_status;
 565
 566	if (status && status != -EAGAIN) {
 567		fs_err(sdp, "%s lkid %x flags %x mode %d status %d\n",
 568		       name, lksb->sb_lkid, flags, mode, status);
 569	}
 570
 571	return status;
 572}
 573
 574static int mounted_unlock(struct gfs2_sbd *sdp)
 575{
 576	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
 577	return sync_unlock(sdp, &ls->ls_mounted_lksb, "mounted_lock");
 578}
 579
 580static int mounted_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags)
 581{
 582	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
 583	return sync_lock(sdp, mode, flags, GFS2_MOUNTED_LOCK,
 584			 &ls->ls_mounted_lksb, "mounted_lock");
 585}
 586
 587static int control_unlock(struct gfs2_sbd *sdp)
 588{
 589	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
 590	return sync_unlock(sdp, &ls->ls_control_lksb, "control_lock");
 591}
 592
 593static int control_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags)
 594{
 595	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
 596	return sync_lock(sdp, mode, flags, GFS2_CONTROL_LOCK,
 597			 &ls->ls_control_lksb, "control_lock");
 598}
 599
 600/**
 601 * remote_withdraw - react to a node withdrawing from the file system
 602 * @sdp: The superblock
 603 */
 604static void remote_withdraw(struct gfs2_sbd *sdp)
 605{
 606	struct gfs2_jdesc *jd;
 607	int ret = 0, count = 0;
 608
 609	list_for_each_entry(jd, &sdp->sd_jindex_list, jd_list) {
 610		if (jd->jd_jid == sdp->sd_lockstruct.ls_jid)
 611			continue;
 612		ret = gfs2_recover_journal(jd, true);
 613		if (ret)
 614			break;
 615		count++;
 616	}
 617
 618	/* Report how many journals we checked and the result. */
 619	fs_err(sdp, "Journals checked: %d, ret = %d.\n", count, ret);
 620}
 621
 622static void gfs2_control_func(struct work_struct *work)
 623{
 624	struct gfs2_sbd *sdp = container_of(work, struct gfs2_sbd, sd_control_work.work);
 625	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
 626	uint32_t block_gen, start_gen, lvb_gen, flags;
 627	int recover_set = 0;
 628	int write_lvb = 0;
 629	int recover_size;
 630	int i, error;
 631
 632	/* First check for other nodes that may have done a withdraw. */
 633	if (test_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags)) {
 634		remote_withdraw(sdp);
 635		clear_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags);
 636		return;
 637	}
 638
 639	spin_lock(&ls->ls_recover_spin);
 640	/*
 641	 * No MOUNT_DONE means we're still mounting; control_mount()
 642	 * will set this flag, after which this thread will take over
 643	 * all further clearing of BLOCK_LOCKS.
 644	 *
 645	 * FIRST_MOUNT means this node is doing first mounter recovery,
 646	 * for which recovery control is handled by
 647	 * control_mount()/control_first_done(), not this thread.
 648	 */
 649	if (!test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
 650	     test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
 651		spin_unlock(&ls->ls_recover_spin);
 652		return;
 653	}
 654	block_gen = ls->ls_recover_block;
 655	start_gen = ls->ls_recover_start;
 656	spin_unlock(&ls->ls_recover_spin);
 657
 658	/*
 659	 * Equal block_gen and start_gen implies we are between
 660	 * recover_prep and recover_done callbacks, which means
 661	 * dlm recovery is in progress and dlm locking is blocked.
 662	 * There's no point trying to do any work until recover_done.
 663	 */
 664
 665	if (block_gen == start_gen)
 666		return;
 667
 668	/*
 669	 * Propagate recover_submit[] and recover_result[] to lvb:
 670	 * dlm_recoverd adds to recover_submit[] jids needing recovery
 671	 * gfs2_recover adds to recover_result[] journal recovery results
 672	 *
 673	 * set lvb bit for jids in recover_submit[] if the lvb has not
 674	 * yet been updated for the generation of the failure
 675	 *
 676	 * clear lvb bit for jids in recover_result[] if the result of
 677	 * the journal recovery is SUCCESS
 678	 */
 679
 680	error = control_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
 681	if (error) {
 682		fs_err(sdp, "control lock EX error %d\n", error);
 683		return;
 684	}
 685
 686	control_lvb_read(ls, &lvb_gen, ls->ls_lvb_bits);
 687
 688	spin_lock(&ls->ls_recover_spin);
 689	if (block_gen != ls->ls_recover_block ||
 690	    start_gen != ls->ls_recover_start) {
 691		fs_info(sdp, "recover generation %u block1 %u %u\n",
 692			start_gen, block_gen, ls->ls_recover_block);
 693		spin_unlock(&ls->ls_recover_spin);
 694		control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
 695		return;
 696	}
 697
 698	recover_size = ls->ls_recover_size;
 699
 700	if (lvb_gen <= start_gen) {
 701		/*
 702		 * Clear lvb bits for jids we've successfully recovered.
 703		 * Because all nodes attempt to recover failed journals,
 704		 * a journal can be recovered multiple times successfully
 705		 * in succession.  Only the first will really do recovery,
 706		 * the others find it clean, but still report a successful
 707		 * recovery.  So, another node may have already recovered
 708		 * the jid and cleared the lvb bit for it.
 709		 */
 710		for (i = 0; i < recover_size; i++) {
 711			if (ls->ls_recover_result[i] != LM_RD_SUCCESS)
 712				continue;
 713
 714			ls->ls_recover_result[i] = 0;
 715
 716			if (!test_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET))
 717				continue;
 718
 719			__clear_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET);
 720			write_lvb = 1;
 721		}
 722	}
 723
 724	if (lvb_gen == start_gen) {
 725		/*
 726		 * Failed slots before start_gen are already set in lvb.
 727		 */
 728		for (i = 0; i < recover_size; i++) {
 729			if (!ls->ls_recover_submit[i])
 730				continue;
 731			if (ls->ls_recover_submit[i] < lvb_gen)
 732				ls->ls_recover_submit[i] = 0;
 733		}
 734	} else if (lvb_gen < start_gen) {
 735		/*
 736		 * Failed slots before start_gen are not yet set in lvb.
 737		 */
 738		for (i = 0; i < recover_size; i++) {
 739			if (!ls->ls_recover_submit[i])
 740				continue;
 741			if (ls->ls_recover_submit[i] < start_gen) {
 742				ls->ls_recover_submit[i] = 0;
 743				__set_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET);
 744			}
 745		}
 746		/* even if there are no bits to set, we need to write the
 747		   latest generation to the lvb */
 748		write_lvb = 1;
 749	} else {
 750		/*
 751		 * we should be getting a recover_done() for lvb_gen soon
 752		 */
 753	}
 754	spin_unlock(&ls->ls_recover_spin);
 755
 756	if (write_lvb) {
 757		control_lvb_write(ls, start_gen, ls->ls_lvb_bits);
 758		flags = DLM_LKF_CONVERT | DLM_LKF_VALBLK;
 759	} else {
 760		flags = DLM_LKF_CONVERT;
 761	}
 762
 763	error = control_lock(sdp, DLM_LOCK_NL, flags);
 764	if (error) {
 765		fs_err(sdp, "control lock NL error %d\n", error);
 766		return;
 767	}
 768
 769	/*
 770	 * Everyone will see jid bits set in the lvb, run gfs2_recover_set(),
 771	 * and clear a jid bit in the lvb if the recovery is a success.
 772	 * Eventually all journals will be recovered, all jid bits will
 773	 * be cleared in the lvb, and everyone will clear BLOCK_LOCKS.
 774	 */
 775
 776	for (i = 0; i < recover_size; i++) {
 777		if (test_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET)) {
 778			fs_info(sdp, "recover generation %u jid %d\n",
 779				start_gen, i);
 780			gfs2_recover_set(sdp, i);
 781			recover_set++;
 782		}
 783	}
 784	if (recover_set)
 785		return;
 786
 787	/*
 788	 * No more jid bits set in lvb, all recovery is done, unblock locks
 789	 * (unless a new recover_prep callback has occurred, blocking locks
 790	 * again while working above)
 791	 */
 792
 793	spin_lock(&ls->ls_recover_spin);
 794	if (ls->ls_recover_block == block_gen &&
 795	    ls->ls_recover_start == start_gen) {
 796		clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
 797		spin_unlock(&ls->ls_recover_spin);
 798		fs_info(sdp, "recover generation %u done\n", start_gen);
 799		gfs2_glock_thaw(sdp);
 800	} else {
 801		fs_info(sdp, "recover generation %u block2 %u %u\n",
 802			start_gen, block_gen, ls->ls_recover_block);
 803		spin_unlock(&ls->ls_recover_spin);
 804	}
 805}
 806
 807static int control_mount(struct gfs2_sbd *sdp)
 808{
 809	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
 810	uint32_t start_gen, block_gen, mount_gen, lvb_gen;
 811	int mounted_mode;
 812	int retries = 0;
 813	int error;
 814
 815	memset(&ls->ls_mounted_lksb, 0, sizeof(struct dlm_lksb));
 816	memset(&ls->ls_control_lksb, 0, sizeof(struct dlm_lksb));
 817	memset(&ls->ls_control_lvb, 0, GDLM_LVB_SIZE);
 818	ls->ls_control_lksb.sb_lvbptr = ls->ls_control_lvb;
 819	init_completion(&ls->ls_sync_wait);
 820
 821	set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
 822
 823	error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_VALBLK);
 824	if (error) {
 825		fs_err(sdp, "control_mount control_lock NL error %d\n", error);
 826		return error;
 827	}
 828
 829	error = mounted_lock(sdp, DLM_LOCK_NL, 0);
 830	if (error) {
 831		fs_err(sdp, "control_mount mounted_lock NL error %d\n", error);
 832		control_unlock(sdp);
 833		return error;
 834	}
 835	mounted_mode = DLM_LOCK_NL;
 836
 837restart:
 838	if (retries++ && signal_pending(current)) {
 839		error = -EINTR;
 840		goto fail;
 841	}
 842
 843	/*
 844	 * We always start with both locks in NL. control_lock is
 845	 * demoted to NL below so we don't need to do it here.
 846	 */
 847
 848	if (mounted_mode != DLM_LOCK_NL) {
 849		error = mounted_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
 850		if (error)
 851			goto fail;
 852		mounted_mode = DLM_LOCK_NL;
 853	}
 854
 855	/*
 856	 * Other nodes need to do some work in dlm recovery and gfs2_control
 857	 * before the recover_done and control_lock will be ready for us below.
 858	 * A delay here is not required but often avoids having to retry.
 859	 */
 860
 861	msleep_interruptible(500);
 862
 863	/*
 864	 * Acquire control_lock in EX and mounted_lock in either EX or PR.
 865	 * control_lock lvb keeps track of any pending journal recoveries.
 866	 * mounted_lock indicates if any other nodes have the fs mounted.
 867	 */
 868
 869	error = control_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE|DLM_LKF_VALBLK);
 870	if (error == -EAGAIN) {
 871		goto restart;
 872	} else if (error) {
 873		fs_err(sdp, "control_mount control_lock EX error %d\n", error);
 874		goto fail;
 875	}
 876
 877	/**
 878	 * If we're a spectator, we don't want to take the lock in EX because
 879	 * we cannot do the first-mount responsibility it implies: recovery.
 880	 */
 881	if (sdp->sd_args.ar_spectator)
 882		goto locks_done;
 883
 884	error = mounted_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE);
 885	if (!error) {
 886		mounted_mode = DLM_LOCK_EX;
 887		goto locks_done;
 888	} else if (error != -EAGAIN) {
 889		fs_err(sdp, "control_mount mounted_lock EX error %d\n", error);
 890		goto fail;
 891	}
 892
 893	error = mounted_lock(sdp, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE);
 894	if (!error) {
 895		mounted_mode = DLM_LOCK_PR;
 896		goto locks_done;
 897	} else {
 898		/* not even -EAGAIN should happen here */
 899		fs_err(sdp, "control_mount mounted_lock PR error %d\n", error);
 900		goto fail;
 901	}
 902
 903locks_done:
 904	/*
 905	 * If we got both locks above in EX, then we're the first mounter.
 906	 * If not, then we need to wait for the control_lock lvb to be
 907	 * updated by other mounted nodes to reflect our mount generation.
 908	 *
 909	 * In simple first mounter cases, first mounter will see zero lvb_gen,
 910	 * but in cases where all existing nodes leave/fail before mounting
 911	 * nodes finish control_mount, then all nodes will be mounting and
 912	 * lvb_gen will be non-zero.
 913	 */
 914
 915	control_lvb_read(ls, &lvb_gen, ls->ls_lvb_bits);
 916
 917	if (lvb_gen == 0xFFFFFFFF) {
 918		/* special value to force mount attempts to fail */
 919		fs_err(sdp, "control_mount control_lock disabled\n");
 920		error = -EINVAL;
 921		goto fail;
 922	}
 923
 924	if (mounted_mode == DLM_LOCK_EX) {
 925		/* first mounter, keep both EX while doing first recovery */
 926		spin_lock(&ls->ls_recover_spin);
 927		clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
 928		set_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags);
 929		set_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
 930		spin_unlock(&ls->ls_recover_spin);
 931		fs_info(sdp, "first mounter control generation %u\n", lvb_gen);
 932		return 0;
 933	}
 934
 935	error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
 936	if (error)
 937		goto fail;
 938
 939	/*
 940	 * We are not first mounter, now we need to wait for the control_lock
 941	 * lvb generation to be >= the generation from our first recover_done
 942	 * and all lvb bits to be clear (no pending journal recoveries.)
 943	 */
 944
 945	if (!all_jid_bits_clear(ls->ls_lvb_bits)) {
 946		/* journals need recovery, wait until all are clear */
 947		fs_info(sdp, "control_mount wait for journal recovery\n");
 948		goto restart;
 949	}
 950
 951	spin_lock(&ls->ls_recover_spin);
 952	block_gen = ls->ls_recover_block;
 953	start_gen = ls->ls_recover_start;
 954	mount_gen = ls->ls_recover_mount;
 955
 956	if (lvb_gen < mount_gen) {
 957		/* wait for mounted nodes to update control_lock lvb to our
 958		   generation, which might include new recovery bits set */
 959		if (sdp->sd_args.ar_spectator) {
 960			fs_info(sdp, "Recovery is required. Waiting for a "
 961				"non-spectator to mount.\n");
 962			msleep_interruptible(1000);
 963		} else {
 964			fs_info(sdp, "control_mount wait1 block %u start %u "
 965				"mount %u lvb %u flags %lx\n", block_gen,
 966				start_gen, mount_gen, lvb_gen,
 967				ls->ls_recover_flags);
 968		}
 969		spin_unlock(&ls->ls_recover_spin);
 970		goto restart;
 971	}
 972
 973	if (lvb_gen != start_gen) {
 974		/* wait for mounted nodes to update control_lock lvb to the
 975		   latest recovery generation */
 976		fs_info(sdp, "control_mount wait2 block %u start %u mount %u "
 977			"lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
 978			lvb_gen, ls->ls_recover_flags);
 979		spin_unlock(&ls->ls_recover_spin);
 980		goto restart;
 981	}
 982
 983	if (block_gen == start_gen) {
 984		/* dlm recovery in progress, wait for it to finish */
 985		fs_info(sdp, "control_mount wait3 block %u start %u mount %u "
 986			"lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
 987			lvb_gen, ls->ls_recover_flags);
 988		spin_unlock(&ls->ls_recover_spin);
 989		goto restart;
 990	}
 991
 992	clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
 993	set_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags);
 994	memset(ls->ls_recover_submit, 0, ls->ls_recover_size*sizeof(uint32_t));
 995	memset(ls->ls_recover_result, 0, ls->ls_recover_size*sizeof(uint32_t));
 996	spin_unlock(&ls->ls_recover_spin);
 997	return 0;
 998
 999fail:
1000	mounted_unlock(sdp);
1001	control_unlock(sdp);
1002	return error;
1003}
1004
1005static int control_first_done(struct gfs2_sbd *sdp)
1006{
1007	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1008	uint32_t start_gen, block_gen;
1009	int error;
1010
1011restart:
1012	spin_lock(&ls->ls_recover_spin);
1013	start_gen = ls->ls_recover_start;
1014	block_gen = ls->ls_recover_block;
1015
1016	if (test_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags) ||
1017	    !test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
1018	    !test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
1019		/* sanity check, should not happen */
1020		fs_err(sdp, "control_first_done start %u block %u flags %lx\n",
1021		       start_gen, block_gen, ls->ls_recover_flags);
1022		spin_unlock(&ls->ls_recover_spin);
1023		control_unlock(sdp);
1024		return -1;
1025	}
1026
1027	if (start_gen == block_gen) {
1028		/*
1029		 * Wait for the end of a dlm recovery cycle to switch from
1030		 * first mounter recovery.  We can ignore any recover_slot
1031		 * callbacks between the recover_prep and next recover_done
1032		 * because we are still the first mounter and any failed nodes
1033		 * have not fully mounted, so they don't need recovery.
1034		 */
1035		spin_unlock(&ls->ls_recover_spin);
1036		fs_info(sdp, "control_first_done wait gen %u\n", start_gen);
1037
1038		wait_on_bit(&ls->ls_recover_flags, DFL_DLM_RECOVERY,
1039			    TASK_UNINTERRUPTIBLE);
1040		goto restart;
1041	}
1042
1043	clear_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
1044	set_bit(DFL_FIRST_MOUNT_DONE, &ls->ls_recover_flags);
1045	memset(ls->ls_recover_submit, 0, ls->ls_recover_size*sizeof(uint32_t));
1046	memset(ls->ls_recover_result, 0, ls->ls_recover_size*sizeof(uint32_t));
1047	spin_unlock(&ls->ls_recover_spin);
1048
1049	memset(ls->ls_lvb_bits, 0, GDLM_LVB_SIZE);
1050	control_lvb_write(ls, start_gen, ls->ls_lvb_bits);
1051
1052	error = mounted_lock(sdp, DLM_LOCK_PR, DLM_LKF_CONVERT);
1053	if (error)
1054		fs_err(sdp, "control_first_done mounted PR error %d\n", error);
1055
1056	error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
1057	if (error)
1058		fs_err(sdp, "control_first_done control NL error %d\n", error);
1059
1060	return error;
1061}
1062
1063/*
1064 * Expand static jid arrays if necessary (by increments of RECOVER_SIZE_INC)
1065 * to accommodate the largest slot number.  (NB dlm slot numbers start at 1,
1066 * gfs2 jids start at 0, so jid = slot - 1)
1067 */
1068
1069#define RECOVER_SIZE_INC 16
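/*
 * Example (hypothetical numbers): with 16-entry recover arrays already
 * allocated, a recover_done report containing dlm slot 18 (jid 17) grows the
 * arrays to 32 entries, while a report with only slots 1..16 leaves them as is.
 */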
1070
1071static int set_recover_size(struct gfs2_sbd *sdp, struct dlm_slot *slots,
1072			    int num_slots)
1073{
1074	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1075	uint32_t *submit = NULL;
1076	uint32_t *result = NULL;
1077	uint32_t old_size, new_size;
1078	int i, max_jid;
1079
1080	if (!ls->ls_lvb_bits) {
1081		ls->ls_lvb_bits = kzalloc(GDLM_LVB_SIZE, GFP_NOFS);
1082		if (!ls->ls_lvb_bits)
1083			return -ENOMEM;
1084	}
1085
1086	max_jid = 0;
1087	for (i = 0; i < num_slots; i++) {
1088		if (max_jid < slots[i].slot - 1)
1089			max_jid = slots[i].slot - 1;
1090	}
1091
1092	old_size = ls->ls_recover_size;
1093	new_size = old_size;
1094	while (new_size < max_jid + 1)
1095		new_size += RECOVER_SIZE_INC;
1096	if (new_size == old_size)
1097		return 0;
1098
1099	submit = kcalloc(new_size, sizeof(uint32_t), GFP_NOFS);
1100	result = kcalloc(new_size, sizeof(uint32_t), GFP_NOFS);
1101	if (!submit || !result) {
1102		kfree(submit);
1103		kfree(result);
1104		return -ENOMEM;
1105	}
1106
1107	spin_lock(&ls->ls_recover_spin);
1108	memcpy(submit, ls->ls_recover_submit, old_size * sizeof(uint32_t));
1109	memcpy(result, ls->ls_recover_result, old_size * sizeof(uint32_t));
1110	kfree(ls->ls_recover_submit);
1111	kfree(ls->ls_recover_result);
1112	ls->ls_recover_submit = submit;
1113	ls->ls_recover_result = result;
1114	ls->ls_recover_size = new_size;
1115	spin_unlock(&ls->ls_recover_spin);
1116	return 0;
1117}
1118
1119static void free_recover_size(struct lm_lockstruct *ls)
1120{
1121	kfree(ls->ls_lvb_bits);
1122	kfree(ls->ls_recover_submit);
1123	kfree(ls->ls_recover_result);
1124	ls->ls_recover_submit = NULL;
1125	ls->ls_recover_result = NULL;
1126	ls->ls_recover_size = 0;
1127	ls->ls_lvb_bits = NULL;
1128}
1129
1130/* dlm calls before it does lock recovery */
1131
1132static void gdlm_recover_prep(void *arg)
1133{
1134	struct gfs2_sbd *sdp = arg;
1135	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1136
1137	if (gfs2_withdrawing_or_withdrawn(sdp)) {
1138		fs_err(sdp, "recover_prep ignored due to withdraw.\n");
1139		return;
1140	}
1141	spin_lock(&ls->ls_recover_spin);
1142	ls->ls_recover_block = ls->ls_recover_start;
1143	set_bit(DFL_DLM_RECOVERY, &ls->ls_recover_flags);
1144
1145	if (!test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
1146	     test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
1147		spin_unlock(&ls->ls_recover_spin);
1148		return;
1149	}
1150	set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
1151	spin_unlock(&ls->ls_recover_spin);
1152}
1153
1154/* dlm calls after recover_prep has been completed on all lockspace members;
1155   identifies slot/jid of failed member */
1156
1157static void gdlm_recover_slot(void *arg, struct dlm_slot *slot)
1158{
1159	struct gfs2_sbd *sdp = arg;
1160	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1161	int jid = slot->slot - 1;
1162
1163	if (gfs2_withdrawing_or_withdrawn(sdp)) {
1164		fs_err(sdp, "recover_slot jid %d ignored due to withdraw.\n",
1165		       jid);
1166		return;
1167	}
1168	spin_lock(&ls->ls_recover_spin);
1169	if (ls->ls_recover_size < jid + 1) {
1170		fs_err(sdp, "recover_slot jid %d gen %u short size %d\n",
1171		       jid, ls->ls_recover_block, ls->ls_recover_size);
1172		spin_unlock(&ls->ls_recover_spin);
1173		return;
1174	}
1175
1176	if (ls->ls_recover_submit[jid]) {
1177		fs_info(sdp, "recover_slot jid %d gen %u prev %u\n",
1178			jid, ls->ls_recover_block, ls->ls_recover_submit[jid]);
1179	}
1180	ls->ls_recover_submit[jid] = ls->ls_recover_block;
1181	spin_unlock(&ls->ls_recover_spin);
1182}
1183
1184/* dlm calls after recover_slot and after it completes lock recovery */
1185
1186static void gdlm_recover_done(void *arg, struct dlm_slot *slots, int num_slots,
1187			      int our_slot, uint32_t generation)
1188{
1189	struct gfs2_sbd *sdp = arg;
1190	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1191
1192	if (gfs2_withdrawing_or_withdrawn(sdp)) {
1193		fs_err(sdp, "recover_done ignored due to withdraw.\n");
1194		return;
1195	}
1196	/* ensure the ls jid arrays are large enough */
1197	set_recover_size(sdp, slots, num_slots);
1198
1199	spin_lock(&ls->ls_recover_spin);
1200	ls->ls_recover_start = generation;
1201
1202	if (!ls->ls_recover_mount) {
1203		ls->ls_recover_mount = generation;
1204		ls->ls_jid = our_slot - 1;
1205	}
1206
1207	if (!test_bit(DFL_UNMOUNT, &ls->ls_recover_flags))
1208		queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work, 0);
1209
1210	clear_bit(DFL_DLM_RECOVERY, &ls->ls_recover_flags);
1211	smp_mb__after_atomic();
1212	wake_up_bit(&ls->ls_recover_flags, DFL_DLM_RECOVERY);
1213	spin_unlock(&ls->ls_recover_spin);
1214}
1215
1216/* gfs2_recover thread has a journal recovery result */
1217
1218static void gdlm_recovery_result(struct gfs2_sbd *sdp, unsigned int jid,
1219				 unsigned int result)
1220{
1221	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1222
1223	if (gfs2_withdrawing_or_withdrawn(sdp)) {
1224		fs_err(sdp, "recovery_result jid %d ignored due to withdraw.\n",
1225		       jid);
1226		return;
1227	}
1228	if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
1229		return;
1230
1231	/* don't care about the recovery of our own journal during mount */
1232	if (jid == ls->ls_jid)
1233		return;
1234
1235	spin_lock(&ls->ls_recover_spin);
1236	if (test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
1237		spin_unlock(&ls->ls_recover_spin);
1238		return;
1239	}
1240	if (ls->ls_recover_size < jid + 1) {
1241		fs_err(sdp, "recovery_result jid %d short size %d\n",
1242		       jid, ls->ls_recover_size);
1243		spin_unlock(&ls->ls_recover_spin);
1244		return;
1245	}
1246
1247	fs_info(sdp, "recover jid %d result %s\n", jid,
1248		result == LM_RD_GAVEUP ? "busy" : "success");
1249
1250	ls->ls_recover_result[jid] = result;
1251
1252	/* GAVEUP means another node is recovering the journal; delay our
1253	   next attempt to recover it, to give the other node a chance to
1254	   finish before trying again */
1255
1256	if (!test_bit(DFL_UNMOUNT, &ls->ls_recover_flags))
1257		queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work,
1258				   result == LM_RD_GAVEUP ? HZ : 0);
1259	spin_unlock(&ls->ls_recover_spin);
1260}
1261
1262static const struct dlm_lockspace_ops gdlm_lockspace_ops = {
1263	.recover_prep = gdlm_recover_prep,
1264	.recover_slot = gdlm_recover_slot,
1265	.recover_done = gdlm_recover_done,
1266};
1267
1268static int gdlm_mount(struct gfs2_sbd *sdp, const char *table)
1269{
1270	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1271	char cluster[GFS2_LOCKNAME_LEN];
1272	const char *fsname;
1273	uint32_t flags;
1274	int error, ops_result;
1275
1276	/*
1277	 * initialize everything
1278	 */
1279
1280	INIT_DELAYED_WORK(&sdp->sd_control_work, gfs2_control_func);
1281	spin_lock_init(&ls->ls_recover_spin);
1282	ls->ls_recover_flags = 0;
1283	ls->ls_recover_mount = 0;
1284	ls->ls_recover_start = 0;
1285	ls->ls_recover_block = 0;
1286	ls->ls_recover_size = 0;
1287	ls->ls_recover_submit = NULL;
1288	ls->ls_recover_result = NULL;
1289	ls->ls_lvb_bits = NULL;
1290
1291	error = set_recover_size(sdp, NULL, 0);
1292	if (error)
1293		goto fail;
1294
1295	/*
1296	 * prepare dlm_new_lockspace args
1297	 */
1298
1299	fsname = strchr(table, ':');
1300	if (!fsname) {
1301		fs_info(sdp, "no fsname found\n");
1302		error = -EINVAL;
1303		goto fail_free;
1304	}
1305	memset(cluster, 0, sizeof(cluster));
1306	memcpy(cluster, table, strlen(table) - strlen(fsname));
1307	fsname++;
1308
1309	flags = DLM_LSFL_NEWEXCL;
1310
1311	/*
1312	 * create/join lockspace
1313	 */
1314
1315	error = dlm_new_lockspace(fsname, cluster, flags, GDLM_LVB_SIZE,
1316				  &gdlm_lockspace_ops, sdp, &ops_result,
1317				  &ls->ls_dlm);
1318	if (error) {
1319		fs_err(sdp, "dlm_new_lockspace error %d\n", error);
1320		goto fail_free;
1321	}
1322
1323	if (ops_result < 0) {
1324		/*
1325		 * dlm does not support ops callbacks,
1326		 * old dlm_controld/gfs_controld are used, try without ops.
1327		 */
1328		fs_info(sdp, "dlm lockspace ops not used\n");
1329		free_recover_size(ls);
1330		set_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags);
1331		return 0;
1332	}
1333
1334	if (!test_bit(SDF_NOJOURNALID, &sdp->sd_flags)) {
1335		fs_err(sdp, "dlm lockspace ops disallow jid preset\n");
1336		error = -EINVAL;
1337		goto fail_release;
1338	}
1339
1340	/*
1341	 * control_mount() uses control_lock to determine first mounter,
1342	 * and for later mounts, waits for any recoveries to be cleared.
1343	 */
1344
1345	error = control_mount(sdp);
1346	if (error) {
1347		fs_err(sdp, "mount control error %d\n", error);
1348		goto fail_release;
1349	}
1350
1351	ls->ls_first = !!test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
1352	clear_bit(SDF_NOJOURNALID, &sdp->sd_flags);
1353	smp_mb__after_atomic();
1354	wake_up_bit(&sdp->sd_flags, SDF_NOJOURNALID);
1355	return 0;
1356
1357fail_release:
1358	dlm_release_lockspace(ls->ls_dlm, 2);
1359fail_free:
1360	free_recover_size(ls);
1361fail:
1362	return error;
1363}
1364
1365static void gdlm_first_done(struct gfs2_sbd *sdp)
1366{
1367	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1368	int error;
1369
1370	if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
1371		return;
1372
1373	error = control_first_done(sdp);
1374	if (error)
1375		fs_err(sdp, "mount first_done error %d\n", error);
1376}
1377
1378static void gdlm_unmount(struct gfs2_sbd *sdp)
1379{
1380	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1381
1382	if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
1383		goto release;
1384
1385	/* wait for gfs2_control_wq to be done with this mount */
1386
1387	spin_lock(&ls->ls_recover_spin);
1388	set_bit(DFL_UNMOUNT, &ls->ls_recover_flags);
1389	spin_unlock(&ls->ls_recover_spin);
1390	flush_delayed_work(&sdp->sd_control_work);
1391
1392	/* mounted_lock and control_lock will be purged in dlm recovery */
1393release:
1394	if (ls->ls_dlm) {
1395		dlm_release_lockspace(ls->ls_dlm, 2);
1396		ls->ls_dlm = NULL;
1397	}
1398
1399	free_recover_size(ls);
1400}
1401
1402static const match_table_t dlm_tokens = {
1403	{ Opt_jid, "jid=%d"},
1404	{ Opt_id, "id=%d"},
1405	{ Opt_first, "first=%d"},
1406	{ Opt_nodir, "nodir=%d"},
1407	{ Opt_err, NULL },
1408};
1409
1410const struct lm_lockops gfs2_dlm_ops = {
1411	.lm_proto_name = "lock_dlm",
1412	.lm_mount = gdlm_mount,
1413	.lm_first_done = gdlm_first_done,
1414	.lm_recovery_result = gdlm_recovery_result,
1415	.lm_unmount = gdlm_unmount,
1416	.lm_put_lock = gdlm_put_lock,
1417	.lm_lock = gdlm_lock,
1418	.lm_cancel = gdlm_cancel,
1419	.lm_tokens = &dlm_tokens,
1420};
1421
   1// SPDX-License-Identifier: GPL-2.0-only
   2/*
   3 * Copyright (C) Sistina Software, Inc.  1997-2003 All rights reserved.
   4 * Copyright 2004-2011 Red Hat, Inc.
   5 */
   6
   7#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
   8
   9#include <linux/fs.h>
  10#include <linux/dlm.h>
  11#include <linux/slab.h>
  12#include <linux/types.h>
  13#include <linux/delay.h>
  14#include <linux/gfs2_ondisk.h>
  15#include <linux/sched/signal.h>
  16
  17#include "incore.h"
  18#include "glock.h"
 
 
  19#include "util.h"
  20#include "sys.h"
  21#include "trace_gfs2.h"
  22
  23/**
  24 * gfs2_update_stats - Update time based stats
  25 * @mv: Pointer to mean/variance structure to update
 
  26 * @sample: New data to include
  27 *
  28 * @delta is the difference between the current rtt sample and the
  29 * running average srtt. We add 1/8 of that to the srtt in order to
  30 * update the current srtt estimate. The variance estimate is a bit
  31 * more complicated. We subtract the current variance estimate from
  32 * the abs value of the @delta and add 1/4 of that to the running
  33 * total.  That's equivalent to 3/4 of the current variance
  34 * estimate plus 1/4 of the abs of @delta.
  35 *
  36 * Note that the index points at the array entry containing the smoothed
  37 * mean value, and the variance is always in the following entry
  38 *
  39 * Reference: TCP/IP Illustrated, vol 2, p. 831,832
  40 * All times are in units of integer nanoseconds. Unlike the TCP/IP case,
  41 * they are not scaled fixed point.
  42 */
  43
  44static inline void gfs2_update_stats(struct gfs2_lkstats *s, unsigned index,
  45				     s64 sample)
  46{
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
  47	s64 delta = sample - s->stats[index];
  48	s->stats[index] += (delta >> 3);
  49	index++;
  50	s->stats[index] += (s64)(abs(delta) - s->stats[index]) >> 2;
  51}
  52
  53/**
  54 * gfs2_update_reply_times - Update locking statistics
  55 * @gl: The glock to update
  56 *
  57 * This assumes that gl->gl_dstamp has been set earlier.
  58 *
  59 * The rtt (lock round trip time) is an estimate of the time
  60 * taken to perform a dlm lock request. We update it on each
  61 * reply from the dlm.
  62 *
  63 * The blocking flag is set on the glock for all dlm requests
  64 * which may potentially block due to lock requests from other nodes.
  65 * DLM requests where the current lock state is exclusive, the
  66 * requested state is null (or unlocked) or where the TRY or
  67 * TRY_1CB flags are set are classified as non-blocking. All
  68 * other DLM requests are counted as (potentially) blocking.
  69 */
  70static inline void gfs2_update_reply_times(struct gfs2_glock *gl)
  71{
  72	struct gfs2_pcpu_lkstats *lks;
  73	const unsigned gltype = gl->gl_name.ln_type;
  74	unsigned index = test_bit(GLF_BLOCKING, &gl->gl_flags) ?
  75			 GFS2_LKS_SRTTB : GFS2_LKS_SRTT;
  76	s64 rtt;
  77
  78	preempt_disable();
  79	rtt = ktime_to_ns(ktime_sub(ktime_get_real(), gl->gl_dstamp));
  80	lks = this_cpu_ptr(gl->gl_name.ln_sbd->sd_lkstats);
  81	gfs2_update_stats(&gl->gl_stats, index, rtt);		/* Local */
  82	gfs2_update_stats(&lks->lkstats[gltype], index, rtt);	/* Global */
  83	preempt_enable();
  84
  85	trace_gfs2_glock_lock_time(gl, rtt);
  86}
  87
  88/**
  89 * gfs2_update_request_times - Update locking statistics
  90 * @gl: The glock to update
  91 *
  92 * The irt (lock inter-request times) measures the average time
  93 * between requests to the dlm. It is updated immediately before
  94 * each dlm call.
  95 */
  96
  97static inline void gfs2_update_request_times(struct gfs2_glock *gl)
  98{
  99	struct gfs2_pcpu_lkstats *lks;
 100	const unsigned gltype = gl->gl_name.ln_type;
 101	ktime_t dstamp;
 102	s64 irt;
 103
 104	preempt_disable();
 105	dstamp = gl->gl_dstamp;
 106	gl->gl_dstamp = ktime_get_real();
 107	irt = ktime_to_ns(ktime_sub(gl->gl_dstamp, dstamp));
 108	lks = this_cpu_ptr(gl->gl_name.ln_sbd->sd_lkstats);
 109	gfs2_update_stats(&gl->gl_stats, GFS2_LKS_SIRT, irt);		/* Local */
 110	gfs2_update_stats(&lks->lkstats[gltype], GFS2_LKS_SIRT, irt);	/* Global */
 111	preempt_enable();
 112}
 113 
 114static void gdlm_ast(void *arg)
 115{
 116	struct gfs2_glock *gl = arg;
 117	unsigned ret = gl->gl_state;
 118
 
 
 
 
 
 119	gfs2_update_reply_times(gl);
 120	BUG_ON(gl->gl_lksb.sb_flags & DLM_SBF_DEMOTED);
 121
 122	if ((gl->gl_lksb.sb_flags & DLM_SBF_VALNOTVALID) && gl->gl_lksb.sb_lvbptr)
 123		memset(gl->gl_lksb.sb_lvbptr, 0, GDLM_LVB_SIZE);
 124
 125	switch (gl->gl_lksb.sb_status) {
 126	case -DLM_EUNLOCK: /* Unlocked, so glock can be freed */
 
 
 127		gfs2_glock_free(gl);
 128		return;
 129	case -DLM_ECANCEL: /* Cancel while getting lock */
 130		ret |= LM_OUT_CANCELED;
 131		goto out;
 132	case -EAGAIN: /* Try lock fails */
 133	case -EDEADLK: /* Deadlock detected */
 134		goto out;
 135	case -ETIMEDOUT: /* Canceled due to timeout */
 136		ret |= LM_OUT_ERROR;
 137		goto out;
 138	case 0: /* Success */
 139		break;
 140	default: /* Something unexpected */
 141		BUG();
 142	}
 143
 144	ret = gl->gl_req;
 145	if (gl->gl_lksb.sb_flags & DLM_SBF_ALTMODE) {
 146		if (gl->gl_req == LM_ST_SHARED)
 147			ret = LM_ST_DEFERRED;
 148		else if (gl->gl_req == LM_ST_DEFERRED)
 149			ret = LM_ST_SHARED;
 150		else
 151			BUG();
 152	}
 153
 154	set_bit(GLF_INITIAL, &gl->gl_flags);
 155	gfs2_glock_complete(gl, ret);
 156	return;
 157out:
 158	if (!test_bit(GLF_INITIAL, &gl->gl_flags))
 159		gl->gl_lksb.sb_lkid = 0;
 160	gfs2_glock_complete(gl, ret);
 161}
 162
 163static void gdlm_bast(void *arg, int mode)
 164{
 165	struct gfs2_glock *gl = arg;
 166
 
 
 
 167	switch (mode) {
 168	case DLM_LOCK_EX:
 169		gfs2_glock_cb(gl, LM_ST_UNLOCKED);
 170		break;
 171	case DLM_LOCK_CW:
 172		gfs2_glock_cb(gl, LM_ST_DEFERRED);
 173		break;
 174	case DLM_LOCK_PR:
 175		gfs2_glock_cb(gl, LM_ST_SHARED);
 176		break;
 177	default:
 178		fs_err(gl->gl_name.ln_sbd, "unknown bast mode %d\n", mode);
 179		BUG();
 180	}
 181}
 182
 183/* convert gfs lock-state to dlm lock-mode */
 184
 185static int make_mode(struct gfs2_sbd *sdp, const unsigned int lmstate)
 186{
 187	switch (lmstate) {
 188	case LM_ST_UNLOCKED:
 189		return DLM_LOCK_NL;
 190	case LM_ST_EXCLUSIVE:
 191		return DLM_LOCK_EX;
 192	case LM_ST_DEFERRED:
 193		return DLM_LOCK_CW;
 194	case LM_ST_SHARED:
 195		return DLM_LOCK_PR;
 196	}
 197	fs_err(sdp, "unknown LM state %d\n", lmstate);
 198	BUG();
 199	return -1;
 200}
 201
 202static u32 make_flags(struct gfs2_glock *gl, const unsigned int gfs_flags,
 203		      const int req)
 204{
 205	u32 lkf = 0;
 206
 207	if (gl->gl_lksb.sb_lvbptr)
 208		lkf |= DLM_LKF_VALBLK;
 209
 210	if (gfs_flags & LM_FLAG_TRY)
 211		lkf |= DLM_LKF_NOQUEUE;
 212
 213	if (gfs_flags & LM_FLAG_TRY_1CB) {
 214		lkf |= DLM_LKF_NOQUEUE;
 215		lkf |= DLM_LKF_NOQUEUEBAST;
 216	}
 217
 218	if (gfs_flags & LM_FLAG_PRIORITY) {
 219		lkf |= DLM_LKF_NOORDER;
 220		lkf |= DLM_LKF_HEADQUE;
 221	}
 222
 223	if (gfs_flags & LM_FLAG_ANY) {
 224		if (req == DLM_LOCK_PR)
 225			lkf |= DLM_LKF_ALTCW;
 226		else if (req == DLM_LOCK_CW)
 227			lkf |= DLM_LKF_ALTPR;
 228		else
 229			BUG();
 230	}
 231
 232	if (gl->gl_lksb.sb_lkid != 0) {
 233		lkf |= DLM_LKF_CONVERT;
 234		if (test_bit(GLF_BLOCKING, &gl->gl_flags))
 235			lkf |= DLM_LKF_QUECVT;
 236	}
 237
 238	return lkf;
 239}
 240
 241static void gfs2_reverse_hex(char *c, u64 value)
 242{
 243	*c = '0';
 244	while (value) {
 245		*c-- = hex_asc[value & 0x0f];
 246		value >>= 4;
 247	}
 248}
 249
 250static int gdlm_lock(struct gfs2_glock *gl, unsigned int req_state,
 251		     unsigned int flags)
 252{
 253	struct lm_lockstruct *ls = &gl->gl_name.ln_sbd->sd_lockstruct;
 254	int req;
 255	u32 lkf;
 256	char strname[GDLM_STRNAME_BYTES] = "";
 
 257
 258	req = make_mode(gl->gl_name.ln_sbd, req_state);
 259	lkf = make_flags(gl, flags, req);
 260	gfs2_glstats_inc(gl, GFS2_LKS_DCOUNT);
 261	gfs2_sbstats_inc(gl, GFS2_LKS_DCOUNT);
 262	if (gl->gl_lksb.sb_lkid) {
 263		gfs2_update_request_times(gl);
 264	} else {
 265		memset(strname, ' ', GDLM_STRNAME_BYTES - 1);
 266		strname[GDLM_STRNAME_BYTES - 1] = '\0';
 267		gfs2_reverse_hex(strname + 7, gl->gl_name.ln_type);
 268		gfs2_reverse_hex(strname + 23, gl->gl_name.ln_number);
 269		gl->gl_dstamp = ktime_get_real();
 270	}
 271	/*
 272	 * Submit the actual lock request.
 273	 */
 274
 275	return dlm_lock(ls->ls_dlm, req, &gl->gl_lksb, lkf, strname,
 
 276			GDLM_STRNAME_BYTES - 1, 0, gdlm_ast, gl, gdlm_bast);
 
 
 
 
 
 277}
 278
 279static void gdlm_put_lock(struct gfs2_glock *gl)
 280{
 281	struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
 282	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
 283	int lvb_needs_unlock = 0;
 284	int error;
 285
 
 
 286	if (gl->gl_lksb.sb_lkid == 0) {
 287		gfs2_glock_free(gl);
 288		return;
 289	}
 290
 291	clear_bit(GLF_BLOCKING, &gl->gl_flags);
 292	gfs2_glstats_inc(gl, GFS2_LKS_DCOUNT);
 293	gfs2_sbstats_inc(gl, GFS2_LKS_DCOUNT);
 294	gfs2_update_request_times(gl);
 295
 296	/* don't want to skip dlm_unlock writing the lvb when lock is ex */
 297
 298	if (gl->gl_lksb.sb_lvbptr && (gl->gl_state == LM_ST_EXCLUSIVE))
 299		lvb_needs_unlock = 1;
 
 
 300
 301	if (test_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags) &&
 302	    !lvb_needs_unlock) {
 303		gfs2_glock_free(gl);
 304		return;
 305	}
 306
 307	error = dlm_unlock(ls->ls_dlm, gl->gl_lksb.sb_lkid, DLM_LKF_VALBLK,
 308			   NULL, gl);
 309	if (error) {
 310		fs_err(sdp, "gdlm_unlock %x,%llx err=%d\n",
 311		       gl->gl_name.ln_type,
 312		       (unsigned long long)gl->gl_name.ln_number, error);
 313		return;
 314	}
 315}
 316
 317static void gdlm_cancel(struct gfs2_glock *gl)
 318{
 319	struct lm_lockstruct *ls = &gl->gl_name.ln_sbd->sd_lockstruct;
 320	dlm_unlock(ls->ls_dlm, gl->gl_lksb.sb_lkid, DLM_LKF_CANCEL, NULL, gl);
 321}
 322
 323/*
 324 * dlm/gfs2 recovery coordination using dlm_recover callbacks
 325 *
 326 *  1. dlm_controld sees lockspace members change
 327 *  2. dlm_controld blocks dlm-kernel locking activity
 328 *  3. dlm_controld within dlm-kernel notifies gfs2 (recover_prep)
 329 *  4. dlm_controld starts and finishes its own user level recovery
 330 *  5. dlm_controld starts dlm-kernel dlm_recoverd to do kernel recovery
 331 *  6. dlm_recoverd notifies gfs2 of failed nodes (recover_slot)
 332 *  7. dlm_recoverd does its own lock recovery
 333 *  8. dlm_recoverd unblocks dlm-kernel locking activity
 334 *  9. dlm_recoverd notifies gfs2 when done (recover_done with new generation)
 335 * 10. gfs2_control updates control_lock lvb with new generation and jid bits
 336 * 11. gfs2_control enqueues journals for gfs2_recover to recover (maybe none)
 337 * 12. gfs2_recover dequeues and recovers journals of failed nodes
 338 * 13. gfs2_recover provides recovery results to gfs2_control (recovery_result)
 339 * 14. gfs2_control updates control_lock lvb jid bits for recovered journals
 340 * 15. gfs2_control unblocks normal locking when all journals are recovered
 341 *
 342 * - failures during recovery
 343 *
 344 * recover_prep() may set BLOCK_LOCKS (step 3) again before gfs2_control
 345 * clears BLOCK_LOCKS (step 15), e.g. another node fails while still
 346 * recovering for a prior failure.  gfs2_control needs a way to detect
 347 * this so it can leave BLOCK_LOCKS set in step 15.  This is managed using
 348 * the recover_block and recover_start values.
 349 *
 350 * recover_done() provides a new lockspace generation number each time it
 351 * is called (step 9).  This generation number is saved as recover_start.
 352 * When recover_prep() is called, it sets BLOCK_LOCKS and sets
 353 * recover_block = recover_start.  So, while recover_block is equal to
 354 * recover_start, BLOCK_LOCKS should remain set.  (recover_spin must
 355 * be held around the BLOCK_LOCKS/recover_block/recover_start logic.)
 356 *
 357 * - more specific gfs2 steps in sequence above
 358 *
 359 *  3. recover_prep sets BLOCK_LOCKS and sets recover_block = recover_start
 360 *  6. recover_slot records any failed jids (maybe none)
 361 *  9. recover_done sets recover_start = new generation number
 362 * 10. gfs2_control sets control_lock lvb = new gen + bits for failed jids
 363 * 12. gfs2_recover does journal recoveries for failed jids identified above
 364 * 14. gfs2_control clears control_lock lvb bits for recovered jids
 365 * 15. gfs2_control checks if recover_block == recover_start (step 3 occurred
 366 *     again); if so it does nothing, otherwise if recover_start > recover_block
 367 *     it clears BLOCK_LOCKS.
 368 *
 369 * - parallel recovery steps across all nodes
 370 *
 371 * All nodes attempt to update the control_lock lvb with the new generation
 372 * number and jid bits, but only the first to get the control_lock EX will
 373 * do so; others will see that it's already done (lvb already contains new
 374 * generation number).
 375 *
 376 * . All nodes get the same recover_prep/recover_slot/recover_done callbacks
 377 * . All nodes attempt to set control_lock lvb gen + bits for the new gen
 378 * . One node gets control_lock first and writes the lvb, others see it's done
 379 * . All nodes attempt to recover jids for which they see control_lock bits set
 380 * . One node succeeds for a jid, and that one clears the jid bit in the lvb
 381 * . All nodes will eventually see all lvb bits clear and unblock locks
 382 *
 383 * - is there a problem with clearing an lvb bit that should be set
 384 *   and missing a journal recovery?
 385 *
 386 * 1. jid fails
 387 * 2. lvb bit set for step 1
 388 * 3. jid recovered for step 1
 389 * 4. jid taken again (new mount)
 390 * 5. jid fails (for step 4)
 391 * 6. lvb bit set for step 5 (will already be set)
 392 * 7. lvb bit cleared for step 3
 393 *
 394 * This is not a problem because the failure in step 5 does not
 395 * require recovery, because the mount in step 4 could not have
 396 * progressed far enough to unblock locks and access the fs.  The
 397 * control_mount() function waits for all recoveries to be complete
 398 * for the latest lockspace generation before ever unblocking locks
 399 * and returning.  The mount in step 4 waits until the recovery in
 400 * step 1 is done.
 401 *
 402 * - special case of first mounter: first node to mount the fs
 403 *
 404 * The first node to mount a gfs2 fs needs to check all the journals
 405 * and recover any that need recovery before other nodes are allowed
 406 * to mount the fs.  (Others may begin mounting, but they must wait
 407 * for the first mounter to be done before taking locks on the fs
 408 * or accessing the fs.)  This has two parts:
 409 *
 410 * 1. The mounted_lock tells a node it's the first to mount the fs.
 411 * Each node holds the mounted_lock in PR while it's mounted.
 412 * Each node tries to acquire the mounted_lock in EX when it mounts.
 413 * If a node is granted the mounted_lock EX it means there are no
 414 * other mounted nodes (no PR locks exist), and it is the first mounter.
 415 * The mounted_lock is demoted to PR when first recovery is done, so
 416 * others will fail to get an EX lock, but will get a PR lock.
 417 *
 418 * 2. The control_lock blocks others in control_mount() while the first
 419 * mounter is doing first mount recovery of all journals.
 420 * A mounting node needs to acquire control_lock in EX mode before
 421 * it can proceed.  The first mounter holds control_lock in EX while doing
 422 * the first mount recovery, blocking mounts from other nodes, then demotes
 423 * control_lock to NL when it's done (others_may_mount/first_done),
 424 * allowing other nodes to continue mounting.
 425 *
 426 * first mounter:
 427 * control_lock EX/NOQUEUE success
 428 * mounted_lock EX/NOQUEUE success (no other PR, so no other mounters)
 429 * set first=1
 430 * do first mounter recovery
 431 * mounted_lock EX->PR
 432 * control_lock EX->NL, write lvb generation
 433 *
 434 * other mounter:
 435 * control_lock EX/NOQUEUE success (if fail -EAGAIN, retry)
 436 * mounted_lock EX/NOQUEUE fail -EAGAIN (expected due to other mounters PR)
 437 * mounted_lock PR/NOQUEUE success
 438 * read lvb generation
 439 * control_lock EX->NL
 440 * set first=0
 441 *
 442 * - mount during recovery
 443 *
 444 * If a node mounts while others are doing recovery (not first mounter),
 445 * the mounting node will get its initial recover_done() callback without
 446 * having seen any previous failures/callbacks.
 447 *
 448 * It must wait for all recoveries preceding its mount to be finished
 449 * before it unblocks locks.  It does this by repeating the "other mounter"
 450 * steps above until the lvb generation number is >= its mount generation
 451 * number (from initial recover_done) and all lvb bits are clear.
 452 *
 453 * - control_lock lvb format
 454 *
 455 * 4 bytes generation number: the latest dlm lockspace generation number
 456 * from recover_done callback.  Indicates the jid bitmap has been updated
 457 * to reflect all slot failures through that generation.
 458 * 4 bytes unused.
 459 * GDLM_LVB_SIZE-8 bytes of jid bit map. If bit N is set, it indicates
 460 * that jid N needs recovery.
 461 */
 462
 463#define JID_BITMAP_OFFSET 8 /* 4 byte generation number + 4 byte unused */
 464
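/*
 * Purely illustrative sketch of the control_lock lvb layout described
 * above.  The real code treats the lvb as a flat byte array through
 * control_lvb_read(), control_lvb_write() and the *_bit_le() helpers;
 * the struct and helper below are hypothetical and unused.
 */
struct gdlm_control_lvb_layout {
	__le32 gen;				/* latest lockspace generation */
	__le32 unused;				/* reserved, currently zero */
	char jid_bits[GDLM_LVB_SIZE - 8];	/* bit N set: jid N needs recovery */
};

static inline int example_jid_needs_recovery(const char *lvb, unsigned int jid)
{
	/* hypothetical helper: the same little-endian bit test the code
	   below applies to ls_lvb_bits */
	return test_bit_le(jid, lvb + JID_BITMAP_OFFSET);
}
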
 465static void control_lvb_read(struct lm_lockstruct *ls, uint32_t *lvb_gen,
 466			     char *lvb_bits)
 467{
 468	__le32 gen;
 469	memcpy(lvb_bits, ls->ls_control_lvb, GDLM_LVB_SIZE);
 470	memcpy(&gen, lvb_bits, sizeof(__le32));
 471	*lvb_gen = le32_to_cpu(gen);
 472}
 473
 474static void control_lvb_write(struct lm_lockstruct *ls, uint32_t lvb_gen,
 475			      char *lvb_bits)
 476{
 477	__le32 gen;
 478	memcpy(ls->ls_control_lvb, lvb_bits, GDLM_LVB_SIZE);
 479	gen = cpu_to_le32(lvb_gen);
 480	memcpy(ls->ls_control_lvb, &gen, sizeof(__le32));
 481}
 482
 483static int all_jid_bits_clear(char *lvb)
 484{
 485	return !memchr_inv(lvb + JID_BITMAP_OFFSET, 0,
 486			GDLM_LVB_SIZE - JID_BITMAP_OFFSET);
 487}
 488
 489static void sync_wait_cb(void *arg)
 490{
 491	struct lm_lockstruct *ls = arg;
 492	complete(&ls->ls_sync_wait);
 493}
 494
 495static int sync_unlock(struct gfs2_sbd *sdp, struct dlm_lksb *lksb, char *name)
 496{
 497	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
 498	int error;
 499
 500	error = dlm_unlock(ls->ls_dlm, lksb->sb_lkid, 0, lksb, ls);
 501	if (error) {
 502		fs_err(sdp, "%s lkid %x error %d\n",
 503		       name, lksb->sb_lkid, error);
 504		return error;
 505	}
 506
 507	wait_for_completion(&ls->ls_sync_wait);
 508
 509	if (lksb->sb_status != -DLM_EUNLOCK) {
 510		fs_err(sdp, "%s lkid %x status %d\n",
 511		       name, lksb->sb_lkid, lksb->sb_status);
 512		return -1;
 513	}
 514	return 0;
 515}
 516
 517static int sync_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags,
 518		     unsigned int num, struct dlm_lksb *lksb, char *name)
 519{
 520	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
 521	char strname[GDLM_STRNAME_BYTES];
 522	int error, status;
 523
 524	memset(strname, 0, GDLM_STRNAME_BYTES);
 525	snprintf(strname, GDLM_STRNAME_BYTES, "%8x%16x", LM_TYPE_NONDISK, num);
 526
 527	error = dlm_lock(ls->ls_dlm, mode, lksb, flags,
 528			 strname, GDLM_STRNAME_BYTES - 1,
 529			 0, sync_wait_cb, ls, NULL);
 530	if (error) {
 531		fs_err(sdp, "%s lkid %x flags %x mode %d error %d\n",
 532		       name, lksb->sb_lkid, flags, mode, error);
 533		return error;
 534	}
 535
 536	wait_for_completion(&ls->ls_sync_wait);
 537
 538	status = lksb->sb_status;
 539
 540	if (status && status != -EAGAIN) {
 541		fs_err(sdp, "%s lkid %x flags %x mode %d status %d\n",
 542		       name, lksb->sb_lkid, flags, mode, status);
 543	}
 544
 545	return status;
 546}
 547
 548static int mounted_unlock(struct gfs2_sbd *sdp)
 549{
 550	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
 551	return sync_unlock(sdp, &ls->ls_mounted_lksb, "mounted_lock");
 552}
 553
 554static int mounted_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags)
 555{
 556	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
 557	return sync_lock(sdp, mode, flags, GFS2_MOUNTED_LOCK,
 558			 &ls->ls_mounted_lksb, "mounted_lock");
 559}
 560
 561static int control_unlock(struct gfs2_sbd *sdp)
 562{
 563	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
 564	return sync_unlock(sdp, &ls->ls_control_lksb, "control_lock");
 565}
 566
 567static int control_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags)
 568{
 569	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
 570	return sync_lock(sdp, mode, flags, GFS2_CONTROL_LOCK,
 571			 &ls->ls_control_lksb, "control_lock");
 572}
 573
 574static void gfs2_control_func(struct work_struct *work)
 575{
 576	struct gfs2_sbd *sdp = container_of(work, struct gfs2_sbd, sd_control_work.work);
 577	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
 578	uint32_t block_gen, start_gen, lvb_gen, flags;
 579	int recover_set = 0;
 580	int write_lvb = 0;
 581	int recover_size;
 582	int i, error;
 583
 584	spin_lock(&ls->ls_recover_spin);
 585	/*
 586	 * No MOUNT_DONE means we're still mounting; control_mount()
 587	 * will set this flag, after which this thread will take over
 588	 * all further clearing of BLOCK_LOCKS.
 589	 *
 590	 * FIRST_MOUNT means this node is doing first mounter recovery,
 591	 * for which recovery control is handled by
 592	 * control_mount()/control_first_done(), not this thread.
 593	 */
 594	if (!test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
 595	     test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
 596		spin_unlock(&ls->ls_recover_spin);
 597		return;
 598	}
 599	block_gen = ls->ls_recover_block;
 600	start_gen = ls->ls_recover_start;
 601	spin_unlock(&ls->ls_recover_spin);
 602
 603	/*
 604	 * Equal block_gen and start_gen implies we are between
 605	 * recover_prep and recover_done callbacks, which means
 606	 * dlm recovery is in progress and dlm locking is blocked.
 607	 * There's no point trying to do any work until recover_done.
 608	 */
 609
 610	if (block_gen == start_gen)
 611		return;
 612
 613	/*
 614	 * Propagate recover_submit[] and recover_result[] to lvb:
 615	 * dlm_recoverd adds to recover_submit[] jids needing recovery
 616	 * gfs2_recover adds to recover_result[] journal recovery results
 617	 *
 618	 * set lvb bit for jids in recover_submit[] if the lvb has not
 619	 * yet been updated for the generation of the failure
 620	 *
 621	 * clear lvb bit for jids in recover_result[] if the result of
 622	 * the journal recovery is SUCCESS
 623	 */
 624
 625	error = control_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
 626	if (error) {
 627		fs_err(sdp, "control lock EX error %d\n", error);
 628		return;
 629	}
 630
 631	control_lvb_read(ls, &lvb_gen, ls->ls_lvb_bits);
 632
 633	spin_lock(&ls->ls_recover_spin);
 634	if (block_gen != ls->ls_recover_block ||
 635	    start_gen != ls->ls_recover_start) {
 636		fs_info(sdp, "recover generation %u block1 %u %u\n",
 637			start_gen, block_gen, ls->ls_recover_block);
 638		spin_unlock(&ls->ls_recover_spin);
 639		control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
 640		return;
 641	}
 642
 643	recover_size = ls->ls_recover_size;
 644
 645	if (lvb_gen <= start_gen) {
 646		/*
 647		 * Clear lvb bits for jids we've successfully recovered.
 648		 * Because all nodes attempt to recover failed journals,
 649		 * a journal can be recovered multiple times successfully
 650		 * in succession.  Only the first will really do recovery,
 651		 * the others find it clean, but still report a successful
 652		 * recovery.  So, another node may have already recovered
 653		 * the jid and cleared the lvb bit for it.
 654		 */
 655		for (i = 0; i < recover_size; i++) {
 656			if (ls->ls_recover_result[i] != LM_RD_SUCCESS)
 657				continue;
 658
 659			ls->ls_recover_result[i] = 0;
 660
 661			if (!test_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET))
 662				continue;
 663
 664			__clear_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET);
 665			write_lvb = 1;
 666		}
 667	}
 668
 669	if (lvb_gen == start_gen) {
 670		/*
 671		 * Failed slots before start_gen are already set in lvb.
 672		 */
 673		for (i = 0; i < recover_size; i++) {
 674			if (!ls->ls_recover_submit[i])
 675				continue;
 676			if (ls->ls_recover_submit[i] < lvb_gen)
 677				ls->ls_recover_submit[i] = 0;
 678		}
 679	} else if (lvb_gen < start_gen) {
 680		/*
 681		 * Failed slots before start_gen are not yet set in lvb.
 682		 */
 683		for (i = 0; i < recover_size; i++) {
 684			if (!ls->ls_recover_submit[i])
 685				continue;
 686			if (ls->ls_recover_submit[i] < start_gen) {
 687				ls->ls_recover_submit[i] = 0;
 688				__set_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET);
 689			}
 690		}
 691		/* even if there are no bits to set, we need to write the
 692		   latest generation to the lvb */
 693		write_lvb = 1;
 694	} else {
 695		/*
 696		 * we should be getting a recover_done() for lvb_gen soon
 697		 */
 698	}
 699	spin_unlock(&ls->ls_recover_spin);
 700
 701	if (write_lvb) {
 702		control_lvb_write(ls, start_gen, ls->ls_lvb_bits);
 703		flags = DLM_LKF_CONVERT | DLM_LKF_VALBLK;
 704	} else {
 705		flags = DLM_LKF_CONVERT;
 706	}
 707
 708	error = control_lock(sdp, DLM_LOCK_NL, flags);
 709	if (error) {
 710		fs_err(sdp, "control lock NL error %d\n", error);
 711		return;
 712	}
 713
 714	/*
 715	 * Everyone will see jid bits set in the lvb, run gfs2_recover_set(),
 716	 * and clear a jid bit in the lvb if the recovery is a success.
 717	 * Eventually all journals will be recovered, all jid bits will
 718	 * be cleared in the lvb, and everyone will clear BLOCK_LOCKS.
 719	 */
 720
 721	for (i = 0; i < recover_size; i++) {
 722		if (test_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET)) {
 723			fs_info(sdp, "recover generation %u jid %d\n",
 724				start_gen, i);
 725			gfs2_recover_set(sdp, i);
 726			recover_set++;
 727		}
 728	}
 729	if (recover_set)
 730		return;
 731
 732	/*
 733	 * No more jid bits set in lvb, all recovery is done, unblock locks
 734	 * (unless a new recover_prep callback has occurred, blocking locks
 735	 * again while working above)
 736	 */
 737
 738	spin_lock(&ls->ls_recover_spin);
 739	if (ls->ls_recover_block == block_gen &&
 740	    ls->ls_recover_start == start_gen) {
 741		clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
 742		spin_unlock(&ls->ls_recover_spin);
 743		fs_info(sdp, "recover generation %u done\n", start_gen);
 744		gfs2_glock_thaw(sdp);
 745	} else {
 746		fs_info(sdp, "recover generation %u block2 %u %u\n",
 747			start_gen, block_gen, ls->ls_recover_block);
 748		spin_unlock(&ls->ls_recover_spin);
 749	}
 750}
 751
 752static int control_mount(struct gfs2_sbd *sdp)
 753{
 754	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
 755	uint32_t start_gen, block_gen, mount_gen, lvb_gen;
 756	int mounted_mode;
 757	int retries = 0;
 758	int error;
 759
 760	memset(&ls->ls_mounted_lksb, 0, sizeof(struct dlm_lksb));
 761	memset(&ls->ls_control_lksb, 0, sizeof(struct dlm_lksb));
 762	memset(&ls->ls_control_lvb, 0, GDLM_LVB_SIZE);
 763	ls->ls_control_lksb.sb_lvbptr = ls->ls_control_lvb;
 764	init_completion(&ls->ls_sync_wait);
 765
 766	set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
 767
 768	error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_VALBLK);
 769	if (error) {
 770		fs_err(sdp, "control_mount control_lock NL error %d\n", error);
 771		return error;
 772	}
 773
 774	error = mounted_lock(sdp, DLM_LOCK_NL, 0);
 775	if (error) {
 776		fs_err(sdp, "control_mount mounted_lock NL error %d\n", error);
 777		control_unlock(sdp);
 778		return error;
 779	}
 780	mounted_mode = DLM_LOCK_NL;
 781
 782restart:
 783	if (retries++ && signal_pending(current)) {
 784		error = -EINTR;
 785		goto fail;
 786	}
 787
 788	/*
 789	 * We always start with both locks in NL. control_lock is
 790	 * demoted to NL below so we don't need to do it here.
 791	 */
 792
 793	if (mounted_mode != DLM_LOCK_NL) {
 794		error = mounted_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
 795		if (error)
 796			goto fail;
 797		mounted_mode = DLM_LOCK_NL;
 798	}
 799
 800	/*
 801	 * Other nodes need to do some work in dlm recovery and gfs2_control
 802	 * before the recover_done and control_lock will be ready for us below.
 803	 * A delay here is not required but often avoids having to retry.
 804	 */
 805
 806	msleep_interruptible(500);
 807
 808	/*
 809	 * Acquire control_lock in EX and mounted_lock in either EX or PR.
 810	 * control_lock lvb keeps track of any pending journal recoveries.
 811	 * mounted_lock indicates if any other nodes have the fs mounted.
 812	 */
 813
 814	error = control_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE|DLM_LKF_VALBLK);
 815	if (error == -EAGAIN) {
 816		goto restart;
 817	} else if (error) {
 818		fs_err(sdp, "control_mount control_lock EX error %d\n", error);
 819		goto fail;
 820	}
 821
 822	/*
 823	 * If we're a spectator, we don't want to take the lock in EX because
 824	 * we cannot do the first-mount responsibility it implies: recovery.
 825	 */
 826	if (sdp->sd_args.ar_spectator)
 827		goto locks_done;
 828
 829	error = mounted_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE);
 830	if (!error) {
 831		mounted_mode = DLM_LOCK_EX;
 832		goto locks_done;
 833	} else if (error != -EAGAIN) {
 834		fs_err(sdp, "control_mount mounted_lock EX error %d\n", error);
 835		goto fail;
 836	}
 837
 838	error = mounted_lock(sdp, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE);
 839	if (!error) {
 840		mounted_mode = DLM_LOCK_PR;
 841		goto locks_done;
 842	} else {
 843		/* not even -EAGAIN should happen here */
 844		fs_err(sdp, "control_mount mounted_lock PR error %d\n", error);
 845		goto fail;
 846	}
 847
 848locks_done:
 849	/*
 850	 * If we got both locks above in EX, then we're the first mounter.
 851	 * If not, then we need to wait for the control_lock lvb to be
 852	 * updated by other mounted nodes to reflect our mount generation.
 853	 *
 854	 * In simple first mounter cases, first mounter will see zero lvb_gen,
 855	 * but in cases where all existing nodes leave/fail before mounting
 856	 * nodes finish control_mount, then all nodes will be mounting and
 857	 * lvb_gen will be non-zero.
 858	 */
 859
 860	control_lvb_read(ls, &lvb_gen, ls->ls_lvb_bits);
 861
 862	if (lvb_gen == 0xFFFFFFFF) {
 863		/* special value to force mount attempts to fail */
 864		fs_err(sdp, "control_mount control_lock disabled\n");
 865		error = -EINVAL;
 866		goto fail;
 867	}
 868
 869	if (mounted_mode == DLM_LOCK_EX) {
 870		/* first mounter, keep both EX while doing first recovery */
 871		spin_lock(&ls->ls_recover_spin);
 872		clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
 873		set_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags);
 874		set_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
 875		spin_unlock(&ls->ls_recover_spin);
 876		fs_info(sdp, "first mounter control generation %u\n", lvb_gen);
 877		return 0;
 878	}
 879
 880	error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
 881	if (error)
 882		goto fail;
 883
 884	/*
 885	 * We are not first mounter, now we need to wait for the control_lock
 886	 * lvb generation to be >= the generation from our first recover_done
 887	 * and all lvb bits to be clear (no pending journal recoveries.)
 888	 */
 889
 890	if (!all_jid_bits_clear(ls->ls_lvb_bits)) {
 891		/* journals need recovery, wait until all are clear */
 892		fs_info(sdp, "control_mount wait for journal recovery\n");
 893		goto restart;
 894	}
 895
 896	spin_lock(&ls->ls_recover_spin);
 897	block_gen = ls->ls_recover_block;
 898	start_gen = ls->ls_recover_start;
 899	mount_gen = ls->ls_recover_mount;
 900
 901	if (lvb_gen < mount_gen) {
 902		/* wait for mounted nodes to update control_lock lvb to our
 903		   generation, which might include new recovery bits set */
 904		if (sdp->sd_args.ar_spectator) {
 905			fs_info(sdp, "Recovery is required. Waiting for a "
 906				"non-spectator to mount.\n");
 907			msleep_interruptible(1000);
 908		} else {
 909			fs_info(sdp, "control_mount wait1 block %u start %u "
 910				"mount %u lvb %u flags %lx\n", block_gen,
 911				start_gen, mount_gen, lvb_gen,
 912				ls->ls_recover_flags);
 913		}
 914		spin_unlock(&ls->ls_recover_spin);
 915		goto restart;
 916	}
 917
 918	if (lvb_gen != start_gen) {
 919		/* wait for mounted nodes to update control_lock lvb to the
 920		   latest recovery generation */
 921		fs_info(sdp, "control_mount wait2 block %u start %u mount %u "
 922			"lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
 923			lvb_gen, ls->ls_recover_flags);
 924		spin_unlock(&ls->ls_recover_spin);
 925		goto restart;
 926	}
 927
 928	if (block_gen == start_gen) {
 929		/* dlm recovery in progress, wait for it to finish */
 930		fs_info(sdp, "control_mount wait3 block %u start %u mount %u "
 931			"lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
 932			lvb_gen, ls->ls_recover_flags);
 933		spin_unlock(&ls->ls_recover_spin);
 934		goto restart;
 935	}
 936
 937	clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
 938	set_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags);
 939	memset(ls->ls_recover_submit, 0, ls->ls_recover_size*sizeof(uint32_t));
 940	memset(ls->ls_recover_result, 0, ls->ls_recover_size*sizeof(uint32_t));
 941	spin_unlock(&ls->ls_recover_spin);
 942	return 0;
 943
 944fail:
 945	mounted_unlock(sdp);
 946	control_unlock(sdp);
 947	return error;
 948}
 949
 950static int control_first_done(struct gfs2_sbd *sdp)
 951{
 952	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
 953	uint32_t start_gen, block_gen;
 954	int error;
 955
 956restart:
 957	spin_lock(&ls->ls_recover_spin);
 958	start_gen = ls->ls_recover_start;
 959	block_gen = ls->ls_recover_block;
 960
 961	if (test_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags) ||
 962	    !test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
 963	    !test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
 964		/* sanity check, should not happen */
 965		fs_err(sdp, "control_first_done start %u block %u flags %lx\n",
 966		       start_gen, block_gen, ls->ls_recover_flags);
 967		spin_unlock(&ls->ls_recover_spin);
 968		control_unlock(sdp);
 969		return -1;
 970	}
 971
 972	if (start_gen == block_gen) {
 973		/*
 974		 * Wait for the end of a dlm recovery cycle to switch from
 975		 * first mounter recovery.  We can ignore any recover_slot
 976		 * callbacks between the recover_prep and next recover_done
 977		 * because we are still the first mounter and any failed nodes
 978		 * have not fully mounted, so they don't need recovery.
 979		 */
 980		spin_unlock(&ls->ls_recover_spin);
 981		fs_info(sdp, "control_first_done wait gen %u\n", start_gen);
 982
 983		wait_on_bit(&ls->ls_recover_flags, DFL_DLM_RECOVERY,
 984			    TASK_UNINTERRUPTIBLE);
 985		goto restart;
 986	}
 987
 988	clear_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
 989	set_bit(DFL_FIRST_MOUNT_DONE, &ls->ls_recover_flags);
 990	memset(ls->ls_recover_submit, 0, ls->ls_recover_size*sizeof(uint32_t));
 991	memset(ls->ls_recover_result, 0, ls->ls_recover_size*sizeof(uint32_t));
 992	spin_unlock(&ls->ls_recover_spin);
 993
 994	memset(ls->ls_lvb_bits, 0, GDLM_LVB_SIZE);
 995	control_lvb_write(ls, start_gen, ls->ls_lvb_bits);
 996
 997	error = mounted_lock(sdp, DLM_LOCK_PR, DLM_LKF_CONVERT);
 998	if (error)
 999		fs_err(sdp, "control_first_done mounted PR error %d\n", error);
1000
1001	error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
1002	if (error)
1003		fs_err(sdp, "control_first_done control NL error %d\n", error);
1004
1005	return error;
1006}
1007
1008/*
1009 * Expand static jid arrays if necessary (by increments of RECOVER_SIZE_INC)
1010 * to accommodate the largest slot number.  (NB dlm slot numbers start at 1,
1011 * gfs2 jids start at 0, so jid = slot - 1)
1012 */
1013
1014#define RECOVER_SIZE_INC 16
1015
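/*
 * Worked example (editor's sketch, not called anywhere): the recover
 * arrays grow in steps of RECOVER_SIZE_INC, so e.g. an old size of 16 and
 * a highest jid of 17 result in a new size of 32.  This hypothetical
 * helper only mirrors the rounding loop in set_recover_size() below.
 */
static inline uint32_t example_recover_size(uint32_t old_size, int max_jid)
{
	uint32_t new_size = old_size;

	while (new_size < max_jid + 1)
		new_size += RECOVER_SIZE_INC;
	return new_size;
}
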
1016static int set_recover_size(struct gfs2_sbd *sdp, struct dlm_slot *slots,
1017			    int num_slots)
1018{
1019	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1020	uint32_t *submit = NULL;
1021	uint32_t *result = NULL;
1022	uint32_t old_size, new_size;
1023	int i, max_jid;
1024
1025	if (!ls->ls_lvb_bits) {
1026		ls->ls_lvb_bits = kzalloc(GDLM_LVB_SIZE, GFP_NOFS);
1027		if (!ls->ls_lvb_bits)
1028			return -ENOMEM;
1029	}
1030
1031	max_jid = 0;
1032	for (i = 0; i < num_slots; i++) {
1033		if (max_jid < slots[i].slot - 1)
1034			max_jid = slots[i].slot - 1;
1035	}
1036
1037	old_size = ls->ls_recover_size;
1038	new_size = old_size;
1039	while (new_size < max_jid + 1)
1040		new_size += RECOVER_SIZE_INC;
1041	if (new_size == old_size)
1042		return 0;
1043
1044	submit = kcalloc(new_size, sizeof(uint32_t), GFP_NOFS);
1045	result = kcalloc(new_size, sizeof(uint32_t), GFP_NOFS);
1046	if (!submit || !result) {
1047		kfree(submit);
1048		kfree(result);
1049		return -ENOMEM;
1050	}
1051
1052	spin_lock(&ls->ls_recover_spin);
1053	memcpy(submit, ls->ls_recover_submit, old_size * sizeof(uint32_t));
1054	memcpy(result, ls->ls_recover_result, old_size * sizeof(uint32_t));
1055	kfree(ls->ls_recover_submit);
1056	kfree(ls->ls_recover_result);
1057	ls->ls_recover_submit = submit;
1058	ls->ls_recover_result = result;
1059	ls->ls_recover_size = new_size;
1060	spin_unlock(&ls->ls_recover_spin);
1061	return 0;
1062}
1063
1064static void free_recover_size(struct lm_lockstruct *ls)
1065{
1066	kfree(ls->ls_lvb_bits);
1067	kfree(ls->ls_recover_submit);
1068	kfree(ls->ls_recover_result);
1069	ls->ls_recover_submit = NULL;
1070	ls->ls_recover_result = NULL;
1071	ls->ls_recover_size = 0;
1072	ls->ls_lvb_bits = NULL;
1073}
1074
1075/* dlm calls before it does lock recovery */
1076
1077static void gdlm_recover_prep(void *arg)
1078{
1079	struct gfs2_sbd *sdp = arg;
1080	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1081
1082	spin_lock(&ls->ls_recover_spin);
1083	ls->ls_recover_block = ls->ls_recover_start;
1084	set_bit(DFL_DLM_RECOVERY, &ls->ls_recover_flags);
1085
1086	if (!test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
1087	     test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
1088		spin_unlock(&ls->ls_recover_spin);
1089		return;
1090	}
1091	set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
1092	spin_unlock(&ls->ls_recover_spin);
1093}
1094
1095/* dlm calls after recover_prep has been completed on all lockspace members;
1096   identifies slot/jid of failed member */
1097
1098static void gdlm_recover_slot(void *arg, struct dlm_slot *slot)
1099{
1100	struct gfs2_sbd *sdp = arg;
1101	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1102	int jid = slot->slot - 1;
1103
1104	spin_lock(&ls->ls_recover_spin);
1105	if (ls->ls_recover_size < jid + 1) {
1106		fs_err(sdp, "recover_slot jid %d gen %u short size %d\n",
1107		       jid, ls->ls_recover_block, ls->ls_recover_size);
1108		spin_unlock(&ls->ls_recover_spin);
1109		return;
1110	}
1111
1112	if (ls->ls_recover_submit[jid]) {
1113		fs_info(sdp, "recover_slot jid %d gen %u prev %u\n",
1114			jid, ls->ls_recover_block, ls->ls_recover_submit[jid]);
1115	}
1116	ls->ls_recover_submit[jid] = ls->ls_recover_block;
1117	spin_unlock(&ls->ls_recover_spin);
1118}
1119
1120/* dlm calls after recover_slot and after it completes lock recovery */
1121
1122static void gdlm_recover_done(void *arg, struct dlm_slot *slots, int num_slots,
1123			      int our_slot, uint32_t generation)
1124{
1125	struct gfs2_sbd *sdp = arg;
1126	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1127
1128	/* ensure the ls jid arrays are large enough */
1129	set_recover_size(sdp, slots, num_slots);
1130
1131	spin_lock(&ls->ls_recover_spin);
1132	ls->ls_recover_start = generation;
1133
1134	if (!ls->ls_recover_mount) {
1135		ls->ls_recover_mount = generation;
1136		ls->ls_jid = our_slot - 1;
1137	}
1138
1139	if (!test_bit(DFL_UNMOUNT, &ls->ls_recover_flags))
1140		queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work, 0);
1141
1142	clear_bit(DFL_DLM_RECOVERY, &ls->ls_recover_flags);
1143	smp_mb__after_atomic();
1144	wake_up_bit(&ls->ls_recover_flags, DFL_DLM_RECOVERY);
1145	spin_unlock(&ls->ls_recover_spin);
1146}
1147
1148/* gfs2_recover thread has a journal recovery result */
1149
1150static void gdlm_recovery_result(struct gfs2_sbd *sdp, unsigned int jid,
1151				 unsigned int result)
1152{
1153	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1154
1155	if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
1156		return;
1157
1158	/* we don't care about recovery of our own journal during mount */
1159	if (jid == ls->ls_jid)
1160		return;
1161
1162	spin_lock(&ls->ls_recover_spin);
1163	if (test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
1164		spin_unlock(&ls->ls_recover_spin);
1165		return;
1166	}
1167	if (ls->ls_recover_size < jid + 1) {
1168		fs_err(sdp, "recovery_result jid %d short size %d\n",
1169		       jid, ls->ls_recover_size);
1170		spin_unlock(&ls->ls_recover_spin);
1171		return;
1172	}
1173
1174	fs_info(sdp, "recover jid %d result %s\n", jid,
1175		result == LM_RD_GAVEUP ? "busy" : "success");
1176
1177	ls->ls_recover_result[jid] = result;
1178
1179	/* GAVEUP means another node is recovering the journal; delay our
1180	   next attempt to recover it, to give the other node a chance to
1181	   finish before trying again */
1182
1183	if (!test_bit(DFL_UNMOUNT, &ls->ls_recover_flags))
1184		queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work,
1185				   result == LM_RD_GAVEUP ? HZ : 0);
1186	spin_unlock(&ls->ls_recover_spin);
1187}
1188
1189static const struct dlm_lockspace_ops gdlm_lockspace_ops = {
1190	.recover_prep = gdlm_recover_prep,
1191	.recover_slot = gdlm_recover_slot,
1192	.recover_done = gdlm_recover_done,
1193};
1194
1195static int gdlm_mount(struct gfs2_sbd *sdp, const char *table)
1196{
1197	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1198	char cluster[GFS2_LOCKNAME_LEN];
1199	const char *fsname;
1200	uint32_t flags;
1201	int error, ops_result;
1202
1203	/*
1204	 * initialize everything
1205	 */
1206
1207	INIT_DELAYED_WORK(&sdp->sd_control_work, gfs2_control_func);
1208	spin_lock_init(&ls->ls_recover_spin);
1209	ls->ls_recover_flags = 0;
1210	ls->ls_recover_mount = 0;
1211	ls->ls_recover_start = 0;
1212	ls->ls_recover_block = 0;
1213	ls->ls_recover_size = 0;
1214	ls->ls_recover_submit = NULL;
1215	ls->ls_recover_result = NULL;
1216	ls->ls_lvb_bits = NULL;
1217
1218	error = set_recover_size(sdp, NULL, 0);
1219	if (error)
1220		goto fail;
1221
1222	/*
1223	 * prepare dlm_new_lockspace args
1224	 */
1225
1226	fsname = strchr(table, ':');
1227	if (!fsname) {
1228		fs_info(sdp, "no fsname found\n");
1229		error = -EINVAL;
1230		goto fail_free;
1231	}
1232	memset(cluster, 0, sizeof(cluster));
1233	memcpy(cluster, table, strlen(table) - strlen(fsname));
1234	fsname++;
1235
1236	flags = DLM_LSFL_FS | DLM_LSFL_NEWEXCL;
1237
1238	/*
1239	 * create/join lockspace
1240	 */
1241
1242	error = dlm_new_lockspace(fsname, cluster, flags, GDLM_LVB_SIZE,
1243				  &gdlm_lockspace_ops, sdp, &ops_result,
1244				  &ls->ls_dlm);
1245	if (error) {
1246		fs_err(sdp, "dlm_new_lockspace error %d\n", error);
1247		goto fail_free;
1248	}
1249
1250	if (ops_result < 0) {
1251		/*
1252		 * dlm does not support ops callbacks; the old
1253		 * dlm_controld/gfs_controld are in use, so try without ops.
1254		 */
1255		fs_info(sdp, "dlm lockspace ops not used\n");
1256		free_recover_size(ls);
1257		set_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags);
1258		return 0;
1259	}
1260
1261	if (!test_bit(SDF_NOJOURNALID, &sdp->sd_flags)) {
1262		fs_err(sdp, "dlm lockspace ops disallow jid preset\n");
1263		error = -EINVAL;
1264		goto fail_release;
1265	}
1266
1267	/*
1268	 * control_mount() uses control_lock to determine first mounter,
1269	 * and for later mounts, waits for any recoveries to be cleared.
1270	 */
1271
1272	error = control_mount(sdp);
1273	if (error) {
1274		fs_err(sdp, "mount control error %d\n", error);
1275		goto fail_release;
1276	}
1277
1278	ls->ls_first = !!test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
1279	clear_bit(SDF_NOJOURNALID, &sdp->sd_flags);
1280	smp_mb__after_atomic();
1281	wake_up_bit(&sdp->sd_flags, SDF_NOJOURNALID);
1282	return 0;
1283
1284fail_release:
1285	dlm_release_lockspace(ls->ls_dlm, 2);
1286fail_free:
1287	free_recover_size(ls);
1288fail:
1289	return error;
1290}
1291
1292static void gdlm_first_done(struct gfs2_sbd *sdp)
1293{
1294	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1295	int error;
1296
1297	if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
1298		return;
1299
1300	error = control_first_done(sdp);
1301	if (error)
1302		fs_err(sdp, "mount first_done error %d\n", error);
1303}
1304
1305static void gdlm_unmount(struct gfs2_sbd *sdp)
1306{
1307	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
1308
1309	if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
1310		goto release;
1311
1312	/* wait for gfs2_control_wq to be done with this mount */
1313
1314	spin_lock(&ls->ls_recover_spin);
1315	set_bit(DFL_UNMOUNT, &ls->ls_recover_flags);
1316	spin_unlock(&ls->ls_recover_spin);
1317	flush_delayed_work(&sdp->sd_control_work);
1318
1319	/* mounted_lock and control_lock will be purged in dlm recovery */
1320release:
1321	if (ls->ls_dlm) {
1322		dlm_release_lockspace(ls->ls_dlm, 2);
1323		ls->ls_dlm = NULL;
1324	}
1325
1326	free_recover_size(ls);
1327}
1328
1329static const match_table_t dlm_tokens = {
1330	{ Opt_jid, "jid=%d"},
1331	{ Opt_id, "id=%d"},
1332	{ Opt_first, "first=%d"},
1333	{ Opt_nodir, "nodir=%d"},
1334	{ Opt_err, NULL },
1335};
1336
1337const struct lm_lockops gfs2_dlm_ops = {
1338	.lm_proto_name = "lock_dlm",
1339	.lm_mount = gdlm_mount,
1340	.lm_first_done = gdlm_first_done,
1341	.lm_recovery_result = gdlm_recovery_result,
1342	.lm_unmount = gdlm_unmount,
1343	.lm_put_lock = gdlm_put_lock,
1344	.lm_lock = gdlm_lock,
1345	.lm_cancel = gdlm_cancel,
1346	.lm_tokens = &dlm_tokens,
1347};
1348