v6.2
  1// SPDX-License-Identifier: GPL-2.0-or-later
  2/*
  3 * dlmlock.c
  4 *
  5 * underlying calls for lock creation
  6 *
  7 * Copyright (C) 2004 Oracle.  All rights reserved.
  8 */
  9
 10
 11#include <linux/module.h>
 12#include <linux/fs.h>
 13#include <linux/types.h>
 14#include <linux/slab.h>
 15#include <linux/highmem.h>
 16#include <linux/init.h>
 17#include <linux/sysctl.h>
 18#include <linux/random.h>
 19#include <linux/blkdev.h>
 20#include <linux/socket.h>
 21#include <linux/inet.h>
 22#include <linux/spinlock.h>
 23#include <linux/delay.h>
 24
 25
 26#include "../cluster/heartbeat.h"
 27#include "../cluster/nodemanager.h"
 28#include "../cluster/tcp.h"
 29
 30#include "dlmapi.h"
 31#include "dlmcommon.h"
 32
 33#include "dlmconvert.h"
 34
 35#define MLOG_MASK_PREFIX ML_DLM
 36#include "../cluster/masklog.h"
 37
 38static struct kmem_cache *dlm_lock_cache;
 39
 40static DEFINE_SPINLOCK(dlm_cookie_lock);
 41static u64 dlm_next_cookie = 1;
 42
 43static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
 44					       struct dlm_lock_resource *res,
 45					       struct dlm_lock *lock, int flags);
 46static void dlm_init_lock(struct dlm_lock *newlock, int type,
 47			  u8 node, u64 cookie);
 48static void dlm_lock_release(struct kref *kref);
 49static void dlm_lock_detach_lockres(struct dlm_lock *lock);
 50
 51int dlm_init_lock_cache(void)
 52{
 53	dlm_lock_cache = kmem_cache_create("o2dlm_lock",
 54					   sizeof(struct dlm_lock),
 55					   0, SLAB_HWCACHE_ALIGN, NULL);
 56	if (dlm_lock_cache == NULL)
 57		return -ENOMEM;
 58	return 0;
 59}
 60
 61void dlm_destroy_lock_cache(void)
 62{
 63	kmem_cache_destroy(dlm_lock_cache);
 64}
 65
 66/* Tell us whether we can grant a new lock request.
 67 * locking:
 68 *   caller needs:  res->spinlock
 69 *   taken:         none
 70 *   held on exit:  none
 71 * returns: 1 if the lock can be granted, 0 otherwise.
 72 */
 73static int dlm_can_grant_new_lock(struct dlm_lock_resource *res,
 74				  struct dlm_lock *lock)
 75{
 76	struct dlm_lock *tmplock;
 77
 78	list_for_each_entry(tmplock, &res->granted, list) {
 79		if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type))
 80			return 0;
 81	}
 82
 83	list_for_each_entry(tmplock, &res->converting, list) {
 84		if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type))
 85			return 0;
 86		if (!dlm_lock_compatible(tmplock->ml.convert_type,
 87					 lock->ml.type))
 88			return 0;
 89	}
 90
 91	return 1;
 92}
 93
 94/* performs lock creation at the lockres master site
 95 * locking:
 96 *   caller needs:  none
 97 *   taken:         takes and drops res->spinlock
 98 *   held on exit:  none
 99 * returns: DLM_NORMAL, DLM_NOTQUEUED
100 */
101static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
102				      struct dlm_lock_resource *res,
103				      struct dlm_lock *lock, int flags)
104{
105	int call_ast = 0, kick_thread = 0;
106	enum dlm_status status = DLM_NORMAL;
107
108	mlog(0, "type=%d\n", lock->ml.type);
109
110	spin_lock(&res->spinlock);
111	/* if called from dlm_create_lock_handler, need to
112	 * ensure it will not sleep in dlm_wait_on_lockres */
113	status = __dlm_lockres_state_to_status(res);
114	if (status != DLM_NORMAL &&
115	    lock->ml.node != dlm->node_num) {
116		/* erf.  state changed after lock was dropped. */
117		spin_unlock(&res->spinlock);
118		dlm_error(status);
119		return status;
120	}
121	__dlm_wait_on_lockres(res);
122	__dlm_lockres_reserve_ast(res);
123
124	if (dlm_can_grant_new_lock(res, lock)) {
125		mlog(0, "I can grant this lock right away\n");
126		/* got it right away */
127		lock->lksb->status = DLM_NORMAL;
128		status = DLM_NORMAL;
129		dlm_lock_get(lock);
130		list_add_tail(&lock->list, &res->granted);
131
132		/* for the recovery lock, we can't allow the ast
133		 * to be queued since the dlmthread is already
134		 * frozen.  but the recovery lock is always locked
135		 * with LKM_NOQUEUE so we do not need the ast in
136		 * this special case */
137		if (!dlm_is_recovery_lock(res->lockname.name,
138					  res->lockname.len)) {
139			kick_thread = 1;
140			call_ast = 1;
141		} else {
142			mlog(0, "%s: returning DLM_NORMAL to "
143			     "node %u for reco lock\n", dlm->name,
144			     lock->ml.node);
145		}
146	} else {
147		/* for NOQUEUE request, unless we get the
148		 * lock right away, return DLM_NOTQUEUED */
149		if (flags & LKM_NOQUEUE) {
150			status = DLM_NOTQUEUED;
151			if (dlm_is_recovery_lock(res->lockname.name,
152						 res->lockname.len)) {
153				mlog(0, "%s: returning NOTQUEUED to "
154				     "node %u for reco lock\n", dlm->name,
155				     lock->ml.node);
156			}
157		} else {
158			status = DLM_NORMAL;
159			dlm_lock_get(lock);
160			list_add_tail(&lock->list, &res->blocked);
161			kick_thread = 1;
162		}
163	}
164
165	spin_unlock(&res->spinlock);
166	wake_up(&res->wq);
167
168	/* either queue the ast or release it */
169	if (call_ast)
170		dlm_queue_ast(dlm, lock);
171	else
172		dlm_lockres_release_ast(dlm, res);
173
174	dlm_lockres_calc_usage(dlm, res);
175	if (kick_thread)
176		dlm_kick_thread(dlm, res);
177
178	return status;
179}
180
181void dlm_revert_pending_lock(struct dlm_lock_resource *res,
182			     struct dlm_lock *lock)
183{
184	/* remove from local queue if it failed */
185	list_del_init(&lock->list);
186	lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
187}
188
189
190/*
191 * locking:
192 *   caller needs:  none
193 *   taken:         takes and drops res->spinlock
194 *   held on exit:  none
195 * returns: DLM_DENIED, DLM_RECOVERING, or net status
196 */
197static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
198				      struct dlm_lock_resource *res,
199				      struct dlm_lock *lock, int flags)
200{
201	enum dlm_status status = DLM_DENIED;
202	int lockres_changed = 1;
203
204	mlog(0, "type=%d, lockres %.*s, flags = 0x%x\n",
205	     lock->ml.type, res->lockname.len,
206	     res->lockname.name, flags);
207
208	/*
209	 * Wait if resource is getting recovered, remastered, etc.
210	 * If the resource was remastered and new owner is self, then exit.
211	 */
212	spin_lock(&res->spinlock);
213	__dlm_wait_on_lockres(res);
214	if (res->owner == dlm->node_num) {
215		spin_unlock(&res->spinlock);
216		return DLM_RECOVERING;
217	}
218	res->state |= DLM_LOCK_RES_IN_PROGRESS;
219
220	/* add lock to local (secondary) queue */
221	dlm_lock_get(lock);
222	list_add_tail(&lock->list, &res->blocked);
223	lock->lock_pending = 1;
224	spin_unlock(&res->spinlock);
225
226	/* spec seems to say that you will get DLM_NORMAL when the lock
227	 * has been queued, meaning we need to wait for a reply here. */
228	status = dlm_send_remote_lock_request(dlm, res, lock, flags);
229
230	spin_lock(&res->spinlock);
231	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
232	lock->lock_pending = 0;
233	if (status != DLM_NORMAL) {
234		if (status == DLM_RECOVERING &&
235		    dlm_is_recovery_lock(res->lockname.name,
236					 res->lockname.len)) {
237			/* recovery lock was mastered by dead node.
238			 * we need to have calc_usage shoot down this
239			 * lockres and completely remaster it. */
240			mlog(0, "%s: recovery lock was owned by "
241			     "dead node %u, remaster it now.\n",
242			     dlm->name, res->owner);
243		} else if (status != DLM_NOTQUEUED) {
244			/*
245			 * DO NOT call calc_usage, as this would unhash
246			 * the remote lockres before we ever get to use
247			 * it.  treat as if we never made any change to
248			 * the lockres.
249			 */
250			lockres_changed = 0;
251			dlm_error(status);
252		}
253		dlm_revert_pending_lock(res, lock);
254		dlm_lock_put(lock);
255	} else if (dlm_is_recovery_lock(res->lockname.name,
256					res->lockname.len)) {
257		/* special case for the $RECOVERY lock.
258		 * there will never be an AST delivered to put
259		 * this lock on the proper secondary queue
260		 * (granted), so do it manually. */
261		mlog(0, "%s: $RECOVERY lock for this node (%u) is "
262		     "mastered by %u; got lock, manually granting (no ast)\n",
263		     dlm->name, dlm->node_num, res->owner);
264		list_move_tail(&lock->list, &res->granted);
265	}
266	spin_unlock(&res->spinlock);
267
268	if (lockres_changed)
269		dlm_lockres_calc_usage(dlm, res);
270
271	wake_up(&res->wq);
272	return status;
273}
274
275
276/* for remote lock creation.
277 * locking:
278 *   caller needs:  none, but need res->state & DLM_LOCK_RES_IN_PROGRESS
279 *   taken:         none
280 *   held on exit:  none
281 * returns: DLM_NOLOCKMGR, or net status
282 */
283static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
284					       struct dlm_lock_resource *res,
285					       struct dlm_lock *lock, int flags)
286{
287	struct dlm_create_lock create;
288	int tmpret, status = 0;
289	enum dlm_status ret;
290
291	memset(&create, 0, sizeof(create));
292	create.node_idx = dlm->node_num;
293	create.requested_type = lock->ml.type;
294	create.cookie = lock->ml.cookie;
295	create.namelen = res->lockname.len;
296	create.flags = cpu_to_be32(flags);
297	memcpy(create.name, res->lockname.name, create.namelen);
298
299	tmpret = o2net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create,
300				    sizeof(create), res->owner, &status);
301	if (tmpret >= 0) {
302		ret = status;
303		if (ret == DLM_REJECTED) {
304			mlog(ML_ERROR, "%s: res %.*s, Stale lockres no longer "
305			     "owned by node %u. That node is coming back up "
306			     "currently.\n", dlm->name, create.namelen,
307			     create.name, res->owner);
308			dlm_print_one_lock_resource(res);
309			BUG();
310		}
311	} else {
312		mlog(ML_ERROR, "%s: res %.*s, Error %d send CREATE LOCK to "
313		     "node %u\n", dlm->name, create.namelen, create.name,
314		     tmpret, res->owner);
315		if (dlm_is_host_down(tmpret))
316			ret = DLM_RECOVERING;
317		else
318			ret = dlm_err_to_dlm_status(tmpret);
319	}
320
321	return ret;
322}
323
324void dlm_lock_get(struct dlm_lock *lock)
325{
326	kref_get(&lock->lock_refs);
327}
328
329void dlm_lock_put(struct dlm_lock *lock)
330{
331	kref_put(&lock->lock_refs, dlm_lock_release);
332}
333
334static void dlm_lock_release(struct kref *kref)
335{
336	struct dlm_lock *lock;
337
338	lock = container_of(kref, struct dlm_lock, lock_refs);
339
340	BUG_ON(!list_empty(&lock->list));
341	BUG_ON(!list_empty(&lock->ast_list));
342	BUG_ON(!list_empty(&lock->bast_list));
343	BUG_ON(lock->ast_pending);
344	BUG_ON(lock->bast_pending);
345
346	dlm_lock_detach_lockres(lock);
347
348	if (lock->lksb_kernel_allocated) {
349		mlog(0, "freeing kernel-allocated lksb\n");
350		kfree(lock->lksb);
351	}
352	kmem_cache_free(dlm_lock_cache, lock);
353}
354
 355/* associate a lock with its lockres, getting a ref on the lockres */
356void dlm_lock_attach_lockres(struct dlm_lock *lock,
357			     struct dlm_lock_resource *res)
358{
359	dlm_lockres_get(res);
360	lock->lockres = res;
361}
362
363/* drop ref on lockres, if there is still one associated with lock */
364static void dlm_lock_detach_lockres(struct dlm_lock *lock)
365{
366	struct dlm_lock_resource *res;
367
368	res = lock->lockres;
369	if (res) {
370		lock->lockres = NULL;
371		mlog(0, "removing lock's lockres reference\n");
372		dlm_lockres_put(res);
373	}
374}
375
376static void dlm_init_lock(struct dlm_lock *newlock, int type,
377			  u8 node, u64 cookie)
378{
379	INIT_LIST_HEAD(&newlock->list);
380	INIT_LIST_HEAD(&newlock->ast_list);
381	INIT_LIST_HEAD(&newlock->bast_list);
382	spin_lock_init(&newlock->spinlock);
383	newlock->ml.type = type;
384	newlock->ml.convert_type = LKM_IVMODE;
385	newlock->ml.highest_blocked = LKM_IVMODE;
386	newlock->ml.node = node;
387	newlock->ml.pad1 = 0;
388	newlock->ml.list = 0;
389	newlock->ml.flags = 0;
390	newlock->ast = NULL;
391	newlock->bast = NULL;
392	newlock->astdata = NULL;
393	newlock->ml.cookie = cpu_to_be64(cookie);
394	newlock->ast_pending = 0;
395	newlock->bast_pending = 0;
396	newlock->convert_pending = 0;
397	newlock->lock_pending = 0;
398	newlock->unlock_pending = 0;
399	newlock->cancel_pending = 0;
400	newlock->lksb_kernel_allocated = 0;
401
402	kref_init(&newlock->lock_refs);
403}
404
405struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
406			       struct dlm_lockstatus *lksb)
407{
408	struct dlm_lock *lock;
409	int kernel_allocated = 0;
410
411	lock = kmem_cache_zalloc(dlm_lock_cache, GFP_NOFS);
412	if (!lock)
413		return NULL;
414
415	if (!lksb) {
416		/* zero memory only if kernel-allocated */
417		lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
418		if (!lksb) {
419			kmem_cache_free(dlm_lock_cache, lock);
420			return NULL;
421		}
422		kernel_allocated = 1;
423	}
424
425	dlm_init_lock(lock, type, node, cookie);
426	if (kernel_allocated)
427		lock->lksb_kernel_allocated = 1;
428	lock->lksb = lksb;
429	lksb->lockid = lock;
430	return lock;
431}
432
433/* handler for lock creation net message
434 * locking:
435 *   caller needs:  none
436 *   taken:         takes and drops res->spinlock
437 *   held on exit:  none
438 * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED
439 */
440int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data,
441			    void **ret_data)
442{
443	struct dlm_ctxt *dlm = data;
444	struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf;
445	struct dlm_lock_resource *res = NULL;
446	struct dlm_lock *newlock = NULL;
447	struct dlm_lockstatus *lksb = NULL;
448	enum dlm_status status = DLM_NORMAL;
449	char *name;
450	unsigned int namelen;
451
452	BUG_ON(!dlm);
453
454	if (!dlm_grab(dlm))
455		return DLM_REJECTED;
456
457	name = create->name;
458	namelen = create->namelen;
459	status = DLM_REJECTED;
460	if (!dlm_domain_fully_joined(dlm)) {
461		mlog(ML_ERROR, "Domain %s not fully joined, but node %u is "
462		     "sending a create_lock message for lock %.*s!\n",
463		     dlm->name, create->node_idx, namelen, name);
464		dlm_error(status);
465		goto leave;
466	}
467
468	status = DLM_IVBUFLEN;
469	if (namelen > DLM_LOCKID_NAME_MAX) {
470		dlm_error(status);
471		goto leave;
472	}
473
474	status = DLM_SYSERR;
475	newlock = dlm_new_lock(create->requested_type,
476			       create->node_idx,
477			       be64_to_cpu(create->cookie), NULL);
478	if (!newlock) {
479		dlm_error(status);
480		goto leave;
481	}
482
483	lksb = newlock->lksb;
484
485	if (be32_to_cpu(create->flags) & LKM_GET_LVB) {
486		lksb->flags |= DLM_LKSB_GET_LVB;
487		mlog(0, "set DLM_LKSB_GET_LVB flag\n");
488	}
489
490	status = DLM_IVLOCKID;
491	res = dlm_lookup_lockres(dlm, name, namelen);
492	if (!res) {
493		dlm_error(status);
494		goto leave;
495	}
496
497	spin_lock(&res->spinlock);
498	status = __dlm_lockres_state_to_status(res);
499	spin_unlock(&res->spinlock);
500
501	if (status != DLM_NORMAL) {
502		mlog(0, "lockres recovering/migrating/in-progress\n");
503		goto leave;
504	}
505
506	dlm_lock_attach_lockres(newlock, res);
507
508	status = dlmlock_master(dlm, res, newlock, be32_to_cpu(create->flags));
509leave:
510	if (status != DLM_NORMAL)
511		if (newlock)
512			dlm_lock_put(newlock);
513
514	if (res)
515		dlm_lockres_put(res);
516
517	dlm_put(dlm);
518
519	return status;
520}
521
522
523/* fetch next node-local (u8 nodenum + u56 cookie) into u64 */
524static inline void dlm_get_next_cookie(u8 node_num, u64 *cookie)
525{
526	u64 tmpnode = node_num;
527
528	/* shift single byte of node num into top 8 bits */
529	tmpnode <<= 56;
530
531	spin_lock(&dlm_cookie_lock);
532	*cookie = (dlm_next_cookie | tmpnode);
533	if (++dlm_next_cookie & 0xff00000000000000ull) {
534		mlog(0, "This node's cookie will now wrap!\n");
535		dlm_next_cookie = 1;
536	}
537	spin_unlock(&dlm_cookie_lock);
538}
539
540enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode,
541			struct dlm_lockstatus *lksb, int flags,
542			const char *name, int namelen, dlm_astlockfunc_t *ast,
543			void *data, dlm_bastlockfunc_t *bast)
544{
545	enum dlm_status status;
546	struct dlm_lock_resource *res = NULL;
547	struct dlm_lock *lock = NULL;
548	int convert = 0, recovery = 0;
549
550	/* yes this function is a mess.
551	 * TODO: clean this up.  lots of common code in the
552	 *       lock and convert paths, especially in the retry blocks */
553	if (!lksb) {
554		dlm_error(DLM_BADARGS);
555		return DLM_BADARGS;
556	}
557
558	status = DLM_BADPARAM;
559	if (mode != LKM_EXMODE && mode != LKM_PRMODE && mode != LKM_NLMODE) {
560		dlm_error(status);
561		goto error;
562	}
563
564	if (flags & ~LKM_VALID_FLAGS) {
565		dlm_error(status);
566		goto error;
567	}
568
569	convert = (flags & LKM_CONVERT);
570	recovery = (flags & LKM_RECOVERY);
571
572	if (recovery &&
573	    (!dlm_is_recovery_lock(name, namelen) || convert) ) {
574		dlm_error(status);
575		goto error;
576	}
577	if (convert && (flags & LKM_LOCAL)) {
578		mlog(ML_ERROR, "strange LOCAL convert request!\n");
579		goto error;
580	}
581
582	if (convert) {
583		/* CONVERT request */
584
585		/* if converting, must pass in a valid dlm_lock */
586		lock = lksb->lockid;
587		if (!lock) {
588			mlog(ML_ERROR, "NULL lock pointer in convert "
589			     "request\n");
590			goto error;
591		}
592
593		res = lock->lockres;
594		if (!res) {
595			mlog(ML_ERROR, "NULL lockres pointer in convert "
596			     "request\n");
597			goto error;
598		}
599		dlm_lockres_get(res);
600
601		/* XXX: for ocfs2 purposes, the ast/bast/astdata/lksb are
602	 	 * static after the original lock call.  convert requests will
603		 * ensure that everything is the same, or return DLM_BADARGS.
604	 	 * this means that DLM_DENIED_NOASTS will never be returned.
605	 	 */
606		if (lock->lksb != lksb || lock->ast != ast ||
607		    lock->bast != bast || lock->astdata != data) {
608			status = DLM_BADARGS;
609			mlog(ML_ERROR, "new args:  lksb=%p, ast=%p, bast=%p, "
610			     "astdata=%p\n", lksb, ast, bast, data);
611			mlog(ML_ERROR, "orig args: lksb=%p, ast=%p, bast=%p, "
612			     "astdata=%p\n", lock->lksb, lock->ast,
613			     lock->bast, lock->astdata);
614			goto error;
615		}
616retry_convert:
617		dlm_wait_for_recovery(dlm);
618
619		if (res->owner == dlm->node_num)
620			status = dlmconvert_master(dlm, res, lock, flags, mode);
621		else
622			status = dlmconvert_remote(dlm, res, lock, flags, mode);
623		if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
624		    status == DLM_FORWARD) {
625			/* for now, see how this works without sleeping
626			 * and just retry right away.  I suspect the reco
627			 * or migration will complete fast enough that
628			 * no waiting will be necessary */
629			mlog(0, "retrying convert with migration/recovery/"
630			     "in-progress\n");
631			msleep(100);
632			goto retry_convert;
633		}
634	} else {
635		u64 tmpcookie;
636
637		/* LOCK request */
638		status = DLM_BADARGS;
639		if (!name) {
640			dlm_error(status);
641			goto error;
642		}
643
644		status = DLM_IVBUFLEN;
645		if (namelen > DLM_LOCKID_NAME_MAX || namelen < 1) {
646			dlm_error(status);
647			goto error;
648		}
649
650		dlm_get_next_cookie(dlm->node_num, &tmpcookie);
651		lock = dlm_new_lock(mode, dlm->node_num, tmpcookie, lksb);
652		if (!lock) {
653			dlm_error(status);
654			goto error;
655		}
656
657		if (!recovery)
658			dlm_wait_for_recovery(dlm);
659
660		/* find or create the lock resource */
661		res = dlm_get_lock_resource(dlm, name, namelen, flags);
662		if (!res) {
663			status = DLM_IVLOCKID;
664			dlm_error(status);
665			goto error;
666		}
667
668		mlog(0, "type=%d, flags = 0x%x\n", mode, flags);
669		mlog(0, "creating lock: lock=%p res=%p\n", lock, res);
670
671		dlm_lock_attach_lockres(lock, res);
672		lock->ast = ast;
673		lock->bast = bast;
674		lock->astdata = data;
675
676retry_lock:
677		if (flags & LKM_VALBLK) {
678			mlog(0, "LKM_VALBLK passed by caller\n");
679
680			/* LVB requests for non PR, PW or EX locks are
681			 * ignored. */
682			if (mode < LKM_PRMODE)
683				flags &= ~LKM_VALBLK;
684			else {
685				flags |= LKM_GET_LVB;
686				lock->lksb->flags |= DLM_LKSB_GET_LVB;
687			}
688		}
689
690		if (res->owner == dlm->node_num)
691			status = dlmlock_master(dlm, res, lock, flags);
692		else
693			status = dlmlock_remote(dlm, res, lock, flags);
694
695		if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
696		    status == DLM_FORWARD) {
697			msleep(100);
698			if (recovery) {
699				if (status != DLM_RECOVERING)
700					goto retry_lock;
701				/* wait to see the node go down, then
702				 * drop down and allow the lockres to
703				 * get cleaned up.  need to remaster. */
704				dlm_wait_for_node_death(dlm, res->owner,
705						DLM_NODE_DEATH_WAIT_MAX);
706			} else {
707				dlm_wait_for_recovery(dlm);
708				goto retry_lock;
709			}
710		}
711
712		/* Inflight taken in dlm_get_lock_resource() is dropped here */
713		spin_lock(&res->spinlock);
714		dlm_lockres_drop_inflight_ref(dlm, res);
715		spin_unlock(&res->spinlock);
716
717		dlm_lockres_calc_usage(dlm, res);
718		dlm_kick_thread(dlm, res);
719
720		if (status != DLM_NORMAL) {
721			lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
722			if (status != DLM_NOTQUEUED)
723				dlm_error(status);
724			goto error;
725		}
726	}
727
728error:
729	if (status != DLM_NORMAL) {
730		if (lock && !convert)
731			dlm_lock_put(lock);
732		// this is kind of unnecessary
733		lksb->status = status;
734	}
735
736	/* put lockres ref from the convert path
737	 * or from dlm_get_lock_resource */
738	if (res)
739		dlm_lockres_put(res);
740
741	return status;
742}
743EXPORT_SYMBOL_GPL(dlmlock);
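
For orientation, a minimal caller sketch follows. It is not part of dlmlock.c: the domain pointer, the lock name and every example_* identifier are invented for illustration, and the callback signatures are assumed to match the dlm_astlockfunc_t/dlm_bastlockfunc_t typedefs from dlmapi.h (astdata only for the AST, astdata plus the blocked mode for the BAST). Only dlmlock() itself, the LKM_* flags and the dlm_status codes come from the listing above.

/* Hypothetical caller sketch (illustration only, not from dlmlock.c). */
static void example_ast(void *astdata)
{
	/* invoked once the lock is granted; lksb->status holds DLM_NORMAL */
}

static void example_bast(void *astdata, int blocked_type)
{
	/* another node requested an incompatible mode; downconvert soon */
}

/* Try to take an exclusive lock without blocking, on an already-joined
 * domain "dlm".  LKM_NOQUEUE turns a busy resource into DLM_NOTQUEUED
 * instead of parking the lock on res->blocked (see dlmlock_master()). */
static enum dlm_status example_trylock(struct dlm_ctxt *dlm,
				       struct dlm_lockstatus *lksb)
{
	static const char name[] = "example-lockid";

	return dlmlock(dlm, LKM_EXMODE, lksb, LKM_NOQUEUE,
		       name, sizeof(name) - 1,
		       example_ast, NULL, example_bast);
}

As the argument checks at the top of dlmlock() show, a request with a NULL lksb, a mode other than LKM_NLMODE/LKM_PRMODE/LKM_EXMODE, or flags outside LKM_VALID_FLAGS is rejected up front with DLM_BADARGS or DLM_BADPARAM before any lock resource is looked up.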