   1#include <linux/ceph/ceph_debug.h>
   2
   3#include <linux/fs.h>
   4#include <linux/wait.h>
   5#include <linux/slab.h>
   6#include <linux/sched.h>
   7#include <linux/debugfs.h>
   8#include <linux/seq_file.h>
   9
  10#include "super.h"
  11#include "mds_client.h"
  12
  13#include <linux/ceph/messenger.h>
  14#include <linux/ceph/decode.h>
  15#include <linux/ceph/pagelist.h>
  16#include <linux/ceph/auth.h>
  17#include <linux/ceph/debugfs.h>
  18
  19/*
  20 * A cluster of MDS (metadata server) daemons is responsible for
  21 * managing the file system namespace (the directory hierarchy and
  22 * inodes) and for coordinating shared access to storage.  Metadata is
  23 * partitioned hierarchically across a number of servers, and that
  24 * partition varies over time as the cluster adjusts the distribution
  25 * in order to balance load.
  26 *
  27 * The MDS client is primarily responsible for managing synchronous
  28 * metadata requests for operations like open, unlink, and so forth.
  29 * If there is a MDS failure, we find out about it when we (possibly
  30 * request and) receive a new MDS map, and can resubmit affected
  31 * requests.
  32 *
  33 * For the most part, though, we take advantage of a lossless
  34 * communications channel to the MDS, and do not need to worry about
  35 * timing out or resubmitting requests.
  36 *
  37 * We maintain a stateful "session" with each MDS we interact with.
  38 * Within each session, we send periodic heartbeat messages to ensure
  39 * any capabilities or leases we have been issued remain valid.  If
  40 * the session times out and goes stale, our leases and capabilities
  41 * are no longer valid.
  42 */
  43
  44struct ceph_reconnect_state {
  45	struct ceph_pagelist *pagelist;
  46	bool flock;
  47};
  48
  49static void __wake_requests(struct ceph_mds_client *mdsc,
  50			    struct list_head *head);
  51
  52static const struct ceph_connection_operations mds_con_ops;
  53
  54
  55/*
  56 * mds reply parsing
  57 */
  58
  59/*
  60 * parse individual inode info
  61 */
  62static int parse_reply_info_in(void **p, void *end,
  63			       struct ceph_mds_reply_info_in *info,
  64			       int features)
  65{
  66	int err = -EIO;
  67
  68	info->in = *p;
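	/* skip past the fixed-size inode record plus its variable-length
	 * fragtree split array */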
  69	*p += sizeof(struct ceph_mds_reply_inode) +
  70		sizeof(*info->in->fragtree.splits) *
  71		le32_to_cpu(info->in->fragtree.nsplits);
  72
  73	ceph_decode_32_safe(p, end, info->symlink_len, bad);
  74	ceph_decode_need(p, end, info->symlink_len, bad);
  75	info->symlink = *p;
  76	*p += info->symlink_len;
  77
  78	if (features & CEPH_FEATURE_DIRLAYOUTHASH)
  79		ceph_decode_copy_safe(p, end, &info->dir_layout,
  80				      sizeof(info->dir_layout), bad);
  81	else
  82		memset(&info->dir_layout, 0, sizeof(info->dir_layout));
  83
  84	ceph_decode_32_safe(p, end, info->xattr_len, bad);
  85	ceph_decode_need(p, end, info->xattr_len, bad);
  86	info->xattr_data = *p;
  87	*p += info->xattr_len;
  88	return 0;
  89bad:
  90	return err;
  91}
  92
  93/*
  94 * parse a normal reply, which may contain a (dir+)dentry and/or a
  95 * target inode.
  96 */
  97static int parse_reply_info_trace(void **p, void *end,
  98				  struct ceph_mds_reply_info_parsed *info,
  99				  int features)
 100{
 101	int err;
 102
 103	if (info->head->is_dentry) {
 104		err = parse_reply_info_in(p, end, &info->diri, features);
 105		if (err < 0)
 106			goto out_bad;
 107
 108		if (unlikely(*p + sizeof(*info->dirfrag) > end))
 109			goto bad;
 110		info->dirfrag = *p;
 111		*p += sizeof(*info->dirfrag) +
 112			sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
 113		if (unlikely(*p > end))
 114			goto bad;
 115
 116		ceph_decode_32_safe(p, end, info->dname_len, bad);
 117		ceph_decode_need(p, end, info->dname_len, bad);
 118		info->dname = *p;
 119		*p += info->dname_len;
 120		info->dlease = *p;
 121		*p += sizeof(*info->dlease);
 122	}
 123
 124	if (info->head->is_target) {
 125		err = parse_reply_info_in(p, end, &info->targeti, features);
 126		if (err < 0)
 127			goto out_bad;
 128	}
 129
 130	if (unlikely(*p != end))
 131		goto bad;
 132	return 0;
 133
 134bad:
 135	err = -EIO;
 136out_bad:
 137	pr_err("problem parsing mds trace %d\n", err);
 138	return err;
 139}
 140
 141/*
 142 * parse readdir results
 143 */
 144static int parse_reply_info_dir(void **p, void *end,
 145				struct ceph_mds_reply_info_parsed *info,
 146				int features)
 147{
 148	u32 num, i = 0;
 149	int err;
 150
 151	info->dir_dir = *p;
 152	if (*p + sizeof(*info->dir_dir) > end)
 153		goto bad;
 154	*p += sizeof(*info->dir_dir) +
 155		sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
 156	if (*p > end)
 157		goto bad;
 158
 159	ceph_decode_need(p, end, sizeof(num) + 2, bad);
 160	num = ceph_decode_32(p);
 161	info->dir_end = ceph_decode_8(p);
 162	info->dir_complete = ceph_decode_8(p);
 163	if (num == 0)
 164		goto done;
 165
 166	/* alloc large array */
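	/* a single allocation backs four parallel arrays (inode info,
	 * dentry names, name lengths, dentry leases); the pointers below
	 * carve it up */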
 167	info->dir_nr = num;
 168	info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
 169			       sizeof(*info->dir_dname) +
 170			       sizeof(*info->dir_dname_len) +
 171			       sizeof(*info->dir_dlease),
 172			       GFP_NOFS);
 173	if (info->dir_in == NULL) {
 174		err = -ENOMEM;
 175		goto out_bad;
 176	}
 177	info->dir_dname = (void *)(info->dir_in + num);
 178	info->dir_dname_len = (void *)(info->dir_dname + num);
 179	info->dir_dlease = (void *)(info->dir_dname_len + num);
 180
 181	while (num) {
 182		/* dentry */
 183		ceph_decode_need(p, end, sizeof(u32)*2, bad);
 184		info->dir_dname_len[i] = ceph_decode_32(p);
 185		ceph_decode_need(p, end, info->dir_dname_len[i], bad);
 186		info->dir_dname[i] = *p;
 187		*p += info->dir_dname_len[i];
 188		dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
 189		     info->dir_dname[i]);
 190		info->dir_dlease[i] = *p;
 191		*p += sizeof(struct ceph_mds_reply_lease);
 192
 193		/* inode */
 194		err = parse_reply_info_in(p, end, &info->dir_in[i], features);
 195		if (err < 0)
 196			goto out_bad;
 197		i++;
 198		num--;
 199	}
 200
 201done:
 202	if (*p != end)
 203		goto bad;
 204	return 0;
 205
 206bad:
 207	err = -EIO;
 208out_bad:
 209	pr_err("problem parsing dir contents %d\n", err);
 210	return err;
 211}
 212
 213/*
 214 * parse fcntl F_GETLK results
 215 */
 216static int parse_reply_info_filelock(void **p, void *end,
 217				     struct ceph_mds_reply_info_parsed *info,
 218				     int features)
 219{
 220	if (*p + sizeof(*info->filelock_reply) > end)
 221		goto bad;
 222
 223	info->filelock_reply = *p;
 224	*p += sizeof(*info->filelock_reply);
 225
 226	if (unlikely(*p != end))
 227		goto bad;
 228	return 0;
 229
 230bad:
 231	return -EIO;
 232}
 233
 234/*
 235 * parse extra results
 236 */
 237static int parse_reply_info_extra(void **p, void *end,
 238				  struct ceph_mds_reply_info_parsed *info,
 239				  int features)
 240{
 241	if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
 242		return parse_reply_info_filelock(p, end, info, features);
 243	else
 244		return parse_reply_info_dir(p, end, info, features);
 245}
 246
 247/*
 248 * parse entire mds reply
 249 */
 250static int parse_reply_info(struct ceph_msg *msg,
 251			    struct ceph_mds_reply_info_parsed *info,
 252			    int features)
 253{
 254	void *p, *end;
 255	u32 len;
 256	int err;
 257
 258	info->head = msg->front.iov_base;
 259	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
 260	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
 261
 262	/* trace */
 263	ceph_decode_32_safe(&p, end, len, bad);
 264	if (len > 0) {
 265		ceph_decode_need(&p, end, len, bad);
 266		err = parse_reply_info_trace(&p, p+len, info, features);
 267		if (err < 0)
 268			goto out_bad;
 269	}
 270
 271	/* extra */
 272	ceph_decode_32_safe(&p, end, len, bad);
 273	if (len > 0) {
 274		ceph_decode_need(&p, end, len, bad);
 275		err = parse_reply_info_extra(&p, p+len, info, features);
 276		if (err < 0)
 277			goto out_bad;
 278	}
 279
 280	/* snap blob */
 281	ceph_decode_32_safe(&p, end, len, bad);
 282	info->snapblob_len = len;
 283	info->snapblob = p;
 284	p += len;
 285
 286	if (p != end)
 287		goto bad;
 288	return 0;
 289
 290bad:
 291	err = -EIO;
 292out_bad:
 293	pr_err("mds parse_reply err %d\n", err);
 294	return err;
 295}
 296
 297static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
 298{
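	/* dir_in is the single buffer that also backs dir_dname,
	 * dir_dname_len and dir_dlease (see parse_reply_info_dir) */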
 299	kfree(info->dir_in);
 300}
 301
 302
 303/*
 304 * sessions
 305 */
 306static const char *session_state_name(int s)
 307{
 308	switch (s) {
 309	case CEPH_MDS_SESSION_NEW: return "new";
 310	case CEPH_MDS_SESSION_OPENING: return "opening";
 311	case CEPH_MDS_SESSION_OPEN: return "open";
 312	case CEPH_MDS_SESSION_HUNG: return "hung";
 313	case CEPH_MDS_SESSION_CLOSING: return "closing";
 314	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
 315	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
 316	default: return "???";
 317	}
 318}
 319
 320static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
 321{
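	/* take a reference only if the session hasn't already dropped its
	 * last ref (i.e. isn't mid-teardown) */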
 322	if (atomic_inc_not_zero(&s->s_ref)) {
 323		dout("mdsc get_session %p %d -> %d\n", s,
 324		     atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
 325		return s;
 326	} else {
 327		dout("mdsc get_session %p 0 -- FAIL", s);
 328		return NULL;
 329	}
 330}
 331
 332void ceph_put_mds_session(struct ceph_mds_session *s)
 333{
 334	dout("mdsc put_session %p %d -> %d\n", s,
 335	     atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
 336	if (atomic_dec_and_test(&s->s_ref)) {
 337		if (s->s_auth.authorizer)
 338		     s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer(
 339			     s->s_mdsc->fsc->client->monc.auth,
 340			     s->s_auth.authorizer);
 341		kfree(s);
 342	}
 343}
 344
 345/*
 346 * called under mdsc->mutex
 347 */
 348struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
 349						   int mds)
 350{
 351	struct ceph_mds_session *session;
 352
 353	if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
 354		return NULL;
 355	session = mdsc->sessions[mds];
 356	dout("lookup_mds_session %p %d\n", session,
 357	     atomic_read(&session->s_ref));
 358	get_session(session);
 359	return session;
 360}
 361
 362static bool __have_session(struct ceph_mds_client *mdsc, int mds)
 363{
 364	if (mds >= mdsc->max_sessions)
 365		return false;
 366	return mdsc->sessions[mds];
 367}
 368
 369static int __verify_registered_session(struct ceph_mds_client *mdsc,
 370				       struct ceph_mds_session *s)
 371{
 372	if (s->s_mds >= mdsc->max_sessions ||
 373	    mdsc->sessions[s->s_mds] != s)
 374		return -ENOENT;
 375	return 0;
 376}
 377
 378/*
 379 * create+register a new session for given mds.
 380 * called under mdsc->mutex.
 381 */
 382static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 383						 int mds)
 384{
 385	struct ceph_mds_session *s;
 386
 387	s = kzalloc(sizeof(*s), GFP_NOFS);
 388	if (!s)
 389		return ERR_PTR(-ENOMEM);
 390	s->s_mdsc = mdsc;
 391	s->s_mds = mds;
 392	s->s_state = CEPH_MDS_SESSION_NEW;
 393	s->s_ttl = 0;
 394	s->s_seq = 0;
 395	mutex_init(&s->s_mutex);
 396
 397	ceph_con_init(mdsc->fsc->client->msgr, &s->s_con);
 398	s->s_con.private = s;
 399	s->s_con.ops = &mds_con_ops;
 400	s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
 401	s->s_con.peer_name.num = cpu_to_le64(mds);
 402
 403	spin_lock_init(&s->s_gen_ttl_lock);
 404	s->s_cap_gen = 0;
 405	s->s_cap_ttl = jiffies - 1;
 406
 407	spin_lock_init(&s->s_cap_lock);
 408	s->s_renew_requested = 0;
 409	s->s_renew_seq = 0;
 410	INIT_LIST_HEAD(&s->s_caps);
 411	s->s_nr_caps = 0;
 412	s->s_trim_caps = 0;
 413	atomic_set(&s->s_ref, 1);
 414	INIT_LIST_HEAD(&s->s_waiting);
 415	INIT_LIST_HEAD(&s->s_unsafe);
 416	s->s_num_cap_releases = 0;
 417	s->s_cap_iterator = NULL;
 418	INIT_LIST_HEAD(&s->s_cap_releases);
 419	INIT_LIST_HEAD(&s->s_cap_releases_done);
 420	INIT_LIST_HEAD(&s->s_cap_flushing);
 421	INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
 422
 423	dout("register_session mds%d\n", mds);
 424	if (mds >= mdsc->max_sessions) {
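		/* grow the sessions array to the smallest power of two that
		 * can index this mds */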
 425		int newmax = 1 << get_count_order(mds+1);
 426		struct ceph_mds_session **sa;
 427
 428		dout("register_session realloc to %d\n", newmax);
 429		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
 430		if (sa == NULL)
 431			goto fail_realloc;
 432		if (mdsc->sessions) {
 433			memcpy(sa, mdsc->sessions,
 434			       mdsc->max_sessions * sizeof(void *));
 435			kfree(mdsc->sessions);
 436		}
 437		mdsc->sessions = sa;
 438		mdsc->max_sessions = newmax;
 439	}
 440	mdsc->sessions[mds] = s;
 441	atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
 442
 443	ceph_con_open(&s->s_con, ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
 444
 445	return s;
 446
 447fail_realloc:
 448	kfree(s);
 449	return ERR_PTR(-ENOMEM);
 450}
 451
 452/*
 453 * called under mdsc->mutex
 454 */
 455static void __unregister_session(struct ceph_mds_client *mdsc,
 456			       struct ceph_mds_session *s)
 457{
 458	dout("__unregister_session mds%d %p\n", s->s_mds, s);
 459	BUG_ON(mdsc->sessions[s->s_mds] != s);
 460	mdsc->sessions[s->s_mds] = NULL;
 461	ceph_con_close(&s->s_con);
 462	ceph_put_mds_session(s);
 463}
 464
 465/*
 466 * drop session refs in request.
 467 *
 468 * should be last request ref, or hold mdsc->mutex
 469 */
 470static void put_request_session(struct ceph_mds_request *req)
 471{
 472	if (req->r_session) {
 473		ceph_put_mds_session(req->r_session);
 474		req->r_session = NULL;
 475	}
 476}
 477
 478void ceph_mdsc_release_request(struct kref *kref)
 479{
 480	struct ceph_mds_request *req = container_of(kref,
 481						    struct ceph_mds_request,
 482						    r_kref);
 483	if (req->r_request)
 484		ceph_msg_put(req->r_request);
 485	if (req->r_reply) {
 486		ceph_msg_put(req->r_reply);
 487		destroy_reply_info(&req->r_reply_info);
 488	}
 489	if (req->r_inode) {
 490		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
 491		iput(req->r_inode);
 492	}
 493	if (req->r_locked_dir)
 494		ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
 495	if (req->r_target_inode)
 496		iput(req->r_target_inode);
 497	if (req->r_dentry)
 498		dput(req->r_dentry);
 499	if (req->r_old_dentry) {
 500		/*
 501		 * track (and drop pins for) r_old_dentry_dir
 502		 * separately, since r_old_dentry's d_parent may have
 503		 * changed between the dir mutex being dropped and
 504		 * this request being freed.
 505		 */
 506		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
 507				  CEPH_CAP_PIN);
 508		dput(req->r_old_dentry);
 509		iput(req->r_old_dentry_dir);
 510	}
 511	kfree(req->r_path1);
 512	kfree(req->r_path2);
 513	put_request_session(req);
 514	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
 515	kfree(req);
 516}
 517
 518/*
  519 * lookup request, bump ref if found.
 520 *
 521 * called under mdsc->mutex.
 522 */
 523static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
 524					     u64 tid)
 525{
 526	struct ceph_mds_request *req;
 527	struct rb_node *n = mdsc->request_tree.rb_node;
 528
 529	while (n) {
 530		req = rb_entry(n, struct ceph_mds_request, r_node);
 531		if (tid < req->r_tid)
 532			n = n->rb_left;
 533		else if (tid > req->r_tid)
 534			n = n->rb_right;
 535		else {
 536			ceph_mdsc_get_request(req);
 537			return req;
 538		}
 539	}
 540	return NULL;
 541}
 542
 543static void __insert_request(struct ceph_mds_client *mdsc,
 544			     struct ceph_mds_request *new)
 545{
 546	struct rb_node **p = &mdsc->request_tree.rb_node;
 547	struct rb_node *parent = NULL;
 548	struct ceph_mds_request *req = NULL;
 549
 550	while (*p) {
 551		parent = *p;
 552		req = rb_entry(parent, struct ceph_mds_request, r_node);
 553		if (new->r_tid < req->r_tid)
 554			p = &(*p)->rb_left;
 555		else if (new->r_tid > req->r_tid)
 556			p = &(*p)->rb_right;
 557		else
 558			BUG();
 559	}
 560
 561	rb_link_node(&new->r_node, parent, p);
 562	rb_insert_color(&new->r_node, &mdsc->request_tree);
 563}
 564
 565/*
 566 * Register an in-flight request, and assign a tid.  Link to directory
 567 * we are modifying (if any).
 568 *
 569 * Called under mdsc->mutex.
 570 */
 571static void __register_request(struct ceph_mds_client *mdsc,
 572			       struct ceph_mds_request *req,
 573			       struct inode *dir)
 574{
 575	req->r_tid = ++mdsc->last_tid;
 576	if (req->r_num_caps)
 577		ceph_reserve_caps(mdsc, &req->r_caps_reservation,
 578				  req->r_num_caps);
 579	dout("__register_request %p tid %lld\n", req, req->r_tid);
 580	ceph_mdsc_get_request(req);
 581	__insert_request(mdsc, req);
 582
 583	req->r_uid = current_fsuid();
 584	req->r_gid = current_fsgid();
 585
 586	if (dir) {
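		/* pin the dir and track this request on the directory's
		 * unsafe-ops list until __unregister_request() drops it */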
 587		struct ceph_inode_info *ci = ceph_inode(dir);
 588
 589		ihold(dir);
 590		spin_lock(&ci->i_unsafe_lock);
 591		req->r_unsafe_dir = dir;
 592		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
 593		spin_unlock(&ci->i_unsafe_lock);
 594	}
 595}
 596
 597static void __unregister_request(struct ceph_mds_client *mdsc,
 598				 struct ceph_mds_request *req)
 599{
 600	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
 601	rb_erase(&req->r_node, &mdsc->request_tree);
 602	RB_CLEAR_NODE(&req->r_node);
 603
 604	if (req->r_unsafe_dir) {
 605		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
 606
 607		spin_lock(&ci->i_unsafe_lock);
 608		list_del_init(&req->r_unsafe_dir_item);
 609		spin_unlock(&ci->i_unsafe_lock);
 610
 611		iput(req->r_unsafe_dir);
 612		req->r_unsafe_dir = NULL;
 613	}
 614
 615	ceph_mdsc_put_request(req);
 616}
 617
 618/*
 619 * Choose mds to send request to next.  If there is a hint set in the
 620 * request (e.g., due to a prior forward hint from the mds), use that.
 621 * Otherwise, consult frag tree and/or caps to identify the
 622 * appropriate mds.  If all else fails, choose randomly.
 623 *
 624 * Called under mdsc->mutex.
 625 */
 626static struct dentry *get_nonsnap_parent(struct dentry *dentry)
 627{
 628	/*
 629	 * we don't need to worry about protecting the d_parent access
 630	 * here because we never rename inside the snapped namespace
 631	 * except to resplice to another snapdir, and either the old or new
 632	 * result is a valid result.
 633	 */
 634	while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
 635		dentry = dentry->d_parent;
 636	return dentry;
 637}
 638
 639static int __choose_mds(struct ceph_mds_client *mdsc,
 640			struct ceph_mds_request *req)
 641{
 642	struct inode *inode;
 643	struct ceph_inode_info *ci;
 644	struct ceph_cap *cap;
 645	int mode = req->r_direct_mode;
 646	int mds = -1;
 647	u32 hash = req->r_direct_hash;
 648	bool is_hash = req->r_direct_is_hash;
 649
 650	/*
 651	 * is there a specific mds we should try?  ignore hint if we have
 652	 * no session and the mds is not up (active or recovering).
 653	 */
 654	if (req->r_resend_mds >= 0 &&
 655	    (__have_session(mdsc, req->r_resend_mds) ||
 656	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
 657		dout("choose_mds using resend_mds mds%d\n",
 658		     req->r_resend_mds);
 659		return req->r_resend_mds;
 660	}
 661
 662	if (mode == USE_RANDOM_MDS)
 663		goto random;
 664
 665	inode = NULL;
 666	if (req->r_inode) {
 667		inode = req->r_inode;
 668	} else if (req->r_dentry) {
 669		/* ignore race with rename; old or new d_parent is okay */
 670		struct dentry *parent = req->r_dentry->d_parent;
 671		struct inode *dir = parent->d_inode;
 672
 673		if (dir->i_sb != mdsc->fsc->sb) {
 674			/* not this fs! */
 675			inode = req->r_dentry->d_inode;
 676		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
 677			/* direct snapped/virtual snapdir requests
 678			 * based on parent dir inode */
 679			struct dentry *dn = get_nonsnap_parent(parent);
 680			inode = dn->d_inode;
 681			dout("__choose_mds using nonsnap parent %p\n", inode);
 682		} else if (req->r_dentry->d_inode) {
 683			/* dentry target */
 684			inode = req->r_dentry->d_inode;
 685		} else {
 686			/* dir + name */
 687			inode = dir;
 688			hash = ceph_dentry_hash(dir, req->r_dentry);
 689			is_hash = true;
 690		}
 691	}
 692
 693	dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
 694	     (int)hash, mode);
 695	if (!inode)
 696		goto random;
 697	ci = ceph_inode(inode);
 698
 699	if (is_hash && S_ISDIR(inode->i_mode)) {
 700		struct ceph_inode_frag frag;
 701		int found;
 702
 703		ceph_choose_frag(ci, hash, &frag, &found);
 704		if (found) {
 705			if (mode == USE_ANY_MDS && frag.ndist > 0) {
 706				u8 r;
 707
 708				/* choose a random replica */
 709				get_random_bytes(&r, 1);
 710				r %= frag.ndist;
 711				mds = frag.dist[r];
 712				dout("choose_mds %p %llx.%llx "
 713				     "frag %u mds%d (%d/%d)\n",
 714				     inode, ceph_vinop(inode),
 715				     frag.frag, mds,
 716				     (int)r, frag.ndist);
 717				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
 718				    CEPH_MDS_STATE_ACTIVE)
 719					return mds;
 720			}
 721
 722			/* since this file/dir wasn't known to be
 723			 * replicated, we want to look for the
 724			 * authoritative mds. */
 725			mode = USE_AUTH_MDS;
 726			if (frag.mds >= 0) {
 727				/* choose auth mds */
 728				mds = frag.mds;
 729				dout("choose_mds %p %llx.%llx "
 730				     "frag %u mds%d (auth)\n",
 731				     inode, ceph_vinop(inode), frag.frag, mds);
 732				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
 733				    CEPH_MDS_STATE_ACTIVE)
 734					return mds;
 735			}
 736		}
 737	}
 738
 739	spin_lock(&ci->i_ceph_lock);
 740	cap = NULL;
 741	if (mode == USE_AUTH_MDS)
 742		cap = ci->i_auth_cap;
 743	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
 744		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
 745	if (!cap) {
 746		spin_unlock(&ci->i_ceph_lock);
 747		goto random;
 748	}
 749	mds = cap->session->s_mds;
 750	dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
 751	     inode, ceph_vinop(inode), mds,
 752	     cap == ci->i_auth_cap ? "auth " : "", cap);
 753	spin_unlock(&ci->i_ceph_lock);
 754	return mds;
 755
 756random:
 757	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
 758	dout("choose_mds chose random mds%d\n", mds);
 759	return mds;
 760}
 761
 762
 763/*
 764 * session messages
 765 */
 766static struct ceph_msg *create_session_msg(u32 op, u64 seq)
 767{
 768	struct ceph_msg *msg;
 769	struct ceph_mds_session_head *h;
 770
 771	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
 772			   false);
 773	if (!msg) {
 774		pr_err("create_session_msg ENOMEM creating msg\n");
 775		return NULL;
 776	}
 777	h = msg->front.iov_base;
 778	h->op = cpu_to_le32(op);
 779	h->seq = cpu_to_le64(seq);
 780	return msg;
 781}
 782
 783/*
 784 * send session open request.
 785 *
 786 * called under mdsc->mutex
 787 */
 788static int __open_session(struct ceph_mds_client *mdsc,
 789			  struct ceph_mds_session *session)
 790{
 791	struct ceph_msg *msg;
 792	int mstate;
 793	int mds = session->s_mds;
 794
 795	/* wait for mds to go active? */
 796	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
 797	dout("open_session to mds%d (%s)\n", mds,
 798	     ceph_mds_state_name(mstate));
 799	session->s_state = CEPH_MDS_SESSION_OPENING;
 800	session->s_renew_requested = jiffies;
 801
 802	/* send connect message */
 803	msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
 804	if (!msg)
 805		return -ENOMEM;
 806	ceph_con_send(&session->s_con, msg);
 807	return 0;
 808}
 809
 810/*
 811 * open sessions for any export targets for the given mds
 812 *
 813 * called under mdsc->mutex
 814 */
 815static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
 816					  struct ceph_mds_session *session)
 817{
 818	struct ceph_mds_info *mi;
 819	struct ceph_mds_session *ts;
 820	int i, mds = session->s_mds;
 821	int target;
 822
 823	if (mds >= mdsc->mdsmap->m_max_mds)
 824		return;
 825	mi = &mdsc->mdsmap->m_info[mds];
 826	dout("open_export_target_sessions for mds%d (%d targets)\n",
 827	     session->s_mds, mi->num_export_targets);
 828
 829	for (i = 0; i < mi->num_export_targets; i++) {
 830		target = mi->export_targets[i];
 831		ts = __ceph_lookup_mds_session(mdsc, target);
 832		if (!ts) {
 833			ts = register_session(mdsc, target);
 834			if (IS_ERR(ts))
 835				return;
 836		}
 837		if (ts->s_state == CEPH_MDS_SESSION_NEW ||
 838		    ts->s_state == CEPH_MDS_SESSION_CLOSING)
 839			__open_session(mdsc, ts);
 840		else
 841			dout(" mds%d target mds%d %p is %s\n", session->s_mds,
 842			     target, ts, session_state_name(ts->s_state));
 843		ceph_put_mds_session(ts);
 844	}
 845}
 846
 847void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
 848					   struct ceph_mds_session *session)
 849{
 850	mutex_lock(&mdsc->mutex);
 851	__open_export_target_sessions(mdsc, session);
 852	mutex_unlock(&mdsc->mutex);
 853}
 854
 855/*
 856 * session caps
 857 */
 858
 859/*
 860 * Free preallocated cap messages assigned to this session
 861 */
 862static void cleanup_cap_releases(struct ceph_mds_session *session)
 863{
 864	struct ceph_msg *msg;
 865
 866	spin_lock(&session->s_cap_lock);
 867	while (!list_empty(&session->s_cap_releases)) {
 868		msg = list_first_entry(&session->s_cap_releases,
 869				       struct ceph_msg, list_head);
 870		list_del_init(&msg->list_head);
 871		ceph_msg_put(msg);
 872	}
 873	while (!list_empty(&session->s_cap_releases_done)) {
 874		msg = list_first_entry(&session->s_cap_releases_done,
 875				       struct ceph_msg, list_head);
 876		list_del_init(&msg->list_head);
 877		ceph_msg_put(msg);
 878	}
 879	spin_unlock(&session->s_cap_lock);
 880}
 881
 882/*
 883 * Helper to safely iterate over all caps associated with a session, with
 884 * special care taken to handle a racing __ceph_remove_cap().
 885 *
 886 * Caller must hold session s_mutex.
 887 */
 888static int iterate_session_caps(struct ceph_mds_session *session,
 889				 int (*cb)(struct inode *, struct ceph_cap *,
 890					    void *), void *arg)
 891{
 892	struct list_head *p;
 893	struct ceph_cap *cap;
 894	struct inode *inode, *last_inode = NULL;
 895	struct ceph_cap *old_cap = NULL;
 896	int ret;
 897
 898	dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
 899	spin_lock(&session->s_cap_lock);
 900	p = session->s_caps.next;
 901	while (p != &session->s_caps) {
 902		cap = list_entry(p, struct ceph_cap, session_caps);
 903		inode = igrab(&cap->ci->vfs_inode);
 904		if (!inode) {
 905			p = p->next;
 906			continue;
 907		}
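		/* publish our position: a racing __ceph_remove_cap() will
		 * leave this cap on the list (with ci cleared) for us to
		 * finish removing below */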
 908		session->s_cap_iterator = cap;
 909		spin_unlock(&session->s_cap_lock);
 910
 911		if (last_inode) {
 912			iput(last_inode);
 913			last_inode = NULL;
 914		}
 915		if (old_cap) {
 916			ceph_put_cap(session->s_mdsc, old_cap);
 917			old_cap = NULL;
 918		}
 919
 920		ret = cb(inode, cap, arg);
 921		last_inode = inode;
 922
 923		spin_lock(&session->s_cap_lock);
 924		p = p->next;
 925		if (cap->ci == NULL) {
 926			dout("iterate_session_caps  finishing cap %p removal\n",
 927			     cap);
 928			BUG_ON(cap->session != session);
 929			list_del_init(&cap->session_caps);
 930			session->s_nr_caps--;
 931			cap->session = NULL;
 932			old_cap = cap;  /* put_cap it w/o locks held */
 933		}
 934		if (ret < 0)
 935			goto out;
 936	}
 937	ret = 0;
 938out:
 939	session->s_cap_iterator = NULL;
 940	spin_unlock(&session->s_cap_lock);
 941
 942	if (last_inode)
 943		iput(last_inode);
 944	if (old_cap)
 945		ceph_put_cap(session->s_mdsc, old_cap);
 946
 947	return ret;
 948}
 949
 950static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
 951				  void *arg)
 952{
 953	struct ceph_inode_info *ci = ceph_inode(inode);
 954	int drop = 0;
 955
 956	dout("removing cap %p, ci is %p, inode is %p\n",
 957	     cap, ci, &ci->vfs_inode);
 958	spin_lock(&ci->i_ceph_lock);
 959	__ceph_remove_cap(cap);
 960	if (!__ceph_is_any_real_caps(ci)) {
 961		struct ceph_mds_client *mdsc =
 962			ceph_sb_to_client(inode->i_sb)->mdsc;
 963
 964		spin_lock(&mdsc->cap_dirty_lock);
 965		if (!list_empty(&ci->i_dirty_item)) {
 966			pr_info(" dropping dirty %s state for %p %lld\n",
 967				ceph_cap_string(ci->i_dirty_caps),
 968				inode, ceph_ino(inode));
 969			ci->i_dirty_caps = 0;
 970			list_del_init(&ci->i_dirty_item);
 971			drop = 1;
 972		}
 973		if (!list_empty(&ci->i_flushing_item)) {
 974			pr_info(" dropping dirty+flushing %s state for %p %lld\n",
 975				ceph_cap_string(ci->i_flushing_caps),
 976				inode, ceph_ino(inode));
 977			ci->i_flushing_caps = 0;
 978			list_del_init(&ci->i_flushing_item);
 979			mdsc->num_cap_flushing--;
 980			drop = 1;
 981		}
 982		if (drop && ci->i_wrbuffer_ref) {
 983			pr_info(" dropping dirty data for %p %lld\n",
 984				inode, ceph_ino(inode));
 985			ci->i_wrbuffer_ref = 0;
 986			ci->i_wrbuffer_ref_head = 0;
 987			drop++;
 988		}
 989		spin_unlock(&mdsc->cap_dirty_lock);
 990	}
 991	spin_unlock(&ci->i_ceph_lock);
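	/* each piece of state dropped above held a reference on the inode;
	 * release those refs now that no locks are held */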
 992	while (drop--)
 993		iput(inode);
 994	return 0;
 995}
 996
 997/*
 998 * caller must hold session s_mutex
 999 */
1000static void remove_session_caps(struct ceph_mds_session *session)
1001{
1002	dout("remove_session_caps on %p\n", session);
1003	iterate_session_caps(session, remove_session_caps_cb, NULL);
1004	BUG_ON(session->s_nr_caps > 0);
1005	BUG_ON(!list_empty(&session->s_cap_flushing));
1006	cleanup_cap_releases(session);
1007}
1008
1009/*
1010 * wake up any threads waiting on this session's caps.  if the cap is
1011 * old (didn't get renewed on the client reconnect), remove it now.
1012 *
1013 * caller must hold s_mutex.
1014 */
1015static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1016			      void *arg)
1017{
1018	struct ceph_inode_info *ci = ceph_inode(inode);
1019
1020	wake_up_all(&ci->i_cap_wq);
1021	if (arg) {
1022		spin_lock(&ci->i_ceph_lock);
1023		ci->i_wanted_max_size = 0;
1024		ci->i_requested_max_size = 0;
1025		spin_unlock(&ci->i_ceph_lock);
1026	}
1027	return 0;
1028}
1029
1030static void wake_up_session_caps(struct ceph_mds_session *session,
1031				 int reconnect)
1032{
1033	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1034	iterate_session_caps(session, wake_up_session_cb,
1035			     (void *)(unsigned long)reconnect);
1036}
1037
1038/*
1039 * Send periodic message to MDS renewing all currently held caps.  The
1040 * ack will reset the expiration for all caps from this session.
1041 *
1042 * caller holds s_mutex
1043 */
1044static int send_renew_caps(struct ceph_mds_client *mdsc,
1045			   struct ceph_mds_session *session)
1046{
1047	struct ceph_msg *msg;
1048	int state;
1049
1050	if (time_after_eq(jiffies, session->s_cap_ttl) &&
1051	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1052		pr_info("mds%d caps stale\n", session->s_mds);
1053	session->s_renew_requested = jiffies;
1054
1055	/* do not try to renew caps until a recovering mds has reconnected
1056	 * with its clients. */
1057	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1058	if (state < CEPH_MDS_STATE_RECONNECT) {
1059		dout("send_renew_caps ignoring mds%d (%s)\n",
1060		     session->s_mds, ceph_mds_state_name(state));
1061		return 0;
1062	}
1063
1064	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1065		ceph_mds_state_name(state));
1066	msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1067				 ++session->s_renew_seq);
1068	if (!msg)
1069		return -ENOMEM;
1070	ceph_con_send(&session->s_con, msg);
1071	return 0;
1072}
1073
1074/*
1075 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1076 *
1077 * Called under session->s_mutex
1078 */
1079static void renewed_caps(struct ceph_mds_client *mdsc,
1080			 struct ceph_mds_session *session, int is_renew)
1081{
1082	int was_stale;
1083	int wake = 0;
1084
1085	spin_lock(&session->s_cap_lock);
1086	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1087
1088	session->s_cap_ttl = session->s_renew_requested +
1089		mdsc->mdsmap->m_session_timeout*HZ;
1090
1091	if (was_stale) {
1092		if (time_before(jiffies, session->s_cap_ttl)) {
1093			pr_info("mds%d caps renewed\n", session->s_mds);
1094			wake = 1;
1095		} else {
1096			pr_info("mds%d caps still stale\n", session->s_mds);
1097		}
1098	}
1099	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1100	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1101	     time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1102	spin_unlock(&session->s_cap_lock);
1103
1104	if (wake)
1105		wake_up_session_caps(session, 0);
1106}
1107
1108/*
1109 * send a session close request
1110 */
1111static int request_close_session(struct ceph_mds_client *mdsc,
1112				 struct ceph_mds_session *session)
1113{
1114	struct ceph_msg *msg;
1115
1116	dout("request_close_session mds%d state %s seq %lld\n",
1117	     session->s_mds, session_state_name(session->s_state),
1118	     session->s_seq);
1119	msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1120	if (!msg)
1121		return -ENOMEM;
1122	ceph_con_send(&session->s_con, msg);
1123	return 0;
1124}
1125
1126/*
1127 * Called with s_mutex held.
1128 */
1129static int __close_session(struct ceph_mds_client *mdsc,
1130			 struct ceph_mds_session *session)
1131{
1132	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1133		return 0;
1134	session->s_state = CEPH_MDS_SESSION_CLOSING;
1135	return request_close_session(mdsc, session);
1136}
1137
1138/*
1139 * Trim old(er) caps.
1140 *
1141 * Because we can't cache an inode without one or more caps, we do
1142 * this indirectly: if a cap is unused, we prune its aliases, at which
1143 * point the inode will hopefully get dropped too.
1144 *
1145 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1146 * memory pressure from the MDS, though, so it needn't be perfect.
1147 */
1148static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1149{
1150	struct ceph_mds_session *session = arg;
1151	struct ceph_inode_info *ci = ceph_inode(inode);
1152	int used, oissued, mine;
1153
1154	if (session->s_trim_caps <= 0)
1155		return -1;
1156
1157	spin_lock(&ci->i_ceph_lock);
1158	mine = cap->issued | cap->implemented;
1159	used = __ceph_caps_used(ci);
1160	oissued = __ceph_caps_issued_other(ci, cap);
1161
1162	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
1163	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1164	     ceph_cap_string(used));
1165	if (ci->i_dirty_caps)
1166		goto out;   /* dirty caps */
1167	if ((used & ~oissued) & mine)
1168		goto out;   /* we need these caps */
1169
1170	session->s_trim_caps--;
1171	if (oissued) {
1172		/* we aren't the only cap.. just remove us */
1173		__ceph_remove_cap(cap);
1174	} else {
1175		/* try to drop referring dentries */
1176		spin_unlock(&ci->i_ceph_lock);
1177		d_prune_aliases(inode);
1178		dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
1179		     inode, cap, atomic_read(&inode->i_count));
1180		return 0;
1181	}
1182
1183out:
1184	spin_unlock(&ci->i_ceph_lock);
1185	return 0;
1186}
1187
1188/*
1189 * Trim session cap count down to some max number.
1190 */
1191static int trim_caps(struct ceph_mds_client *mdsc,
1192		     struct ceph_mds_session *session,
1193		     int max_caps)
1194{
1195	int trim_caps = session->s_nr_caps - max_caps;
1196
1197	dout("trim_caps mds%d start: %d / %d, trim %d\n",
1198	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1199	if (trim_caps > 0) {
1200		session->s_trim_caps = trim_caps;
1201		iterate_session_caps(session, trim_caps_cb, session);
1202		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1203		     session->s_mds, session->s_nr_caps, max_caps,
1204			trim_caps - session->s_trim_caps);
1205		session->s_trim_caps = 0;
1206	}
1207	return 0;
1208}
1209
1210/*
1211 * Allocate cap_release messages.  If there is a partially full message
1212 * in the queue, try to allocate enough to cover its remainder, so that
1213 * we can send it immediately.
1214 *
1215 * Called under s_mutex.
1216 */
1217int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
1218			  struct ceph_mds_session *session)
1219{
1220	struct ceph_msg *msg, *partial = NULL;
1221	struct ceph_mds_cap_release *head;
1222	int err = -ENOMEM;
1223	int extra = mdsc->fsc->mount_options->cap_release_safety;
1224	int num;
1225
1226	dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
1227	     extra);
1228
1229	spin_lock(&session->s_cap_lock);
1230
1231	if (!list_empty(&session->s_cap_releases)) {
1232		msg = list_first_entry(&session->s_cap_releases,
1233				       struct ceph_msg,
1234				 list_head);
1235		head = msg->front.iov_base;
1236		num = le32_to_cpu(head->num);
1237		if (num) {
1238			dout(" partial %p with (%d/%d)\n", msg, num,
1239			     (int)CEPH_CAPS_PER_RELEASE);
1240			extra += CEPH_CAPS_PER_RELEASE - num;
1241			partial = msg;
1242		}
1243	}
1244	while (session->s_num_cap_releases < session->s_nr_caps + extra) {
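		/* drop s_cap_lock around the (possibly sleeping) allocation,
		 * then retake it before queueing the new message */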
1245		spin_unlock(&session->s_cap_lock);
1246		msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
1247				   GFP_NOFS, false);
1248		if (!msg)
1249			goto out_unlocked;
1250		dout("add_cap_releases %p msg %p now %d\n", session, msg,
1251		     (int)msg->front.iov_len);
1252		head = msg->front.iov_base;
1253		head->num = cpu_to_le32(0);
1254		msg->front.iov_len = sizeof(*head);
1255		spin_lock(&session->s_cap_lock);
1256		list_add(&msg->list_head, &session->s_cap_releases);
1257		session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
1258	}
1259
1260	if (partial) {
1261		head = partial->front.iov_base;
1262		num = le32_to_cpu(head->num);
1263		dout(" queueing partial %p with %d/%d\n", partial, num,
1264		     (int)CEPH_CAPS_PER_RELEASE);
1265		list_move_tail(&partial->list_head,
1266			       &session->s_cap_releases_done);
1267		session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
1268	}
1269	err = 0;
1270	spin_unlock(&session->s_cap_lock);
1271out_unlocked:
1272	return err;
1273}
1274
1275/*
1276 * flush all dirty inode data to disk.
1277 *
1278 * returns true if we've flushed through want_flush_seq
1279 */
1280static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
1281{
1282	int mds, ret = 1;
1283
1284	dout("check_cap_flush want %lld\n", want_flush_seq);
1285	mutex_lock(&mdsc->mutex);
1286	for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
1287		struct ceph_mds_session *session = mdsc->sessions[mds];
1288
1289		if (!session)
1290			continue;
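		/* take our own ref and drop mdsc->mutex so we can safely
		 * take the session s_mutex */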
1291		get_session(session);
1292		mutex_unlock(&mdsc->mutex);
1293
1294		mutex_lock(&session->s_mutex);
1295		if (!list_empty(&session->s_cap_flushing)) {
1296			struct ceph_inode_info *ci =
1297				list_entry(session->s_cap_flushing.next,
1298					   struct ceph_inode_info,
1299					   i_flushing_item);
1300			struct inode *inode = &ci->vfs_inode;
1301
1302			spin_lock(&ci->i_ceph_lock);
1303			if (ci->i_cap_flush_seq <= want_flush_seq) {
1304				dout("check_cap_flush still flushing %p "
1305				     "seq %lld <= %lld to mds%d\n", inode,
1306				     ci->i_cap_flush_seq, want_flush_seq,
1307				     session->s_mds);
1308				ret = 0;
1309			}
1310			spin_unlock(&ci->i_ceph_lock);
1311		}
1312		mutex_unlock(&session->s_mutex);
1313		ceph_put_mds_session(session);
1314
1315		if (!ret)
1316			return ret;
1317		mutex_lock(&mdsc->mutex);
1318	}
1319
1320	mutex_unlock(&mdsc->mutex);
1321	dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
1322	return ret;
1323}
1324
1325/*
1326 * called under s_mutex
1327 */
1328void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1329			    struct ceph_mds_session *session)
1330{
1331	struct ceph_msg *msg;
1332
1333	dout("send_cap_releases mds%d\n", session->s_mds);
1334	spin_lock(&session->s_cap_lock);
1335	while (!list_empty(&session->s_cap_releases_done)) {
1336		msg = list_first_entry(&session->s_cap_releases_done,
1337				 struct ceph_msg, list_head);
1338		list_del_init(&msg->list_head);
1339		spin_unlock(&session->s_cap_lock);
1340		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1341		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1342		ceph_con_send(&session->s_con, msg);
1343		spin_lock(&session->s_cap_lock);
1344	}
1345	spin_unlock(&session->s_cap_lock);
1346}
1347
1348static void discard_cap_releases(struct ceph_mds_client *mdsc,
1349				 struct ceph_mds_session *session)
1350{
1351	struct ceph_msg *msg;
1352	struct ceph_mds_cap_release *head;
1353	unsigned num;
1354
1355	dout("discard_cap_releases mds%d\n", session->s_mds);
1356	spin_lock(&session->s_cap_lock);
1357
1358	/* zero out the in-progress message */
1359	msg = list_first_entry(&session->s_cap_releases,
1360			       struct ceph_msg, list_head);
1361	head = msg->front.iov_base;
1362	num = le32_to_cpu(head->num);
1363	dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
1364	head->num = cpu_to_le32(0);
1365	session->s_num_cap_releases += num;
1366
1367	/* requeue completed messages */
1368	while (!list_empty(&session->s_cap_releases_done)) {
1369		msg = list_first_entry(&session->s_cap_releases_done,
1370				 struct ceph_msg, list_head);
1371		list_del_init(&msg->list_head);
1372
1373		head = msg->front.iov_base;
1374		num = le32_to_cpu(head->num);
1375		dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
1376		     num);
1377		session->s_num_cap_releases += num;
1378		head->num = cpu_to_le32(0);
1379		msg->front.iov_len = sizeof(*head);
1380		list_add(&msg->list_head, &session->s_cap_releases);
1381	}
1382
1383	spin_unlock(&session->s_cap_lock);
1384}
1385
1386/*
1387 * requests
1388 */
1389
1390/*
1391 * Create an mds request.
1392 */
1393struct ceph_mds_request *
1394ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1395{
1396	struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1397
1398	if (!req)
1399		return ERR_PTR(-ENOMEM);
1400
1401	mutex_init(&req->r_fill_mutex);
1402	req->r_mdsc = mdsc;
1403	req->r_started = jiffies;
1404	req->r_resend_mds = -1;
1405	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1406	req->r_fmode = -1;
1407	kref_init(&req->r_kref);
1408	INIT_LIST_HEAD(&req->r_wait);
1409	init_completion(&req->r_completion);
1410	init_completion(&req->r_safe_completion);
1411	INIT_LIST_HEAD(&req->r_unsafe_item);
1412
1413	req->r_op = op;
1414	req->r_direct_mode = mode;
1415	return req;
1416}
1417
1418/*
1419 * return the oldest (lowest tid) request in the request tree, or NULL (tid 0) if none.
1420 *
1421 * called under mdsc->mutex.
1422 */
1423static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1424{
1425	if (RB_EMPTY_ROOT(&mdsc->request_tree))
1426		return NULL;
1427	return rb_entry(rb_first(&mdsc->request_tree),
1428			struct ceph_mds_request, r_node);
1429}
1430
1431static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1432{
1433	struct ceph_mds_request *req = __get_oldest_req(mdsc);
1434
1435	if (req)
1436		return req->r_tid;
1437	return 0;
1438}
1439
1440/*
1441 * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1442 * on build_path_from_dentry in fs/cifs/dir.c.
1443 *
1444 * If @stop_on_nosnap, generate path relative to the first non-snapped
1445 * inode.
1446 *
1447 * Encode hidden .snap dirs as a double /, i.e.
1448 *   foo/.snap/bar -> foo//bar
1449 */
1450char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1451			   int stop_on_nosnap)
1452{
1453	struct dentry *temp;
1454	char *path;
1455	int len, pos;
1456	unsigned seq;
1457
1458	if (dentry == NULL)
1459		return ERR_PTR(-EINVAL);
1460
1461retry:
1462	len = 0;
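	/* sample rename_lock: if a concurrent rename changes our ancestry
	 * while we walk it, read_seqretry() below sends us back to retry */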
1463	seq = read_seqbegin(&rename_lock);
1464	rcu_read_lock();
1465	for (temp = dentry; !IS_ROOT(temp);) {
1466		struct inode *inode = temp->d_inode;
1467		if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1468			len++;  /* slash only */
1469		else if (stop_on_nosnap && inode &&
1470			 ceph_snap(inode) == CEPH_NOSNAP)
1471			break;
1472		else
1473			len += 1 + temp->d_name.len;
1474		temp = temp->d_parent;
1475		if (temp == NULL) {
1476			rcu_read_unlock();
1477			pr_err("build_path corrupt dentry %p\n", dentry);
1478			return ERR_PTR(-EINVAL);
1479		}
1480	}
1481	rcu_read_unlock();
1482	if (len)
1483		len--;  /* no leading '/' */
1484
1485	path = kmalloc(len+1, GFP_NOFS);
1486	if (path == NULL)
1487		return ERR_PTR(-ENOMEM);
1488	pos = len;
1489	path[pos] = 0;	/* trailing null */
1490	rcu_read_lock();
1491	for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1492		struct inode *inode;
1493
1494		spin_lock(&temp->d_lock);
1495		inode = temp->d_inode;
1496		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1497			dout("build_path path+%d: %p SNAPDIR\n",
1498			     pos, temp);
1499		} else if (stop_on_nosnap && inode &&
1500			   ceph_snap(inode) == CEPH_NOSNAP) {
1501			spin_unlock(&temp->d_lock);
1502			break;
1503		} else {
1504			pos -= temp->d_name.len;
1505			if (pos < 0) {
1506				spin_unlock(&temp->d_lock);
1507				break;
1508			}
1509			strncpy(path + pos, temp->d_name.name,
1510				temp->d_name.len);
1511		}
1512		spin_unlock(&temp->d_lock);
1513		if (pos)
1514			path[--pos] = '/';
1515		temp = temp->d_parent;
1516		if (temp == NULL) {
1517			rcu_read_unlock();
1518			pr_err("build_path corrupt dentry\n");
1519			kfree(path);
1520			return ERR_PTR(-EINVAL);
1521		}
1522	}
1523	rcu_read_unlock();
1524	if (pos != 0 || read_seqretry(&rename_lock, seq)) {
1525		pr_err("build_path did not end path lookup where "
1526		       "expected, namelen is %d, pos is %d\n", len, pos);
1527		/* presumably this is only possible if racing with a
1528		   rename of one of the parent directories (we can not
1529		   lock the dentries above us to prevent this, but
1530		   retrying should be harmless) */
1531		kfree(path);
1532		goto retry;
1533	}
1534
1535	*base = ceph_ino(temp->d_inode);
1536	*plen = len;
1537	dout("build_path on %p %d built %llx '%.*s'\n",
1538	     dentry, dentry->d_count, *base, len, path);
1539	return path;
1540}
1541
1542static int build_dentry_path(struct dentry *dentry,
1543			     const char **ppath, int *ppathlen, u64 *pino,
1544			     int *pfreepath)
1545{
1546	char *path;
1547
1548	if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
1549		*pino = ceph_ino(dentry->d_parent->d_inode);
1550		*ppath = dentry->d_name.name;
1551		*ppathlen = dentry->d_name.len;
1552		return 0;
1553	}
1554	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1555	if (IS_ERR(path))
1556		return PTR_ERR(path);
1557	*ppath = path;
1558	*pfreepath = 1;
1559	return 0;
1560}
1561
1562static int build_inode_path(struct inode *inode,
1563			    const char **ppath, int *ppathlen, u64 *pino,
1564			    int *pfreepath)
1565{
1566	struct dentry *dentry;
1567	char *path;
1568
1569	if (ceph_snap(inode) == CEPH_NOSNAP) {
1570		*pino = ceph_ino(inode);
1571		*ppathlen = 0;
1572		return 0;
1573	}
1574	dentry = d_find_alias(inode);
1575	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1576	dput(dentry);
1577	if (IS_ERR(path))
1578		return PTR_ERR(path);
1579	*ppath = path;
1580	*pfreepath = 1;
1581	return 0;
1582}
1583
1584/*
1585 * request arguments may be specified via an inode *, a dentry *, or
1586 * an explicit ino+path.
1587 */
1588static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1589				  const char *rpath, u64 rino,
1590				  const char **ppath, int *pathlen,
1591				  u64 *ino, int *freepath)
1592{
1593	int r = 0;
1594
1595	if (rinode) {
1596		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1597		dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1598		     ceph_snap(rinode));
1599	} else if (rdentry) {
1600		r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1601		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1602		     *ppath);
1603	} else if (rpath || rino) {
1604		*ino = rino;
1605		*ppath = rpath;
1606		*pathlen = strlen(rpath);
1607		dout(" path %.*s\n", *pathlen, rpath);
1608	}
1609
1610	return r;
1611}
1612
1613/*
1614 * called under mdsc->mutex
1615 */
1616static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1617					       struct ceph_mds_request *req,
1618					       int mds)
1619{
1620	struct ceph_msg *msg;
1621	struct ceph_mds_request_head *head;
1622	const char *path1 = NULL;
1623	const char *path2 = NULL;
1624	u64 ino1 = 0, ino2 = 0;
1625	int pathlen1 = 0, pathlen2 = 0;
1626	int freepath1 = 0, freepath2 = 0;
1627	int len;
1628	u16 releases;
1629	void *p, *end;
1630	int ret;
1631
1632	ret = set_request_path_attr(req->r_inode, req->r_dentry,
1633			      req->r_path1, req->r_ino1.ino,
1634			      &path1, &pathlen1, &ino1, &freepath1);
1635	if (ret < 0) {
1636		msg = ERR_PTR(ret);
1637		goto out;
1638	}
1639
1640	ret = set_request_path_attr(NULL, req->r_old_dentry,
1641			      req->r_path2, req->r_ino2.ino,
1642			      &path2, &pathlen2, &ino2, &freepath2);
1643	if (ret < 0) {
1644		msg = ERR_PTR(ret);
1645		goto out_free1;
1646	}
1647
1648	len = sizeof(*head) +
1649		pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));
1650
1651	/* calculate (max) length for cap releases */
1652	len += sizeof(struct ceph_mds_request_release) *
1653		(!!req->r_inode_drop + !!req->r_dentry_drop +
1654		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
1655	if (req->r_dentry_drop)
1656		len += req->r_dentry->d_name.len;
1657	if (req->r_old_dentry_drop)
1658		len += req->r_old_dentry->d_name.len;
1659
1660	msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
1661	if (!msg) {
1662		msg = ERR_PTR(-ENOMEM);
1663		goto out_free2;
1664	}
1665
1666	msg->hdr.tid = cpu_to_le64(req->r_tid);
1667
1668	head = msg->front.iov_base;
1669	p = msg->front.iov_base + sizeof(*head);
1670	end = msg->front.iov_base + msg->front.iov_len;
1671
1672	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1673	head->op = cpu_to_le32(req->r_op);
1674	head->caller_uid = cpu_to_le32(req->r_uid);
1675	head->caller_gid = cpu_to_le32(req->r_gid);
1676	head->args = req->r_args;
1677
1678	ceph_encode_filepath(&p, end, ino1, path1);
1679	ceph_encode_filepath(&p, end, ino2, path2);
1680
1681	/* make note of release offset, in case we need to replay */
1682	req->r_request_release_offset = p - msg->front.iov_base;
1683
1684	/* cap releases */
1685	releases = 0;
1686	if (req->r_inode_drop)
1687		releases += ceph_encode_inode_release(&p,
1688		      req->r_inode ? req->r_inode : req->r_dentry->d_inode,
1689		      mds, req->r_inode_drop, req->r_inode_unless, 0);
1690	if (req->r_dentry_drop)
1691		releases += ceph_encode_dentry_release(&p, req->r_dentry,
1692		       mds, req->r_dentry_drop, req->r_dentry_unless);
1693	if (req->r_old_dentry_drop)
1694		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
1695		       mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
1696	if (req->r_old_inode_drop)
1697		releases += ceph_encode_inode_release(&p,
1698		      req->r_old_dentry->d_inode,
1699		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
1700	head->num_releases = cpu_to_le16(releases);
1701
1702	BUG_ON(p > end);
1703	msg->front.iov_len = p - msg->front.iov_base;
1704	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1705
1706	msg->pages = req->r_pages;
1707	msg->nr_pages = req->r_num_pages;
1708	msg->hdr.data_len = cpu_to_le32(req->r_data_len);
1709	msg->hdr.data_off = cpu_to_le16(0);
1710
1711out_free2:
1712	if (freepath2)
1713		kfree((char *)path2);
1714out_free1:
1715	if (freepath1)
1716		kfree((char *)path1);
1717out:
1718	return msg;
1719}
1720
1721/*
1722 * called under mdsc->mutex if error, under no mutex if
1723 * success.
1724 */
1725static void complete_request(struct ceph_mds_client *mdsc,
1726			     struct ceph_mds_request *req)
1727{
1728	if (req->r_callback)
1729		req->r_callback(mdsc, req);
1730	else
1731		complete_all(&req->r_completion);
1732}
1733
1734/*
1735 * called under mdsc->mutex
1736 */
1737static int __prepare_send_request(struct ceph_mds_client *mdsc,
1738				  struct ceph_mds_request *req,
1739				  int mds)
1740{
1741	struct ceph_mds_request_head *rhead;
1742	struct ceph_msg *msg;
1743	int flags = 0;
1744
1745	req->r_attempts++;
1746	if (req->r_inode) {
1747		struct ceph_cap *cap =
1748			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
1749
1750		if (cap)
1751			req->r_sent_on_mseq = cap->mseq;
1752		else
1753			req->r_sent_on_mseq = -1;
1754	}
1755	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
1756	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
1757
1758	if (req->r_got_unsafe) {
1759		/*
1760		 * Replay.  Do not regenerate message (and rebuild
1761		 * paths, etc.); just use the original message.
1762		 * Rebuilding paths will break for renames because
1763		 * d_move mangles the src name.
1764		 */
1765		msg = req->r_request;
1766		rhead = msg->front.iov_base;
1767
1768		flags = le32_to_cpu(rhead->flags);
1769		flags |= CEPH_MDS_FLAG_REPLAY;
1770		rhead->flags = cpu_to_le32(flags);
1771
1772		if (req->r_target_inode)
1773			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
1774
1775		rhead->num_retry = req->r_attempts - 1;
1776
1777		/* remove cap/dentry releases from message */
1778		rhead->num_releases = 0;
1779		msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset);
1780		msg->front.iov_len = req->r_request_release_offset;
1781		return 0;
1782	}
1783
1784	if (req->r_request) {
1785		ceph_msg_put(req->r_request);
1786		req->r_request = NULL;
1787	}
1788	msg = create_request_message(mdsc, req, mds);
1789	if (IS_ERR(msg)) {
1790		req->r_err = PTR_ERR(msg);
1791		complete_request(mdsc, req);
1792		return PTR_ERR(msg);
1793	}
1794	req->r_request = msg;
1795
1796	rhead = msg->front.iov_base;
1797	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
1798	if (req->r_got_unsafe)
1799		flags |= CEPH_MDS_FLAG_REPLAY;
1800	if (req->r_locked_dir)
1801		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
1802	rhead->flags = cpu_to_le32(flags);
1803	rhead->num_fwd = req->r_num_fwd;
1804	rhead->num_retry = req->r_attempts - 1;
1805	rhead->ino = 0;
1806
1807	dout(" r_locked_dir = %p\n", req->r_locked_dir);
1808	return 0;
1809}
1810
1811/*
1812 * send request, or put it on the appropriate wait list.
1813 */
1814static int __do_request(struct ceph_mds_client *mdsc,
1815			struct ceph_mds_request *req)
1816{
1817	struct ceph_mds_session *session = NULL;
1818	int mds = -1;
1819	int err = -EAGAIN;
1820
1821	if (req->r_err || req->r_got_result)
1822		goto out;
1823
1824	if (req->r_timeout &&
1825	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
1826		dout("do_request timed out\n");
1827		err = -EIO;
1828		goto finish;
1829	}
1830
1831	put_request_session(req);
1832
1833	mds = __choose_mds(mdsc, req);
1834	if (mds < 0 ||
1835	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
1836		dout("do_request no mds or not active, waiting for map\n");
1837		list_add(&req->r_wait, &mdsc->waiting_for_map);
1838		goto out;
1839	}
1840
1841	/* get, open session */
1842	session = __ceph_lookup_mds_session(mdsc, mds);
1843	if (!session) {
1844		session = register_session(mdsc, mds);
1845		if (IS_ERR(session)) {
1846			err = PTR_ERR(session);
1847			goto finish;
1848		}
1849	}
1850	req->r_session = get_session(session);
1851
1852	dout("do_request mds%d session %p state %s\n", mds, session,
1853	     session_state_name(session->s_state));
1854	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
1855	    session->s_state != CEPH_MDS_SESSION_HUNG) {
1856		if (session->s_state == CEPH_MDS_SESSION_NEW ||
1857		    session->s_state == CEPH_MDS_SESSION_CLOSING)
1858			__open_session(mdsc, session);
1859		list_add(&req->r_wait, &session->s_waiting);
1860		goto out_session;
1861	}
1862
1863	/* send request */
1864	req->r_resend_mds = -1;   /* forget any previous mds hint */
1865
1866	if (req->r_request_started == 0)   /* note request start time */
1867		req->r_request_started = jiffies;
1868
1869	err = __prepare_send_request(mdsc, req, mds);
1870	if (!err) {
1871		ceph_msg_get(req->r_request);
1872		ceph_con_send(&session->s_con, req->r_request);
1873	}
1874
1875out_session:
1876	ceph_put_mds_session(session);
1877out:
1878	return err;
1879
1880finish:
1881	req->r_err = err;
1882	complete_request(mdsc, req);
1883	goto out;
1884}
1885
1886/*
1887 * called under mdsc->mutex
1888 */
1889static void __wake_requests(struct ceph_mds_client *mdsc,
1890			    struct list_head *head)
1891{
1892	struct ceph_mds_request *req, *nreq;
1893
1894	list_for_each_entry_safe(req, nreq, head, r_wait) {
1895		list_del_init(&req->r_wait);
1896		__do_request(mdsc, req);
1897	}
1898}
1899
1900/*
1901 * Wake up threads with requests pending for @mds, so that they can
1902 * resubmit their requests to a possibly different mds.
1903 */
1904static void kick_requests(struct ceph_mds_client *mdsc, int mds)
1905{
1906	struct ceph_mds_request *req;
1907	struct rb_node *p;
1908
1909	dout("kick_requests mds%d\n", mds);
1910	for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) {
1911		req = rb_entry(p, struct ceph_mds_request, r_node);
1912		if (req->r_got_unsafe)
1913			continue;
1914		if (req->r_session &&
1915		    req->r_session->s_mds == mds) {
1916			dout(" kicking tid %llu\n", req->r_tid);
1917			__do_request(mdsc, req);
1918		}
1919	}
1920}
1921
1922void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
1923			      struct ceph_mds_request *req)
1924{
1925	dout("submit_request on %p\n", req);
1926	mutex_lock(&mdsc->mutex);
1927	__register_request(mdsc, req, NULL);
1928	__do_request(mdsc, req);
1929	mutex_unlock(&mdsc->mutex);
1930}
1931
1932/*
1933 * Synchronously perform an mds request.  Takes care of all of the
1934 * session setup, forwarding, and retry details.
1935 */
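/*
 * Illustrative sketch only (not a caller in this file): a typical
 * user builds a request, fills in what it needs, and lets us do the
 * rest, e.g. roughly:
 *
 *	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUP, USE_ANY_MDS);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *	req->r_dentry = dget(dentry);
 *	req->r_num_caps = 2;
 *	err = ceph_mdsc_do_request(mdsc, dir, req);
 *	ceph_mdsc_put_request(req);
 *
 * We block until the reply arrives, the request times out, or the
 * wait is interrupted, and return the MDS result (or the local error).
 */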
1936int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
1937			 struct inode *dir,
1938			 struct ceph_mds_request *req)
1939{
1940	int err;
1941
1942	dout("do_request on %p\n", req);
1943
1944	/* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
1945	if (req->r_inode)
1946		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
1947	if (req->r_locked_dir)
1948		ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
1949	if (req->r_old_dentry)
1950		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
1951				  CEPH_CAP_PIN);
1952
1953	/* issue */
1954	mutex_lock(&mdsc->mutex);
1955	__register_request(mdsc, req, dir);
1956	__do_request(mdsc, req);
1957
1958	if (req->r_err) {
1959		err = req->r_err;
1960		__unregister_request(mdsc, req);
1961		dout("do_request early error %d\n", err);
1962		goto out;
1963	}
1964
1965	/* wait */
1966	mutex_unlock(&mdsc->mutex);
1967	dout("do_request waiting\n");
1968	if (req->r_timeout) {
1969		err = (long)wait_for_completion_killable_timeout(
1970			&req->r_completion, req->r_timeout);
1971		if (err == 0)
1972			err = -EIO;
1973	} else {
1974		err = wait_for_completion_killable(&req->r_completion);
1975	}
1976	dout("do_request waited, got %d\n", err);
1977	mutex_lock(&mdsc->mutex);
1978
1979	/* only abort if we didn't race with a real reply */
1980	if (req->r_got_result) {
1981		err = le32_to_cpu(req->r_reply_info.head->result);
1982	} else if (err < 0) {
1983		dout("aborted request %lld with %d\n", req->r_tid, err);
1984
1985		/*
1986		 * ensure we aren't running concurrently with
1987		 * ceph_fill_trace or ceph_readdir_prepopulate, which
1988		 * rely on locks (dir mutex) held by our caller.
1989		 */
1990		mutex_lock(&req->r_fill_mutex);
1991		req->r_err = err;
1992		req->r_aborted = true;
1993		mutex_unlock(&req->r_fill_mutex);
1994
1995		if (req->r_locked_dir &&
1996		    (req->r_op & CEPH_MDS_OP_WRITE))
1997			ceph_invalidate_dir_request(req);
1998	} else {
1999		err = req->r_err;
2000	}
2001
2002out:
2003	mutex_unlock(&mdsc->mutex);
2004	dout("do_request %p done, result %d\n", req, err);
2005	return err;
2006}
2007
2008/*
2009 * Invalidate dir D_COMPLETE, dentry lease state on an aborted MDS
2010 * namespace request.
2011 */
2012void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2013{
2014	struct inode *inode = req->r_locked_dir;
2015	struct ceph_inode_info *ci = ceph_inode(inode);
2016
2017	dout("invalidate_dir_request %p (D_COMPLETE, lease(s))\n", inode);
2018	spin_lock(&ci->i_ceph_lock);
2019	ceph_dir_clear_complete(inode);
2020	ci->i_release_count++;
2021	spin_unlock(&ci->i_ceph_lock);
2022
2023	if (req->r_dentry)
2024		ceph_invalidate_dentry_lease(req->r_dentry);
2025	if (req->r_old_dentry)
2026		ceph_invalidate_dentry_lease(req->r_old_dentry);
2027}
2028
2029/*
2030 * Handle mds reply.
2031 *
2032 * We take the session mutex and parse and process the reply immediately.
2033 * This preserves the logical ordering of replies, capabilities, etc., sent
2034 * by the MDS as they are applied to our local cache.
2035 */
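/*
 * Note that the MDS may answer a single tid twice: first with an
 * "unsafe" reply (applied but not yet journaled) and later with the
 * final "safe" reply.  Both are matched to the request by tid below;
 * an unsafe reply parks the request on the session's s_unsafe list
 * until the corresponding safe reply arrives.
 */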
2036static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2037{
2038	struct ceph_mds_client *mdsc = session->s_mdsc;
2039	struct ceph_mds_request *req;
2040	struct ceph_mds_reply_head *head = msg->front.iov_base;
2041	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
2042	u64 tid;
2043	int err, result;
2044	int mds = session->s_mds;
2045
2046	if (msg->front.iov_len < sizeof(*head)) {
2047		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2048		ceph_msg_dump(msg);
2049		return;
2050	}
2051
2052	/* get request, session */
2053	tid = le64_to_cpu(msg->hdr.tid);
2054	mutex_lock(&mdsc->mutex);
2055	req = __lookup_request(mdsc, tid);
2056	if (!req) {
2057		dout("handle_reply on unknown tid %llu\n", tid);
2058		mutex_unlock(&mdsc->mutex);
2059		return;
2060	}
2061	dout("handle_reply %p\n", req);
2062
2063	/* correct session? */
2064	if (req->r_session != session) {
2065		pr_err("mdsc_handle_reply got %llu on session mds%d"
2066		       " not mds%d\n", tid, session->s_mds,
2067		       req->r_session ? req->r_session->s_mds : -1);
2068		mutex_unlock(&mdsc->mutex);
2069		goto out;
2070	}
2071
2072	/* dup? */
2073	if ((req->r_got_unsafe && !head->safe) ||
2074	    (req->r_got_safe && head->safe)) {
2075		pr_warning("got a dup %s reply on %llu from mds%d\n",
2076			   head->safe ? "safe" : "unsafe", tid, mds);
2077		mutex_unlock(&mdsc->mutex);
2078		goto out;
2079	}
2080	if (req->r_got_safe && !head->safe) {
2081		pr_warning("got unsafe after safe on %llu from mds%d\n",
2082			   tid, mds);
2083		mutex_unlock(&mdsc->mutex);
2084		goto out;
2085	}
2086
2087	result = le32_to_cpu(head->result);
2088
2089	/*
2090	 * Handle an ESTALE:
2091	 *  - if we're not talking to the authority, resend to it
2092	 *  - if the authority has changed while we weren't looking,
2093	 *    resend to the new authority
2094	 *  - otherwise we just have to return the ESTALE
2095	 */
2096	if (result == -ESTALE) {
2097		dout("got ESTALE on request %llu", req->r_tid);
2098		if (!req->r_inode) {
2099			/* do nothing; not an authority problem */
2100		} else if (req->r_direct_mode != USE_AUTH_MDS) {
2101			dout("not using auth, setting for that now");
2102			req->r_direct_mode = USE_AUTH_MDS;
2103			__do_request(mdsc, req);
2104			mutex_unlock(&mdsc->mutex);
2105			goto out;
2106		} else  {
2107			struct ceph_inode_info *ci = ceph_inode(req->r_inode);
2108			struct ceph_cap *cap = NULL;
2109
2110			if (req->r_session)
2111				cap = ceph_get_cap_for_mds(ci,
2112						   req->r_session->s_mds);
2113
2114			dout("already using auth");
2115			if ((!cap || cap != ci->i_auth_cap) ||
2116			    (cap->mseq != req->r_sent_on_mseq)) {
2117				dout("but cap changed, so resending");
2118				__do_request(mdsc, req);
2119				mutex_unlock(&mdsc->mutex);
2120				goto out;
2121			}
2122		}
2123		dout("have to return ESTALE on request %llu", req->r_tid);
2124	}
2125
2126
2127	if (head->safe) {
2128		req->r_got_safe = true;
2129		__unregister_request(mdsc, req);
2130		complete_all(&req->r_safe_completion);
2131
2132		if (req->r_got_unsafe) {
2133			/*
2134			 * We already handled the unsafe response, now do the
2135			 * cleanup.  No need to examine the response; the MDS
2136			 * doesn't include any result info in the safe
2137			 * response.  And even if it did, there is nothing
2138			 * useful we could do with a revised return value.
2139			 */
2140			dout("got safe reply %llu, mds%d\n", tid, mds);
2141			list_del_init(&req->r_unsafe_item);
2142
2143			/* last unsafe request during umount? */
2144			if (mdsc->stopping && !__get_oldest_req(mdsc))
2145				complete_all(&mdsc->safe_umount_waiters);
2146			mutex_unlock(&mdsc->mutex);
2147			goto out;
2148		}
2149	} else {
2150		req->r_got_unsafe = true;
2151		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2152	}
2153
2154	dout("handle_reply tid %lld result %d\n", tid, result);
2155	rinfo = &req->r_reply_info;
2156	err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2157	mutex_unlock(&mdsc->mutex);
2158
2159	mutex_lock(&session->s_mutex);
2160	if (err < 0) {
2161		pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2162		ceph_msg_dump(msg);
2163		goto out_err;
2164	}
2165
2166	/* snap trace */
2167	if (rinfo->snapblob_len) {
2168		down_write(&mdsc->snap_rwsem);
2169		ceph_update_snap_trace(mdsc, rinfo->snapblob,
2170			       rinfo->snapblob + rinfo->snapblob_len,
2171			       le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
2172		downgrade_write(&mdsc->snap_rwsem);
2173	} else {
2174		down_read(&mdsc->snap_rwsem);
2175	}
2176
2177	/* insert trace into our cache */
2178	mutex_lock(&req->r_fill_mutex);
2179	err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
2180	if (err == 0) {
2181		if (result == 0 && req->r_op != CEPH_MDS_OP_GETFILELOCK &&
2182		    rinfo->dir_nr)
2183			ceph_readdir_prepopulate(req, req->r_session);
2184		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2185	}
2186	mutex_unlock(&req->r_fill_mutex);
2187
2188	up_read(&mdsc->snap_rwsem);
2189out_err:
2190	mutex_lock(&mdsc->mutex);
2191	if (!req->r_aborted) {
2192		if (err) {
2193			req->r_err = err;
2194		} else {
2195			req->r_reply = msg;
2196			ceph_msg_get(msg);
2197			req->r_got_result = true;
2198		}
2199	} else {
2200		dout("reply arrived after request %lld was aborted\n", tid);
2201	}
2202	mutex_unlock(&mdsc->mutex);
2203
2204	ceph_add_cap_releases(mdsc, req->r_session);
2205	mutex_unlock(&session->s_mutex);
2206
2207	/* kick calling process */
2208	complete_request(mdsc, req);
2209out:
2210	ceph_mdsc_put_request(req);
2211	return;
2212}
2213
2214
2215
2216/*
2217 * handle mds notification that our request has been forwarded.
2218 */
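/*
 * A forward notice means the mds we sent to was not the right target
 * for the request; it names the mds we should try instead.  We note
 * the new target in r_resend_mds and resend, ignoring forwards whose
 * sequence number is not newer than one we have already acted on.
 */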
2219static void handle_forward(struct ceph_mds_client *mdsc,
2220			   struct ceph_mds_session *session,
2221			   struct ceph_msg *msg)
2222{
2223	struct ceph_mds_request *req;
2224	u64 tid = le64_to_cpu(msg->hdr.tid);
2225	u32 next_mds;
2226	u32 fwd_seq;
2227	int err = -EINVAL;
2228	void *p = msg->front.iov_base;
2229	void *end = p + msg->front.iov_len;
2230
2231	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2232	next_mds = ceph_decode_32(&p);
2233	fwd_seq = ceph_decode_32(&p);
2234
2235	mutex_lock(&mdsc->mutex);
2236	req = __lookup_request(mdsc, tid);
2237	if (!req) {
2238		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2239		goto out;  /* dup reply? */
2240	}
2241
2242	if (req->r_aborted) {
2243		dout("forward tid %llu aborted, unregistering\n", tid);
2244		__unregister_request(mdsc, req);
2245	} else if (fwd_seq <= req->r_num_fwd) {
2246		dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2247		     tid, next_mds, req->r_num_fwd, fwd_seq);
2248	} else {
2249		/* resend. forward race not possible; mds would drop */
2250		dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2251		BUG_ON(req->r_err);
2252		BUG_ON(req->r_got_result);
2253		req->r_num_fwd = fwd_seq;
2254		req->r_resend_mds = next_mds;
2255		put_request_session(req);
2256		__do_request(mdsc, req);
2257	}
2258	ceph_mdsc_put_request(req);
2259out:
2260	mutex_unlock(&mdsc->mutex);
2261	return;
2262
2263bad:
2264	pr_err("mdsc_handle_forward decode error err=%d\n", err);
2265}
2266
2267/*
2268 * handle an mds session control message
2269 */
2270static void handle_session(struct ceph_mds_session *session,
2271			   struct ceph_msg *msg)
2272{
2273	struct ceph_mds_client *mdsc = session->s_mdsc;
2274	u32 op;
2275	u64 seq;
2276	int mds = session->s_mds;
2277	struct ceph_mds_session_head *h = msg->front.iov_base;
2278	int wake = 0;
2279
2280	/* decode */
2281	if (msg->front.iov_len != sizeof(*h))
2282		goto bad;
2283	op = le32_to_cpu(h->op);
2284	seq = le64_to_cpu(h->seq);
2285
2286	mutex_lock(&mdsc->mutex);
2287	if (op == CEPH_SESSION_CLOSE)
2288		__unregister_session(mdsc, session);
2289	/* FIXME: this ttl calculation is generous */
2290	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
2291	mutex_unlock(&mdsc->mutex);
2292
2293	mutex_lock(&session->s_mutex);
2294
2295	dout("handle_session mds%d %s %p state %s seq %llu\n",
2296	     mds, ceph_session_op_name(op), session,
2297	     session_state_name(session->s_state), seq);
2298
2299	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
2300		session->s_state = CEPH_MDS_SESSION_OPEN;
2301		pr_info("mds%d came back\n", session->s_mds);
2302	}
2303
2304	switch (op) {
2305	case CEPH_SESSION_OPEN:
2306		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2307			pr_info("mds%d reconnect success\n", session->s_mds);
2308		session->s_state = CEPH_MDS_SESSION_OPEN;
2309		renewed_caps(mdsc, session, 0);
2310		wake = 1;
2311		if (mdsc->stopping)
2312			__close_session(mdsc, session);
2313		break;
2314
2315	case CEPH_SESSION_RENEWCAPS:
2316		if (session->s_renew_seq == seq)
2317			renewed_caps(mdsc, session, 1);
2318		break;
2319
2320	case CEPH_SESSION_CLOSE:
2321		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2322			pr_info("mds%d reconnect denied\n", session->s_mds);
2323		remove_session_caps(session);
2324		wake = 1; /* for good measure */
2325		wake_up_all(&mdsc->session_close_wq);
2326		kick_requests(mdsc, mds);
2327		break;
2328
2329	case CEPH_SESSION_STALE:
2330		pr_info("mds%d caps went stale, renewing\n",
2331			session->s_mds);
2332		spin_lock(&session->s_gen_ttl_lock);
2333		session->s_cap_gen++;
2334		session->s_cap_ttl = jiffies - 1;
2335		spin_unlock(&session->s_gen_ttl_lock);
2336		send_renew_caps(mdsc, session);
2337		break;
2338
2339	case CEPH_SESSION_RECALL_STATE:
2340		trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2341		break;
2342
2343	default:
2344		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2345		WARN_ON(1);
2346	}
2347
2348	mutex_unlock(&session->s_mutex);
2349	if (wake) {
2350		mutex_lock(&mdsc->mutex);
2351		__wake_requests(mdsc, &session->s_waiting);
2352		mutex_unlock(&mdsc->mutex);
2353	}
2354	return;
2355
2356bad:
2357	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
2358	       (int)msg->front.iov_len);
2359	ceph_msg_dump(msg);
2360	return;
2361}
2362
2363
2364/*
2365 * called under session->mutex.
2366 */
2367static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2368				   struct ceph_mds_session *session)
2369{
2370	struct ceph_mds_request *req, *nreq;
2371	int err;
2372
2373	dout("replay_unsafe_requests mds%d\n", session->s_mds);
2374
2375	mutex_lock(&mdsc->mutex);
2376	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2377		err = __prepare_send_request(mdsc, req, session->s_mds);
2378		if (!err) {
2379			ceph_msg_get(req->r_request);
2380			ceph_con_send(&session->s_con, req->r_request);
2381		}
2382	}
2383	mutex_unlock(&mdsc->mutex);
2384}
2385
2386/*
2387 * Encode information about a cap for a reconnect with the MDS.
2388 */
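/*
 * Called once per cap via iterate_session_caps() while building the
 * reconnect message.  For each cap we append the inode number, a path
 * derived from an aliased dentry (if any), and a v1 or v2 cap record;
 * the v2 record additionally carries fcntl/flock lock state.
 */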
2389static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2390			  void *arg)
2391{
2392	union {
2393		struct ceph_mds_cap_reconnect v2;
2394		struct ceph_mds_cap_reconnect_v1 v1;
2395	} rec;
2396	size_t reclen;
2397	struct ceph_inode_info *ci;
2398	struct ceph_reconnect_state *recon_state = arg;
2399	struct ceph_pagelist *pagelist = recon_state->pagelist;
2400	char *path;
2401	int pathlen, err;
2402	u64 pathbase;
2403	struct dentry *dentry;
2404
2405	ci = cap->ci;
2406
2407	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2408	     inode, ceph_vinop(inode), cap, cap->cap_id,
2409	     ceph_cap_string(cap->issued));
2410	err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2411	if (err)
2412		return err;
2413
2414	dentry = d_find_alias(inode);
2415	if (dentry) {
2416		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2417		if (IS_ERR(path)) {
2418			err = PTR_ERR(path);
2419			goto out_dput;
2420		}
2421	} else {
2422		path = NULL;
2423		pathlen = 0;
2424	}
2425	err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2426	if (err)
2427		goto out_free;
2428
2429	spin_lock(&ci->i_ceph_lock);
2430	cap->seq = 0;        /* reset cap seq */
2431	cap->issue_seq = 0;  /* and issue_seq */
2432
2433	if (recon_state->flock) {
2434		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2435		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2436		rec.v2.issued = cpu_to_le32(cap->issued);
2437		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2438		rec.v2.pathbase = cpu_to_le64(pathbase);
2439		rec.v2.flock_len = 0;
2440		reclen = sizeof(rec.v2);
2441	} else {
2442		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2443		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2444		rec.v1.issued = cpu_to_le32(cap->issued);
2445		rec.v1.size = cpu_to_le64(inode->i_size);
2446		ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2447		ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2448		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2449		rec.v1.pathbase = cpu_to_le64(pathbase);
2450		reclen = sizeof(rec.v1);
2451	}
2452	spin_unlock(&ci->i_ceph_lock);
2453
2454	if (recon_state->flock) {
2455		int num_fcntl_locks, num_flock_locks;
2456		struct ceph_pagelist_cursor trunc_point;
2457
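		/*
		 * File locks can come and go while we are not holding
		 * lock_flocks(), so the counts we take may be stale by
		 * the time we encode.  ceph_encode_locks() returns
		 * -ENOSPC if it finds more locks than we reserved room
		 * for; in that case truncate back to trunc_point and
		 * retry with fresh counts.
		 */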
2458		ceph_pagelist_set_cursor(pagelist, &trunc_point);
2459		do {
2460			lock_flocks();
2461			ceph_count_locks(inode, &num_fcntl_locks,
2462					 &num_flock_locks);
2463			rec.v2.flock_len = (2*sizeof(u32) +
2464					    (num_fcntl_locks+num_flock_locks) *
2465					    sizeof(struct ceph_filelock));
2466			unlock_flocks();
2467
2468			/* pre-alloc pagelist */
2469			ceph_pagelist_truncate(pagelist, &trunc_point);
2470			err = ceph_pagelist_append(pagelist, &rec, reclen);
2471			if (!err)
2472				err = ceph_pagelist_reserve(pagelist,
2473							    rec.v2.flock_len);
2474
2475			/* encode locks */
2476			if (!err) {
2477				lock_flocks();
2478				err = ceph_encode_locks(inode,
2479							pagelist,
2480							num_fcntl_locks,
2481							num_flock_locks);
2482				unlock_flocks();
2483			}
2484		} while (err == -ENOSPC);
2485	} else {
2486		err = ceph_pagelist_append(pagelist, &rec, reclen);
2487	}
2488
2489out_free:
2490	kfree(path);
2491out_dput:
2492	dput(dentry);
2493	return err;
2494}
2495
2496
2497/*
2498 * If an MDS fails and recovers, clients need to reconnect in order to
2499 * reestablish shared state.  This includes all caps issued through
2500 * this session _and_ the snap_realm hierarchy.  Because it's not
2501 * clear which snap realms the mds cares about, we send everything we
2502 * know about; that ensures we'll then get any new info the
2503 * recovering MDS might have.
2504 *
2505 * This is a relatively heavyweight operation, but it's rare.
2506 *
2507 * called with mdsc->mutex held.
2508 */
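/*
 * The reconnect message body is a pagelist laid out as: a u32 count
 * of caps, one record per cap (see encode_caps_cb above), and then
 * one ceph_mds_snaprealm_reconnect record per snap realm we know of.
 */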
2509static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2510			       struct ceph_mds_session *session)
2511{
2512	struct ceph_msg *reply;
2513	struct rb_node *p;
2514	int mds = session->s_mds;
2515	int err = -ENOMEM;
2516	struct ceph_pagelist *pagelist;
2517	struct ceph_reconnect_state recon_state;
2518
2519	pr_info("mds%d reconnect start\n", mds);
2520
2521	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2522	if (!pagelist)
2523		goto fail_nopagelist;
2524	ceph_pagelist_init(pagelist);
2525
2526	reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
2527	if (!reply)
2528		goto fail_nomsg;
2529
2530	mutex_lock(&session->s_mutex);
2531	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2532	session->s_seq = 0;
2533
2534	ceph_con_open(&session->s_con,
2535		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2536
2537	/* replay unsafe requests */
2538	replay_unsafe_requests(mdsc, session);
2539
2540	down_read(&mdsc->snap_rwsem);
2541
2542	dout("session %p state %s\n", session,
2543	     session_state_name(session->s_state));
2544
2545	/* drop old cap expires; we're about to reestablish that state */
2546	discard_cap_releases(mdsc, session);
2547
2548	/* traverse this session's caps */
2549	err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
2550	if (err)
2551		goto fail;
2552
2553	recon_state.pagelist = pagelist;
2554	recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
2555	err = iterate_session_caps(session, encode_caps_cb, &recon_state);
2556	if (err < 0)
2557		goto fail;
2558
2559	/*
2560	 * snaprealms.  we provide mds with the ino, seq (version), and
2561	 * parent for all of our realms.  If the mds has any newer info,
2562	 * it will tell us.
2563	 */
2564	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
2565		struct ceph_snap_realm *realm =
2566			rb_entry(p, struct ceph_snap_realm, node);
2567		struct ceph_mds_snaprealm_reconnect sr_rec;
2568
2569		dout(" adding snap realm %llx seq %lld parent %llx\n",
2570		     realm->ino, realm->seq, realm->parent_ino);
2571		sr_rec.ino = cpu_to_le64(realm->ino);
2572		sr_rec.seq = cpu_to_le64(realm->seq);
2573		sr_rec.parent = cpu_to_le64(realm->parent_ino);
2574		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
2575		if (err)
2576			goto fail;
2577	}
2578
2579	reply->pagelist = pagelist;
2580	if (recon_state.flock)
2581		reply->hdr.version = cpu_to_le16(2);
2582	reply->hdr.data_len = cpu_to_le32(pagelist->length);
2583	reply->nr_pages = calc_pages_for(0, pagelist->length);
2584	ceph_con_send(&session->s_con, reply);
2585
2586	mutex_unlock(&session->s_mutex);
2587
2588	mutex_lock(&mdsc->mutex);
2589	__wake_requests(mdsc, &session->s_waiting);
2590	mutex_unlock(&mdsc->mutex);
2591
2592	up_read(&mdsc->snap_rwsem);
2593	return;
2594
2595fail:
2596	ceph_msg_put(reply);
2597	up_read(&mdsc->snap_rwsem);
2598	mutex_unlock(&session->s_mutex);
2599fail_nomsg:
2600	ceph_pagelist_release(pagelist);
2601	kfree(pagelist);
2602fail_nopagelist:
2603	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
2604	return;
2605}
2606
2607
2608/*
2609 * compare old and new mdsmaps, kicking requests
2610 * and closing out old connections as necessary
2611 *
2612 * called under mdsc->mutex.
2613 */
2614static void check_new_map(struct ceph_mds_client *mdsc,
2615			  struct ceph_mdsmap *newmap,
2616			  struct ceph_mdsmap *oldmap)
2617{
2618	int i;
2619	int oldstate, newstate;
2620	struct ceph_mds_session *s;
2621
2622	dout("check_new_map new %u old %u\n",
2623	     newmap->m_epoch, oldmap->m_epoch);
2624
2625	for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
2626		if (mdsc->sessions[i] == NULL)
2627			continue;
2628		s = mdsc->sessions[i];
2629		oldstate = ceph_mdsmap_get_state(oldmap, i);
2630		newstate = ceph_mdsmap_get_state(newmap, i);
2631
2632		dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
2633		     i, ceph_mds_state_name(oldstate),
2634		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
2635		     ceph_mds_state_name(newstate),
2636		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
2637		     session_state_name(s->s_state));
2638
2639		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
2640			   ceph_mdsmap_get_addr(newmap, i),
2641			   sizeof(struct ceph_entity_addr))) {
2642			if (s->s_state == CEPH_MDS_SESSION_OPENING) {
2643				/* the session never opened, just close it
2644				 * out now */
2645				__wake_requests(mdsc, &s->s_waiting);
2646				__unregister_session(mdsc, s);
2647			} else {
2648				/* just close it */
2649				mutex_unlock(&mdsc->mutex);
2650				mutex_lock(&s->s_mutex);
2651				mutex_lock(&mdsc->mutex);
2652				ceph_con_close(&s->s_con);
2653				mutex_unlock(&s->s_mutex);
2654				s->s_state = CEPH_MDS_SESSION_RESTARTING;
2655			}
2656
2657			/* kick any requests waiting on the recovering mds */
2658			kick_requests(mdsc, i);
2659		} else if (oldstate == newstate) {
2660			continue;  /* nothing new with this mds */
2661		}
2662
2663		/*
2664		 * send reconnect?
2665		 */
2666		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
2667		    newstate >= CEPH_MDS_STATE_RECONNECT) {
2668			mutex_unlock(&mdsc->mutex);
2669			send_mds_reconnect(mdsc, s);
2670			mutex_lock(&mdsc->mutex);
2671		}
2672
2673		/*
2674		 * kick requests on any mds that has gone active.
2675		 */
2676		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
2677		    newstate >= CEPH_MDS_STATE_ACTIVE) {
2678			if (oldstate != CEPH_MDS_STATE_CREATING &&
2679			    oldstate != CEPH_MDS_STATE_STARTING)
2680				pr_info("mds%d recovery completed\n", s->s_mds);
2681			kick_requests(mdsc, i);
2682			ceph_kick_flushing_caps(mdsc, s);
2683			wake_up_session_caps(s, 1);
2684		}
2685	}
2686
2687	for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
2688		s = mdsc->sessions[i];
2689		if (!s)
2690			continue;
2691		if (!ceph_mdsmap_is_laggy(newmap, i))
2692			continue;
2693		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2694		    s->s_state == CEPH_MDS_SESSION_HUNG ||
2695		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
2696			dout(" connecting to export targets of laggy mds%d\n",
2697			     i);
2698			__open_export_target_sessions(mdsc, s);
2699		}
2700	}
2701}
2702
2703
2704
2705/*
2706 * leases
2707 */
2708
2709/*
2710 * caller must hold session s_mutex, dentry->d_lock
2711 */
2712void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
2713{
2714	struct ceph_dentry_info *di = ceph_dentry(dentry);
2715
2716	ceph_put_mds_session(di->lease_session);
2717	di->lease_session = NULL;
2718}
2719
2720static void handle_lease(struct ceph_mds_client *mdsc,
2721			 struct ceph_mds_session *session,
2722			 struct ceph_msg *msg)
2723{
2724	struct super_block *sb = mdsc->fsc->sb;
2725	struct inode *inode;
2726	struct dentry *parent, *dentry;
2727	struct ceph_dentry_info *di;
2728	int mds = session->s_mds;
2729	struct ceph_mds_lease *h = msg->front.iov_base;
2730	u32 seq;
2731	struct ceph_vino vino;
2732	struct qstr dname;
2733	int release = 0;
2734
2735	dout("handle_lease from mds%d\n", mds);
2736
2737	/* decode */
2738	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
2739		goto bad;
2740	vino.ino = le64_to_cpu(h->ino);
2741	vino.snap = CEPH_NOSNAP;
2742	seq = le32_to_cpu(h->seq);
2743	dname.name = (void *)h + sizeof(*h) + sizeof(u32);
2744	dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
2745	if (dname.len != get_unaligned_le32(h+1))
2746		goto bad;
2747
2748	mutex_lock(&session->s_mutex);
2749	session->s_seq++;
2750
2751	/* lookup inode */
2752	inode = ceph_find_inode(sb, vino);
2753	dout("handle_lease %s, ino %llx %p %.*s\n",
2754	     ceph_lease_op_name(h->action), vino.ino, inode,
2755	     dname.len, dname.name);
2756	if (inode == NULL) {
2757		dout("handle_lease no inode %llx\n", vino.ino);
2758		goto release;
2759	}
2760
2761	/* dentry */
2762	parent = d_find_alias(inode);
2763	if (!parent) {
2764		dout("no parent dentry on inode %p\n", inode);
2765		WARN_ON(1);
2766		goto release;  /* hrm... */
2767	}
2768	dname.hash = full_name_hash(dname.name, dname.len);
2769	dentry = d_lookup(parent, &dname);
2770	dput(parent);
2771	if (!dentry)
2772		goto release;
2773
2774	spin_lock(&dentry->d_lock);
2775	di = ceph_dentry(dentry);
2776	switch (h->action) {
2777	case CEPH_MDS_LEASE_REVOKE:
2778		if (di->lease_session == session) {
2779			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
2780				h->seq = cpu_to_le32(di->lease_seq);
2781			__ceph_mdsc_drop_dentry_lease(dentry);
2782		}
2783		release = 1;
2784		break;
2785
2786	case CEPH_MDS_LEASE_RENEW:
2787		if (di->lease_session == session &&
2788		    di->lease_gen == session->s_cap_gen &&
2789		    di->lease_renew_from &&
2790		    di->lease_renew_after == 0) {
2791			unsigned long duration =
2792				le32_to_cpu(h->duration_ms) * HZ / 1000;
2793
2794			di->lease_seq = seq;
2795			dentry->d_time = di->lease_renew_from + duration;
2796			di->lease_renew_after = di->lease_renew_from +
2797				(duration >> 1);
2798			di->lease_renew_from = 0;
2799		}
2800		break;
2801	}
2802	spin_unlock(&dentry->d_lock);
2803	dput(dentry);
2804
2805	if (!release)
2806		goto out;
2807
2808release:
2809	/* let's just reuse the same message */
2810	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
2811	ceph_msg_get(msg);
2812	ceph_con_send(&session->s_con, msg);
2813
2814out:
2815	iput(inode);
2816	mutex_unlock(&session->s_mutex);
2817	return;
2818
2819bad:
2820	pr_err("corrupt lease message\n");
2821	ceph_msg_dump(msg);
2822}
2823
2824void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2825			      struct inode *inode,
2826			      struct dentry *dentry, char action,
2827			      u32 seq)
2828{
2829	struct ceph_msg *msg;
2830	struct ceph_mds_lease *lease;
2831	int len = sizeof(*lease) + sizeof(u32);
2832	int dnamelen = 0;
2833
2834	dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
2835	     inode, dentry, ceph_lease_op_name(action), session->s_mds);
2836	dnamelen = dentry->d_name.len;
2837	len += dnamelen;
2838
2839	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
2840	if (!msg)
2841		return;
2842	lease = msg->front.iov_base;
2843	lease->action = action;
2844	lease->ino = cpu_to_le64(ceph_vino(inode).ino);
2845	lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
2846	lease->seq = cpu_to_le32(seq);
2847	put_unaligned_le32(dnamelen, lease + 1);
2848	memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
2849
2850	/*
2851	 * if this is a preemptive lease RELEASE, no need to
2852	 * flush request stream, since the actual request will
2853	 * soon follow.
2854	 */
2855	msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
2856
2857	ceph_con_send(&session->s_con, msg);
2858}
2859
2860/*
2861 * Preemptively release a lease we expect to invalidate anyway.
2862 * Both @inode and @dentry must be non-NULL.
2863 */
2864void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
2865			     struct dentry *dentry)
2866{
2867	struct ceph_dentry_info *di;
2868	struct ceph_mds_session *session;
2869	u32 seq;
2870
2871	BUG_ON(inode == NULL);
2872	BUG_ON(dentry == NULL);
2873
2874	/* is dentry lease valid? */
2875	spin_lock(&dentry->d_lock);
2876	di = ceph_dentry(dentry);
2877	if (!di || !di->lease_session ||
2878	    di->lease_session->s_mds < 0 ||
2879	    di->lease_gen != di->lease_session->s_cap_gen ||
2880	    !time_before(jiffies, dentry->d_time)) {
2881		dout("lease_release inode %p dentry %p -- "
2882		     "no lease\n",
2883		     inode, dentry);
2884		spin_unlock(&dentry->d_lock);
2885		return;
2886	}
2887
2888	/* we do have a lease on this dentry; note mds and seq */
2889	session = ceph_get_mds_session(di->lease_session);
2890	seq = di->lease_seq;
2891	__ceph_mdsc_drop_dentry_lease(dentry);
2892	spin_unlock(&dentry->d_lock);
2893
2894	dout("lease_release inode %p dentry %p to mds%d\n",
2895	     inode, dentry, session->s_mds);
2896	ceph_mdsc_lease_send_msg(session, inode, dentry,
2897				 CEPH_MDS_LEASE_RELEASE, seq);
2898	ceph_put_mds_session(session);
2899}
2900
2901/*
2902 * drop all leases (and dentry refs) in preparation for umount
2903 */
2904static void drop_leases(struct ceph_mds_client *mdsc)
2905{
2906	int i;
2907
2908	dout("drop_leases\n");
2909	mutex_lock(&mdsc->mutex);
2910	for (i = 0; i < mdsc->max_sessions; i++) {
2911		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2912		if (!s)
2913			continue;
2914		mutex_unlock(&mdsc->mutex);
2915		mutex_lock(&s->s_mutex);
2916		mutex_unlock(&s->s_mutex);
2917		ceph_put_mds_session(s);
2918		mutex_lock(&mdsc->mutex);
2919	}
2920	mutex_unlock(&mdsc->mutex);
2921}
2922
2923
2924
2925/*
2926 * delayed work -- periodically trim expired leases, renew caps with mds
2927 */
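/* arrange for delayed_work() to run (again) in roughly five seconds */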
2928static void schedule_delayed(struct ceph_mds_client *mdsc)
2929{
2930	int delay = 5;
2931	unsigned hz = round_jiffies_relative(HZ * delay);
2932	schedule_delayed_work(&mdsc->delayed_work, hz);
2933}
2934
2935static void delayed_work(struct work_struct *work)
2936{
2937	int i;
2938	struct ceph_mds_client *mdsc =
2939		container_of(work, struct ceph_mds_client, delayed_work.work);
2940	int renew_interval;
2941	int renew_caps;
2942
2943	dout("mdsc delayed_work\n");
2944	ceph_check_delayed_caps(mdsc);
2945
2946	mutex_lock(&mdsc->mutex);
2947	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
2948	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
2949				   mdsc->last_renew_caps);
2950	if (renew_caps)
2951		mdsc->last_renew_caps = jiffies;
2952
2953	for (i = 0; i < mdsc->max_sessions; i++) {
2954		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2955		if (s == NULL)
2956			continue;
2957		if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
2958			dout("resending session close request for mds%d\n",
2959			     s->s_mds);
2960			request_close_session(mdsc, s);
2961			ceph_put_mds_session(s);
2962			continue;
2963		}
2964		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
2965			if (s->s_state == CEPH_MDS_SESSION_OPEN) {
2966				s->s_state = CEPH_MDS_SESSION_HUNG;
2967				pr_info("mds%d hung\n", s->s_mds);
2968			}
2969		}
2970		if (s->s_state < CEPH_MDS_SESSION_OPEN) {
2971			/* this mds failed or is recovering; just wait */
2972			ceph_put_mds_session(s);
2973			continue;
2974		}
2975		mutex_unlock(&mdsc->mutex);
2976
2977		mutex_lock(&s->s_mutex);
2978		if (renew_caps)
2979			send_renew_caps(mdsc, s);
2980		else
2981			ceph_con_keepalive(&s->s_con);
2982		ceph_add_cap_releases(mdsc, s);
2983		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2984		    s->s_state == CEPH_MDS_SESSION_HUNG)
2985			ceph_send_cap_releases(mdsc, s);
2986		mutex_unlock(&s->s_mutex);
2987		ceph_put_mds_session(s);
2988
2989		mutex_lock(&mdsc->mutex);
2990	}
2991	mutex_unlock(&mdsc->mutex);
2992
2993	schedule_delayed(mdsc);
2994}
2995
2996int ceph_mdsc_init(struct ceph_fs_client *fsc)
2997
2998{
2999	struct ceph_mds_client *mdsc;
3000
3001	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
3002	if (!mdsc)
3003		return -ENOMEM;
3004	mdsc->fsc = fsc;
3005	fsc->mdsc = mdsc;
3006	mutex_init(&mdsc->mutex);
3007	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
3008	if (mdsc->mdsmap == NULL)
3009		return -ENOMEM;
3010
3011	init_completion(&mdsc->safe_umount_waiters);
3012	init_waitqueue_head(&mdsc->session_close_wq);
3013	INIT_LIST_HEAD(&mdsc->waiting_for_map);
3014	mdsc->sessions = NULL;
3015	mdsc->max_sessions = 0;
3016	mdsc->stopping = 0;
3017	init_rwsem(&mdsc->snap_rwsem);
3018	mdsc->snap_realms = RB_ROOT;
3019	INIT_LIST_HEAD(&mdsc->snap_empty);
3020	spin_lock_init(&mdsc->snap_empty_lock);
3021	mdsc->last_tid = 0;
3022	mdsc->request_tree = RB_ROOT;
3023	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
3024	mdsc->last_renew_caps = jiffies;
3025	INIT_LIST_HEAD(&mdsc->cap_delay_list);
3026	spin_lock_init(&mdsc->cap_delay_lock);
3027	INIT_LIST_HEAD(&mdsc->snap_flush_list);
3028	spin_lock_init(&mdsc->snap_flush_lock);
3029	mdsc->cap_flush_seq = 0;
3030	INIT_LIST_HEAD(&mdsc->cap_dirty);
3031	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
3032	mdsc->num_cap_flushing = 0;
3033	spin_lock_init(&mdsc->cap_dirty_lock);
3034	init_waitqueue_head(&mdsc->cap_flushing_wq);
3035	spin_lock_init(&mdsc->dentry_lru_lock);
3036	INIT_LIST_HEAD(&mdsc->dentry_lru);
3037
3038	ceph_caps_init(mdsc);
3039	ceph_adjust_min_caps(mdsc, fsc->min_caps);
3040
3041	return 0;
3042}
3043
3044/*
3045 * Wait for safe replies on open mds requests.  If we time out, drop
3046 * all requests from the tree to avoid dangling dentry refs.
3047 */
3048static void wait_requests(struct ceph_mds_client *mdsc)
3049{
3050	struct ceph_mds_request *req;
3051	struct ceph_fs_client *fsc = mdsc->fsc;
3052
3053	mutex_lock(&mdsc->mutex);
3054	if (__get_oldest_req(mdsc)) {
3055		mutex_unlock(&mdsc->mutex);
3056
3057		dout("wait_requests waiting for requests\n");
3058		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
3059				    fsc->client->options->mount_timeout * HZ);
3060
3061		/* tear down remaining requests */
3062		mutex_lock(&mdsc->mutex);
3063		while ((req = __get_oldest_req(mdsc))) {
3064			dout("wait_requests timed out on tid %llu\n",
3065			     req->r_tid);
3066			__unregister_request(mdsc, req);
3067		}
3068	}
3069	mutex_unlock(&mdsc->mutex);
3070	dout("wait_requests done\n");
3071}
3072
3073/*
3074 * called before mount is ro, and before dentries are torn down.
3075 * (hmm, does this still race with new lookups?)
3076 */
3077void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
3078{
3079	dout("pre_umount\n");
3080	mdsc->stopping = 1;
3081
3082	drop_leases(mdsc);
3083	ceph_flush_dirty_caps(mdsc);
3084	wait_requests(mdsc);
3085
3086	/*
3087	 * wait for reply handlers to drop their request refs and
3088	 * their inode/dcache refs
3089	 */
3090	ceph_msgr_flush();
3091}
3092
3093/*
3094 * wait for all write mds requests to flush.
3095 */
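/*
 * We walk the request tree in tid order.  Because mdsc->mutex is
 * dropped while waiting on each safe completion, a reference is taken
 * on the *next* request first so it cannot be freed under us; if it
 * was nevertheless unregistered (its rb_node emptied), restart the
 * scan from the oldest request.
 */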
3096static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
3097{
3098	struct ceph_mds_request *req = NULL, *nextreq;
3099	struct rb_node *n;
3100
3101	mutex_lock(&mdsc->mutex);
3102	dout("wait_unsafe_requests want %lld\n", want_tid);
3103restart:
3104	req = __get_oldest_req(mdsc);
3105	while (req && req->r_tid <= want_tid) {
3106		/* find next request */
3107		n = rb_next(&req->r_node);
3108		if (n)
3109			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
3110		else
3111			nextreq = NULL;
3112		if ((req->r_op & CEPH_MDS_OP_WRITE)) {
3113			/* write op */
3114			ceph_mdsc_get_request(req);
3115			if (nextreq)
3116				ceph_mdsc_get_request(nextreq);
3117			mutex_unlock(&mdsc->mutex);
3118			dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
3119			     req->r_tid, want_tid);
3120			wait_for_completion(&req->r_safe_completion);
3121			mutex_lock(&mdsc->mutex);
3122			ceph_mdsc_put_request(req);
3123			if (!nextreq)
3124				break;  /* next dne before, so we're done! */
3125			if (RB_EMPTY_NODE(&nextreq->r_node)) {
3126				/* next request was removed from tree */
3127				ceph_mdsc_put_request(nextreq);
3128				goto restart;
3129			}
3130			ceph_mdsc_put_request(nextreq);  /* won't go away */
3131		}
3132		req = nextreq;
3133	}
3134	mutex_unlock(&mdsc->mutex);
3135	dout("wait_unsafe_requests done\n");
3136}
3137
3138void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3139{
3140	u64 want_tid, want_flush;
3141
3142	if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3143		return;
3144
3145	dout("sync\n");
3146	mutex_lock(&mdsc->mutex);
3147	want_tid = mdsc->last_tid;
3148	want_flush = mdsc->cap_flush_seq;
3149	mutex_unlock(&mdsc->mutex);
3150	dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
3151
3152	ceph_flush_dirty_caps(mdsc);
3153
3154	wait_unsafe_requests(mdsc, want_tid);
3155	wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
3156}
3157
3158/*
3159 * true if all sessions are closed, or we force unmount
3160 */
3161static bool done_closing_sessions(struct ceph_mds_client *mdsc)
3162{
3163	int i, n = 0;
3164
3165	if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3166		return true;
3167
3168	mutex_lock(&mdsc->mutex);
3169	for (i = 0; i < mdsc->max_sessions; i++)
3170		if (mdsc->sessions[i])
3171			n++;
3172	mutex_unlock(&mdsc->mutex);
3173	return n == 0;
3174}
3175
3176/*
3177 * called after sb is ro.
3178 */
3179void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3180{
3181	struct ceph_mds_session *session;
3182	int i;
3183	struct ceph_fs_client *fsc = mdsc->fsc;
3184	unsigned long timeout = fsc->client->options->mount_timeout * HZ;
3185
3186	dout("close_sessions\n");
3187
3188	/* close sessions */
3189	mutex_lock(&mdsc->mutex);
3190	for (i = 0; i < mdsc->max_sessions; i++) {
3191		session = __ceph_lookup_mds_session(mdsc, i);
3192		if (!session)
3193			continue;
3194		mutex_unlock(&mdsc->mutex);
3195		mutex_lock(&session->s_mutex);
3196		__close_session(mdsc, session);
3197		mutex_unlock(&session->s_mutex);
3198		ceph_put_mds_session(session);
3199		mutex_lock(&mdsc->mutex);
3200	}
3201	mutex_unlock(&mdsc->mutex);
3202
3203	dout("waiting for sessions to close\n");
3204	wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
3205			   timeout);
3206
3207	/* tear down remaining sessions */
3208	mutex_lock(&mdsc->mutex);
3209	for (i = 0; i < mdsc->max_sessions; i++) {
3210		if (mdsc->sessions[i]) {
3211			session = get_session(mdsc->sessions[i]);
3212			__unregister_session(mdsc, session);
3213			mutex_unlock(&mdsc->mutex);
3214			mutex_lock(&session->s_mutex);
3215			remove_session_caps(session);
3216			mutex_unlock(&session->s_mutex);
3217			ceph_put_mds_session(session);
3218			mutex_lock(&mdsc->mutex);
3219		}
3220	}
3221	WARN_ON(!list_empty(&mdsc->cap_delay_list));
3222	mutex_unlock(&mdsc->mutex);
3223
3224	ceph_cleanup_empty_realms(mdsc);
3225
3226	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3227
3228	dout("stopped\n");
3229}
3230
3231static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3232{
3233	dout("stop\n");
3234	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3235	if (mdsc->mdsmap)
3236		ceph_mdsmap_destroy(mdsc->mdsmap);
3237	kfree(mdsc->sessions);
3238	ceph_caps_finalize(mdsc);
3239}
3240
3241void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3242{
3243	struct ceph_mds_client *mdsc = fsc->mdsc;
3244
3245	dout("mdsc_destroy %p\n", mdsc);
3246	ceph_mdsc_stop(mdsc);
3247
3248	/* flush out any connection work with references to us */
3249	ceph_msgr_flush();
3250
3251	fsc->mdsc = NULL;
3252	kfree(mdsc);
3253	dout("mdsc_destroy %p done\n", mdsc);
3254}
3255
3256
3257/*
3258 * handle mds map update.
3259 */
3260void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3261{
3262	u32 epoch;
3263	u32 maplen;
3264	void *p = msg->front.iov_base;
3265	void *end = p + msg->front.iov_len;
3266	struct ceph_mdsmap *newmap, *oldmap;
3267	struct ceph_fsid fsid;
3268	int err = -EINVAL;
3269
3270	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3271	ceph_decode_copy(&p, &fsid, sizeof(fsid));
3272	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
3273		return;
3274	epoch = ceph_decode_32(&p);
3275	maplen = ceph_decode_32(&p);
3276	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3277
3278	/* do we need it? */
3279	ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
3280	mutex_lock(&mdsc->mutex);
3281	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3282		dout("handle_map epoch %u <= our %u\n",
3283		     epoch, mdsc->mdsmap->m_epoch);
3284		mutex_unlock(&mdsc->mutex);
3285		return;
3286	}
3287
3288	newmap = ceph_mdsmap_decode(&p, end);
3289	if (IS_ERR(newmap)) {
3290		err = PTR_ERR(newmap);
3291		goto bad_unlock;
3292	}
3293
3294	/* swap into place */
3295	if (mdsc->mdsmap) {
3296		oldmap = mdsc->mdsmap;
3297		mdsc->mdsmap = newmap;
3298		check_new_map(mdsc, newmap, oldmap);
3299		ceph_mdsmap_destroy(oldmap);
3300	} else {
3301		mdsc->mdsmap = newmap;  /* first mds map */
3302	}
3303	mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3304
3305	__wake_requests(mdsc, &mdsc->waiting_for_map);
3306
3307	mutex_unlock(&mdsc->mutex);
3308	schedule_delayed(mdsc);
3309	return;
3310
3311bad_unlock:
3312	mutex_unlock(&mdsc->mutex);
3313bad:
3314	pr_err("error decoding mdsmap %d\n", err);
3315	return;
3316}
3317
3318static struct ceph_connection *con_get(struct ceph_connection *con)
3319{
3320	struct ceph_mds_session *s = con->private;
3321
3322	if (get_session(s)) {
3323		dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
3324		return con;
3325	}
3326	dout("mdsc con_get %p FAIL\n", s);
3327	return NULL;
3328}
3329
3330static void con_put(struct ceph_connection *con)
3331{
3332	struct ceph_mds_session *s = con->private;
3333
3334	dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
3335	ceph_put_mds_session(s);
3336}
3337
3338/*
3339 * if the client is unresponsive for long enough, the mds will kill
3340 * the session entirely.
3341 */
3342static void peer_reset(struct ceph_connection *con)
3343{
3344	struct ceph_mds_session *s = con->private;
3345	struct ceph_mds_client *mdsc = s->s_mdsc;
3346
3347	pr_warning("mds%d closed our session\n", s->s_mds);
3348	send_mds_reconnect(mdsc, s);
3349}
3350
3351static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3352{
3353	struct ceph_mds_session *s = con->private;
3354	struct ceph_mds_client *mdsc = s->s_mdsc;
3355	int type = le16_to_cpu(msg->hdr.type);
3356
3357	mutex_lock(&mdsc->mutex);
3358	if (__verify_registered_session(mdsc, s) < 0) {
3359		mutex_unlock(&mdsc->mutex);
3360		goto out;
3361	}
3362	mutex_unlock(&mdsc->mutex);
3363
3364	switch (type) {
3365	case CEPH_MSG_MDS_MAP:
3366		ceph_mdsc_handle_map(mdsc, msg);
3367		break;
3368	case CEPH_MSG_CLIENT_SESSION:
3369		handle_session(s, msg);
3370		break;
3371	case CEPH_MSG_CLIENT_REPLY:
3372		handle_reply(s, msg);
3373		break;
3374	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
3375		handle_forward(mdsc, s, msg);
3376		break;
3377	case CEPH_MSG_CLIENT_CAPS:
3378		ceph_handle_caps(s, msg);
3379		break;
3380	case CEPH_MSG_CLIENT_SNAP:
3381		ceph_handle_snap(mdsc, s, msg);
3382		break;
3383	case CEPH_MSG_CLIENT_LEASE:
3384		handle_lease(mdsc, s, msg);
3385		break;
3386
3387	default:
3388		pr_err("received unknown message type %d %s\n", type,
3389		       ceph_msg_type_name(type));
3390	}
3391out:
3392	ceph_msg_put(msg);
3393}
3394
3395/*
3396 * authentication
3397 */
3398
3399/*
3400 * Note: returned pointer is the address of a structure that's
3401 * managed separately.  Caller must *not* attempt to free it.
3402 */
3403static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
3404					int *proto, int force_new)
3405{
3406	struct ceph_mds_session *s = con->private;
3407	struct ceph_mds_client *mdsc = s->s_mdsc;
3408	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3409	struct ceph_auth_handshake *auth = &s->s_auth;
3410
3411	if (force_new && auth->authorizer) {
3412		if (ac->ops && ac->ops->destroy_authorizer)
3413			ac->ops->destroy_authorizer(ac, auth->authorizer);
3414		auth->authorizer = NULL;
3415	}
3416	if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
3417		int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3418							auth);
3419		if (ret)
3420			return ERR_PTR(ret);
3421	}
3422	*proto = ac->protocol;
3423
3424	return auth;
3425}
3426
3427
3428static int verify_authorizer_reply(struct ceph_connection *con, int len)
3429{
3430	struct ceph_mds_session *s = con->private;
3431	struct ceph_mds_client *mdsc = s->s_mdsc;
3432	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3433
3434	return ac->ops->verify_authorizer_reply(ac, s->s_auth.authorizer, len);
3435}
3436
3437static int invalidate_authorizer(struct ceph_connection *con)
3438{
3439	struct ceph_mds_session *s = con->private;
3440	struct ceph_mds_client *mdsc = s->s_mdsc;
3441	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3442
3443	if (ac->ops->invalidate_authorizer)
3444		ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3445
3446	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
3447}
3448
3449static const struct ceph_connection_operations mds_con_ops = {
3450	.get = con_get,
3451	.put = con_put,
3452	.dispatch = dispatch,
3453	.get_authorizer = get_authorizer,
3454	.verify_authorizer_reply = verify_authorizer_reply,
3455	.invalidate_authorizer = invalidate_authorizer,
3456	.peer_reset = peer_reset,
3457};
3458
3459/* eof */
v3.1
   1#include <linux/ceph/ceph_debug.h>
   2
   3#include <linux/fs.h>
   4#include <linux/wait.h>
   5#include <linux/slab.h>
   6#include <linux/sched.h>
   7#include <linux/debugfs.h>
   8#include <linux/seq_file.h>
   9
  10#include "super.h"
  11#include "mds_client.h"
  12
  13#include <linux/ceph/messenger.h>
  14#include <linux/ceph/decode.h>
  15#include <linux/ceph/pagelist.h>
  16#include <linux/ceph/auth.h>
  17#include <linux/ceph/debugfs.h>
  18
  19/*
  20 * A cluster of MDS (metadata server) daemons is responsible for
  21 * managing the file system namespace (the directory hierarchy and
  22 * inodes) and for coordinating shared access to storage.  Metadata is
  23 * partitioning hierarchically across a number of servers, and that
  24 * partition varies over time as the cluster adjusts the distribution
  25 * in order to balance load.
  26 *
  27 * The MDS client is primarily responsible to managing synchronous
  28 * metadata requests for operations like open, unlink, and so forth.
  29 * If there is a MDS failure, we find out about it when we (possibly
  30 * request and) receive a new MDS map, and can resubmit affected
  31 * requests.
  32 *
  33 * For the most part, though, we take advantage of a lossless
  34 * communications channel to the MDS, and do not need to worry about
  35 * timing out or resubmitting requests.
  36 *
  37 * We maintain a stateful "session" with each MDS we interact with.
  38 * Within each session, we sent periodic heartbeat messages to ensure
  39 * any capabilities or leases we have been issues remain valid.  If
  40 * the session times out and goes stale, our leases and capabilities
  41 * are no longer valid.
  42 */
  43
  44struct ceph_reconnect_state {
  45	struct ceph_pagelist *pagelist;
  46	bool flock;
  47};
  48
  49static void __wake_requests(struct ceph_mds_client *mdsc,
  50			    struct list_head *head);
  51
  52static const struct ceph_connection_operations mds_con_ops;
  53
  54
  55/*
  56 * mds reply parsing
  57 */
  58
  59/*
  60 * parse individual inode info
  61 */
  62static int parse_reply_info_in(void **p, void *end,
  63			       struct ceph_mds_reply_info_in *info,
  64			       int features)
  65{
  66	int err = -EIO;
  67
  68	info->in = *p;
  69	*p += sizeof(struct ceph_mds_reply_inode) +
  70		sizeof(*info->in->fragtree.splits) *
  71		le32_to_cpu(info->in->fragtree.nsplits);
  72
  73	ceph_decode_32_safe(p, end, info->symlink_len, bad);
  74	ceph_decode_need(p, end, info->symlink_len, bad);
  75	info->symlink = *p;
  76	*p += info->symlink_len;
  77
  78	if (features & CEPH_FEATURE_DIRLAYOUTHASH)
  79		ceph_decode_copy_safe(p, end, &info->dir_layout,
  80				      sizeof(info->dir_layout), bad);
  81	else
  82		memset(&info->dir_layout, 0, sizeof(info->dir_layout));
  83
  84	ceph_decode_32_safe(p, end, info->xattr_len, bad);
  85	ceph_decode_need(p, end, info->xattr_len, bad);
  86	info->xattr_data = *p;
  87	*p += info->xattr_len;
  88	return 0;
  89bad:
  90	return err;
  91}
  92
  93/*
  94 * parse a normal reply, which may contain a (dir+)dentry and/or a
  95 * target inode.
  96 */
  97static int parse_reply_info_trace(void **p, void *end,
  98				  struct ceph_mds_reply_info_parsed *info,
  99				  int features)
 100{
 101	int err;
 102
 103	if (info->head->is_dentry) {
 104		err = parse_reply_info_in(p, end, &info->diri, features);
 105		if (err < 0)
 106			goto out_bad;
 107
 108		if (unlikely(*p + sizeof(*info->dirfrag) > end))
 109			goto bad;
 110		info->dirfrag = *p;
 111		*p += sizeof(*info->dirfrag) +
 112			sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
 113		if (unlikely(*p > end))
 114			goto bad;
 115
 116		ceph_decode_32_safe(p, end, info->dname_len, bad);
 117		ceph_decode_need(p, end, info->dname_len, bad);
 118		info->dname = *p;
 119		*p += info->dname_len;
 120		info->dlease = *p;
 121		*p += sizeof(*info->dlease);
 122	}
 123
 124	if (info->head->is_target) {
 125		err = parse_reply_info_in(p, end, &info->targeti, features);
 126		if (err < 0)
 127			goto out_bad;
 128	}
 129
 130	if (unlikely(*p != end))
 131		goto bad;
 132	return 0;
 133
 134bad:
 135	err = -EIO;
 136out_bad:
 137	pr_err("problem parsing mds trace %d\n", err);
 138	return err;
 139}
 140
 141/*
 142 * parse readdir results
 143 */
 144static int parse_reply_info_dir(void **p, void *end,
 145				struct ceph_mds_reply_info_parsed *info,
 146				int features)
 147{
 148	u32 num, i = 0;
 149	int err;
 150
 151	info->dir_dir = *p;
 152	if (*p + sizeof(*info->dir_dir) > end)
 153		goto bad;
 154	*p += sizeof(*info->dir_dir) +
 155		sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
 156	if (*p > end)
 157		goto bad;
 158
 159	ceph_decode_need(p, end, sizeof(num) + 2, bad);
 160	num = ceph_decode_32(p);
 161	info->dir_end = ceph_decode_8(p);
 162	info->dir_complete = ceph_decode_8(p);
 163	if (num == 0)
 164		goto done;
 165
 166	/* alloc large array */
 167	info->dir_nr = num;
 168	info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
 169			       sizeof(*info->dir_dname) +
 170			       sizeof(*info->dir_dname_len) +
 171			       sizeof(*info->dir_dlease),
 172			       GFP_NOFS);
 173	if (info->dir_in == NULL) {
 174		err = -ENOMEM;
 175		goto out_bad;
 176	}
 177	info->dir_dname = (void *)(info->dir_in + num);
 178	info->dir_dname_len = (void *)(info->dir_dname + num);
 179	info->dir_dlease = (void *)(info->dir_dname_len + num);
 180
 181	while (num) {
 182		/* dentry */
 183		ceph_decode_need(p, end, sizeof(u32)*2, bad);
 184		info->dir_dname_len[i] = ceph_decode_32(p);
 185		ceph_decode_need(p, end, info->dir_dname_len[i], bad);
 186		info->dir_dname[i] = *p;
 187		*p += info->dir_dname_len[i];
 188		dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
 189		     info->dir_dname[i]);
 190		info->dir_dlease[i] = *p;
 191		*p += sizeof(struct ceph_mds_reply_lease);
 192
 193		/* inode */
 194		err = parse_reply_info_in(p, end, &info->dir_in[i], features);
 195		if (err < 0)
 196			goto out_bad;
 197		i++;
 198		num--;
 199	}
 200
 201done:
 202	if (*p != end)
 203		goto bad;
 204	return 0;
 205
 206bad:
 207	err = -EIO;
 208out_bad:
 209	pr_err("problem parsing dir contents %d\n", err);
 210	return err;
 211}
 212
 213/*
 214 * parse fcntl F_GETLK results
 215 */
 216static int parse_reply_info_filelock(void **p, void *end,
 217				     struct ceph_mds_reply_info_parsed *info,
 218				     int features)
 219{
 220	if (*p + sizeof(*info->filelock_reply) > end)
 221		goto bad;
 222
 223	info->filelock_reply = *p;
 224	*p += sizeof(*info->filelock_reply);
 225
 226	if (unlikely(*p != end))
 227		goto bad;
 228	return 0;
 229
 230bad:
 231	return -EIO;
 232}
 233
 234/*
 235 * parse extra results
 236 */
 237static int parse_reply_info_extra(void **p, void *end,
 238				  struct ceph_mds_reply_info_parsed *info,
 239				  int features)
 240{
 241	if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
 242		return parse_reply_info_filelock(p, end, info, features);
 243	else
 244		return parse_reply_info_dir(p, end, info, features);
 245}
 246
 247/*
 248 * parse entire mds reply
 249 */
 250static int parse_reply_info(struct ceph_msg *msg,
 251			    struct ceph_mds_reply_info_parsed *info,
 252			    int features)
 253{
 254	void *p, *end;
 255	u32 len;
 256	int err;
 257
 258	info->head = msg->front.iov_base;
 259	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
 260	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
 261
 262	/* trace */
 263	ceph_decode_32_safe(&p, end, len, bad);
 264	if (len > 0) {
 
 265		err = parse_reply_info_trace(&p, p+len, info, features);
 266		if (err < 0)
 267			goto out_bad;
 268	}
 269
 270	/* extra */
 271	ceph_decode_32_safe(&p, end, len, bad);
 272	if (len > 0) {
 
 273		err = parse_reply_info_extra(&p, p+len, info, features);
 274		if (err < 0)
 275			goto out_bad;
 276	}
 277
 278	/* snap blob */
 279	ceph_decode_32_safe(&p, end, len, bad);
 280	info->snapblob_len = len;
 281	info->snapblob = p;
 282	p += len;
 283
 284	if (p != end)
 285		goto bad;
 286	return 0;
 287
 288bad:
 289	err = -EIO;
 290out_bad:
 291	pr_err("mds parse_reply err %d\n", err);
 292	return err;
 293}
 294
 295static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
 296{
 297	kfree(info->dir_in);
 298}
 299
 300
 301/*
 302 * sessions
 303 */
 304static const char *session_state_name(int s)
 305{
 306	switch (s) {
 307	case CEPH_MDS_SESSION_NEW: return "new";
 308	case CEPH_MDS_SESSION_OPENING: return "opening";
 309	case CEPH_MDS_SESSION_OPEN: return "open";
 310	case CEPH_MDS_SESSION_HUNG: return "hung";
 311	case CEPH_MDS_SESSION_CLOSING: return "closing";
 312	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
 313	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
 314	default: return "???";
 315	}
 316}
 317
 318static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
 319{
 320	if (atomic_inc_not_zero(&s->s_ref)) {
 321		dout("mdsc get_session %p %d -> %d\n", s,
 322		     atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
 323		return s;
 324	} else {
 325		dout("mdsc get_session %p 0 -- FAIL", s);
 326		return NULL;
 327	}
 328}
 329
 330void ceph_put_mds_session(struct ceph_mds_session *s)
 331{
 332	dout("mdsc put_session %p %d -> %d\n", s,
 333	     atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
 334	if (atomic_dec_and_test(&s->s_ref)) {
 335		if (s->s_authorizer)
 336		     s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer(
 337			     s->s_mdsc->fsc->client->monc.auth,
 338			     s->s_authorizer);
 339		kfree(s);
 340	}
 341}
 342
 343/*
 344 * called under mdsc->mutex
 345 */
 346struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
 347						   int mds)
 348{
 349	struct ceph_mds_session *session;
 350
 351	if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
 352		return NULL;
 353	session = mdsc->sessions[mds];
 354	dout("lookup_mds_session %p %d\n", session,
 355	     atomic_read(&session->s_ref));
 356	get_session(session);
 357	return session;
 358}
 359
 360static bool __have_session(struct ceph_mds_client *mdsc, int mds)
 361{
 362	if (mds >= mdsc->max_sessions)
 363		return false;
 364	return mdsc->sessions[mds];
 365}
 366
 367static int __verify_registered_session(struct ceph_mds_client *mdsc,
 368				       struct ceph_mds_session *s)
 369{
 370	if (s->s_mds >= mdsc->max_sessions ||
 371	    mdsc->sessions[s->s_mds] != s)
 372		return -ENOENT;
 373	return 0;
 374}
 375
 376/*
 377 * create+register a new session for given mds.
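 * The sessions[] array is grown to the next power of two as needed; the
 * array holds one reference on the session and a second reference is
 * returned to the caller (see the atomic_inc below).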
 378 * called under mdsc->mutex.
 379 */
 380static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 381						 int mds)
 382{
 383	struct ceph_mds_session *s;
 384
 385	s = kzalloc(sizeof(*s), GFP_NOFS);
 386	if (!s)
 387		return ERR_PTR(-ENOMEM);
 388	s->s_mdsc = mdsc;
 389	s->s_mds = mds;
 390	s->s_state = CEPH_MDS_SESSION_NEW;
 391	s->s_ttl = 0;
 392	s->s_seq = 0;
 393	mutex_init(&s->s_mutex);
 394
 395	ceph_con_init(mdsc->fsc->client->msgr, &s->s_con);
 396	s->s_con.private = s;
 397	s->s_con.ops = &mds_con_ops;
 398	s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
 399	s->s_con.peer_name.num = cpu_to_le64(mds);
 400
 401	spin_lock_init(&s->s_cap_lock);
 402	s->s_cap_gen = 0;
 403	s->s_cap_ttl = 0;
 404	s->s_renew_requested = 0;
 405	s->s_renew_seq = 0;
 406	INIT_LIST_HEAD(&s->s_caps);
 407	s->s_nr_caps = 0;
 408	s->s_trim_caps = 0;
 409	atomic_set(&s->s_ref, 1);
 410	INIT_LIST_HEAD(&s->s_waiting);
 411	INIT_LIST_HEAD(&s->s_unsafe);
 412	s->s_num_cap_releases = 0;
 413	s->s_cap_iterator = NULL;
 414	INIT_LIST_HEAD(&s->s_cap_releases);
 415	INIT_LIST_HEAD(&s->s_cap_releases_done);
 416	INIT_LIST_HEAD(&s->s_cap_flushing);
 417	INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
 418
 419	dout("register_session mds%d\n", mds);
 420	if (mds >= mdsc->max_sessions) {
 421		int newmax = 1 << get_count_order(mds+1);
 422		struct ceph_mds_session **sa;
 423
 424		dout("register_session realloc to %d\n", newmax);
 425		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
 426		if (sa == NULL)
 427			goto fail_realloc;
 428		if (mdsc->sessions) {
 429			memcpy(sa, mdsc->sessions,
 430			       mdsc->max_sessions * sizeof(void *));
 431			kfree(mdsc->sessions);
 432		}
 433		mdsc->sessions = sa;
 434		mdsc->max_sessions = newmax;
 435	}
 436	mdsc->sessions[mds] = s;
 437	atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
 438
 439	ceph_con_open(&s->s_con, ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
 440
 441	return s;
 442
 443fail_realloc:
 444	kfree(s);
 445	return ERR_PTR(-ENOMEM);
 446}
 447
 448/*
 449 * called under mdsc->mutex
 450 */
 451static void __unregister_session(struct ceph_mds_client *mdsc,
 452			       struct ceph_mds_session *s)
 453{
 454	dout("__unregister_session mds%d %p\n", s->s_mds, s);
 455	BUG_ON(mdsc->sessions[s->s_mds] != s);
 456	mdsc->sessions[s->s_mds] = NULL;
 457	ceph_con_close(&s->s_con);
 458	ceph_put_mds_session(s);
 459}
 460
 461/*
 462 * drop session refs in request.
 463 *
 464 * should be last request ref, or hold mdsc->mutex
 465 */
 466static void put_request_session(struct ceph_mds_request *req)
 467{
 468	if (req->r_session) {
 469		ceph_put_mds_session(req->r_session);
 470		req->r_session = NULL;
 471	}
 472}
 473
 474void ceph_mdsc_release_request(struct kref *kref)
 475{
 476	struct ceph_mds_request *req = container_of(kref,
 477						    struct ceph_mds_request,
 478						    r_kref);
 479	if (req->r_request)
 480		ceph_msg_put(req->r_request);
 481	if (req->r_reply) {
 482		ceph_msg_put(req->r_reply);
 483		destroy_reply_info(&req->r_reply_info);
 484	}
 485	if (req->r_inode) {
 486		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
 487		iput(req->r_inode);
 488	}
 489	if (req->r_locked_dir)
 490		ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
 491	if (req->r_target_inode)
 492		iput(req->r_target_inode);
 493	if (req->r_dentry)
 494		dput(req->r_dentry);
 495	if (req->r_old_dentry) {
 496		/*
 497		 * track (and drop pins for) r_old_dentry_dir
 498		 * separately, since r_old_dentry's d_parent may have
 499		 * changed between the dir mutex being dropped and
 500		 * this request being freed.
 501		 */
 502		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
 503				  CEPH_CAP_PIN);
 504		dput(req->r_old_dentry);
 505		iput(req->r_old_dentry_dir);
 506	}
 507	kfree(req->r_path1);
 508	kfree(req->r_path2);
 509	put_request_session(req);
 510	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
 511	kfree(req);
 512}
 513
 514/*
 515 * lookup request, bump ref if found.
 516 *
 517 * called under mdsc->mutex.
 518 */
 519static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
 520					     u64 tid)
 521{
 522	struct ceph_mds_request *req;
 523	struct rb_node *n = mdsc->request_tree.rb_node;
 524
 525	while (n) {
 526		req = rb_entry(n, struct ceph_mds_request, r_node);
 527		if (tid < req->r_tid)
 528			n = n->rb_left;
 529		else if (tid > req->r_tid)
 530			n = n->rb_right;
 531		else {
 532			ceph_mdsc_get_request(req);
 533			return req;
 534		}
 535	}
 536	return NULL;
 537}
 538
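/*
 * Insert a request into the tid-ordered rb-tree.  tids are assigned
 * sequentially in __register_request, so a duplicate tid is a bug (BUG()).
 */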
 539static void __insert_request(struct ceph_mds_client *mdsc,
 540			     struct ceph_mds_request *new)
 541{
 542	struct rb_node **p = &mdsc->request_tree.rb_node;
 543	struct rb_node *parent = NULL;
 544	struct ceph_mds_request *req = NULL;
 545
 546	while (*p) {
 547		parent = *p;
 548		req = rb_entry(parent, struct ceph_mds_request, r_node);
 549		if (new->r_tid < req->r_tid)
 550			p = &(*p)->rb_left;
 551		else if (new->r_tid > req->r_tid)
 552			p = &(*p)->rb_right;
 553		else
 554			BUG();
 555	}
 556
 557	rb_link_node(&new->r_node, parent, p);
 558	rb_insert_color(&new->r_node, &mdsc->request_tree);
 559}
 560
 561/*
 562 * Register an in-flight request, and assign a tid.  Link to the directory
 563 * we are modifying (if any).
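 * If a directory is given, it is pinned (ihold) and the request is linked
 * on its i_unsafe_dirops list until __unregister_request drops it.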
 564 *
 565 * Called under mdsc->mutex.
 566 */
 567static void __register_request(struct ceph_mds_client *mdsc,
 568			       struct ceph_mds_request *req,
 569			       struct inode *dir)
 570{
 571	req->r_tid = ++mdsc->last_tid;
 572	if (req->r_num_caps)
 573		ceph_reserve_caps(mdsc, &req->r_caps_reservation,
 574				  req->r_num_caps);
 575	dout("__register_request %p tid %lld\n", req, req->r_tid);
 576	ceph_mdsc_get_request(req);
 577	__insert_request(mdsc, req);
 578
 579	req->r_uid = current_fsuid();
 580	req->r_gid = current_fsgid();
 581
 582	if (dir) {
 583		struct ceph_inode_info *ci = ceph_inode(dir);
 584
 585		ihold(dir);
 586		spin_lock(&ci->i_unsafe_lock);
 587		req->r_unsafe_dir = dir;
 588		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
 589		spin_unlock(&ci->i_unsafe_lock);
 590	}
 591}
 592
 593static void __unregister_request(struct ceph_mds_client *mdsc,
 594				 struct ceph_mds_request *req)
 595{
 596	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
 597	rb_erase(&req->r_node, &mdsc->request_tree);
 598	RB_CLEAR_NODE(&req->r_node);
 599
 600	if (req->r_unsafe_dir) {
 601		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
 602
 603		spin_lock(&ci->i_unsafe_lock);
 604		list_del_init(&req->r_unsafe_dir_item);
 605		spin_unlock(&ci->i_unsafe_lock);
 606
 607		iput(req->r_unsafe_dir);
 608		req->r_unsafe_dir = NULL;
 609	}
 610
 611	ceph_mdsc_put_request(req);
 612}
 613
 614/*
 615 * Choose mds to send request to next.  If there is a hint set in the
 616 * request (e.g., due to a prior forward hint from the mds), use that.
 617 * Otherwise, consult frag tree and/or caps to identify the
 618 * appropriate mds.  If all else fails, choose randomly.
 619 *
 620 * Called under mdsc->mutex.
 621 */
 622struct dentry *get_nonsnap_parent(struct dentry *dentry)
 623{
 624	/*
 625	 * we don't need to worry about protecting the d_parent access
 626	 * here because we never rename inside the snapped namespace
 627	 * except to resplice to another snapdir, and either the old or new
 628	 * result is a valid result.
 629	 */
 630	while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
 631		dentry = dentry->d_parent;
 632	return dentry;
 633}
 634
 635static int __choose_mds(struct ceph_mds_client *mdsc,
 636			struct ceph_mds_request *req)
 637{
 638	struct inode *inode;
 639	struct ceph_inode_info *ci;
 640	struct ceph_cap *cap;
 641	int mode = req->r_direct_mode;
 642	int mds = -1;
 643	u32 hash = req->r_direct_hash;
 644	bool is_hash = req->r_direct_is_hash;
 645
 646	/*
 647	 * is there a specific mds we should try?  ignore hint if we have
 648	 * no session and the mds is not up (active or recovering).
 649	 */
 650	if (req->r_resend_mds >= 0 &&
 651	    (__have_session(mdsc, req->r_resend_mds) ||
 652	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
 653		dout("choose_mds using resend_mds mds%d\n",
 654		     req->r_resend_mds);
 655		return req->r_resend_mds;
 656	}
 657
 658	if (mode == USE_RANDOM_MDS)
 659		goto random;
 660
 661	inode = NULL;
 662	if (req->r_inode) {
 663		inode = req->r_inode;
 664	} else if (req->r_dentry) {
 665		/* ignore race with rename; old or new d_parent is okay */
 666		struct dentry *parent = req->r_dentry->d_parent;
 667		struct inode *dir = parent->d_inode;
 668
 669		if (dir->i_sb != mdsc->fsc->sb) {
 670			/* not this fs! */
 671			inode = req->r_dentry->d_inode;
 672		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
 673			/* direct snapped/virtual snapdir requests
 674			 * based on parent dir inode */
 675			struct dentry *dn = get_nonsnap_parent(parent);
 676			inode = dn->d_inode;
 677			dout("__choose_mds using nonsnap parent %p\n", inode);
 678		} else if (req->r_dentry->d_inode) {
 679			/* dentry target */
 680			inode = req->r_dentry->d_inode;
 681		} else {
 682			/* dir + name */
 683			inode = dir;
 684			hash = ceph_dentry_hash(dir, req->r_dentry);
 685			is_hash = true;
 686		}
 687	}
 688
 689	dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
 690	     (int)hash, mode);
 691	if (!inode)
 692		goto random;
 693	ci = ceph_inode(inode);
 694
 695	if (is_hash && S_ISDIR(inode->i_mode)) {
 696		struct ceph_inode_frag frag;
 697		int found;
 698
 699		ceph_choose_frag(ci, hash, &frag, &found);
 700		if (found) {
 701			if (mode == USE_ANY_MDS && frag.ndist > 0) {
 702				u8 r;
 703
 704				/* choose a random replica */
 705				get_random_bytes(&r, 1);
 706				r %= frag.ndist;
 707				mds = frag.dist[r];
 708				dout("choose_mds %p %llx.%llx "
 709				     "frag %u mds%d (%d/%d)\n",
 710				     inode, ceph_vinop(inode),
 711				     frag.frag, mds,
 712				     (int)r, frag.ndist);
 713				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
 714				    CEPH_MDS_STATE_ACTIVE)
 715					return mds;
 716			}
 717
 718			/* since this file/dir wasn't known to be
 719			 * replicated, look for the
 720			 * authoritative mds. */
 721			mode = USE_AUTH_MDS;
 722			if (frag.mds >= 0) {
 723				/* choose auth mds */
 724				mds = frag.mds;
 725				dout("choose_mds %p %llx.%llx "
 726				     "frag %u mds%d (auth)\n",
 727				     inode, ceph_vinop(inode), frag.frag, mds);
 728				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
 729				    CEPH_MDS_STATE_ACTIVE)
 730					return mds;
 731			}
 732		}
 733	}
 734
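	/* no usable frag info: fall back to an existing cap -- prefer the
	 * auth cap, else the first cap in the tree -- and send to the mds
	 * holding it */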
 735	spin_lock(&inode->i_lock);
 736	cap = NULL;
 737	if (mode == USE_AUTH_MDS)
 738		cap = ci->i_auth_cap;
 739	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
 740		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
 741	if (!cap) {
 742		spin_unlock(&inode->i_lock);
 743		goto random;
 744	}
 745	mds = cap->session->s_mds;
 746	dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
 747	     inode, ceph_vinop(inode), mds,
 748	     cap == ci->i_auth_cap ? "auth " : "", cap);
 749	spin_unlock(&inode->i_lock);
 750	return mds;
 751
 752random:
 753	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
 754	dout("choose_mds chose random mds%d\n", mds);
 755	return mds;
 756}
 757
 758
 759/*
 760 * session messages
 761 */
 762static struct ceph_msg *create_session_msg(u32 op, u64 seq)
 763{
 764	struct ceph_msg *msg;
 765	struct ceph_mds_session_head *h;
 766
 767	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS);
 768	if (!msg) {
 769		pr_err("create_session_msg ENOMEM creating msg\n");
 770		return NULL;
 771	}
 772	h = msg->front.iov_base;
 773	h->op = cpu_to_le32(op);
 774	h->seq = cpu_to_le64(seq);
 775	return msg;
 776}
 777
 778/*
 779 * send session open request.
 780 *
 781 * called under mdsc->mutex
 782 */
 783static int __open_session(struct ceph_mds_client *mdsc,
 784			  struct ceph_mds_session *session)
 785{
 786	struct ceph_msg *msg;
 787	int mstate;
 788	int mds = session->s_mds;
 789
 790	/* wait for mds to go active? */
 791	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
 792	dout("open_session to mds%d (%s)\n", mds,
 793	     ceph_mds_state_name(mstate));
 794	session->s_state = CEPH_MDS_SESSION_OPENING;
 795	session->s_renew_requested = jiffies;
 796
 797	/* send connect message */
 798	msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
 799	if (!msg)
 800		return -ENOMEM;
 801	ceph_con_send(&session->s_con, msg);
 802	return 0;
 803}
 804
 805/*
 806 * open sessions for any export targets for the given mds
 807 *
 808 * called under mdsc->mutex
 809 */
 810static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
 811					  struct ceph_mds_session *session)
 812{
 813	struct ceph_mds_info *mi;
 814	struct ceph_mds_session *ts;
 815	int i, mds = session->s_mds;
 816	int target;
 817
 818	if (mds >= mdsc->mdsmap->m_max_mds)
 819		return;
 820	mi = &mdsc->mdsmap->m_info[mds];
 821	dout("open_export_target_sessions for mds%d (%d targets)\n",
 822	     session->s_mds, mi->num_export_targets);
 823
 824	for (i = 0; i < mi->num_export_targets; i++) {
 825		target = mi->export_targets[i];
 826		ts = __ceph_lookup_mds_session(mdsc, target);
 827		if (!ts) {
 828			ts = register_session(mdsc, target);
 829			if (IS_ERR(ts))
 830				return;
 831		}
 832		if (ts->s_state == CEPH_MDS_SESSION_NEW ||
 833		    ts->s_state == CEPH_MDS_SESSION_CLOSING)
 834			__open_session(mdsc, ts);
 835		else
 836			dout(" mds%d target mds%d %p is %s\n", session->s_mds,
 837			     i, ts, session_state_name(ts->s_state));
 838		ceph_put_mds_session(ts);
 839	}
 840}
 841
 842void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
 843					   struct ceph_mds_session *session)
 844{
 845	mutex_lock(&mdsc->mutex);
 846	__open_export_target_sessions(mdsc, session);
 847	mutex_unlock(&mdsc->mutex);
 848}
 849
 850/*
 851 * session caps
 852 */
 853
 854/*
 855 * Free preallocated cap messages assigned to this session
 856 */
 857static void cleanup_cap_releases(struct ceph_mds_session *session)
 858{
 859	struct ceph_msg *msg;
 860
 861	spin_lock(&session->s_cap_lock);
 862	while (!list_empty(&session->s_cap_releases)) {
 863		msg = list_first_entry(&session->s_cap_releases,
 864				       struct ceph_msg, list_head);
 865		list_del_init(&msg->list_head);
 866		ceph_msg_put(msg);
 867	}
 868	while (!list_empty(&session->s_cap_releases_done)) {
 869		msg = list_first_entry(&session->s_cap_releases_done,
 870				       struct ceph_msg, list_head);
 871		list_del_init(&msg->list_head);
 872		ceph_msg_put(msg);
 873	}
 874	spin_unlock(&session->s_cap_lock);
 875}
 876
 877/*
 878 * Helper to safely iterate over all caps associated with a session, with
 879 * special care taken to handle a racing __ceph_remove_cap().
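 * While s_cap_lock is dropped around the callback, s_cap_iterator records
 * the cap we are visiting; a racing removal then leaves that cap on the
 * list with cap->ci == NULL, and we finish unlinking it here (and put the
 * cap without locks held).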
 880 *
 881 * Caller must hold session s_mutex.
 882 */
 883static int iterate_session_caps(struct ceph_mds_session *session,
 884				 int (*cb)(struct inode *, struct ceph_cap *,
 885					    void *), void *arg)
 886{
 887	struct list_head *p;
 888	struct ceph_cap *cap;
 889	struct inode *inode, *last_inode = NULL;
 890	struct ceph_cap *old_cap = NULL;
 891	int ret;
 892
 893	dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
 894	spin_lock(&session->s_cap_lock);
 895	p = session->s_caps.next;
 896	while (p != &session->s_caps) {
 897		cap = list_entry(p, struct ceph_cap, session_caps);
 898		inode = igrab(&cap->ci->vfs_inode);
 899		if (!inode) {
 900			p = p->next;
 901			continue;
 902		}
 903		session->s_cap_iterator = cap;
 904		spin_unlock(&session->s_cap_lock);
 905
 906		if (last_inode) {
 907			iput(last_inode);
 908			last_inode = NULL;
 909		}
 910		if (old_cap) {
 911			ceph_put_cap(session->s_mdsc, old_cap);
 912			old_cap = NULL;
 913		}
 914
 915		ret = cb(inode, cap, arg);
 916		last_inode = inode;
 917
 918		spin_lock(&session->s_cap_lock);
 919		p = p->next;
 920		if (cap->ci == NULL) {
 921			dout("iterate_session_caps  finishing cap %p removal\n",
 922			     cap);
 923			BUG_ON(cap->session != session);
 924			list_del_init(&cap->session_caps);
 925			session->s_nr_caps--;
 926			cap->session = NULL;
 927			old_cap = cap;  /* put_cap it w/o locks held */
 928		}
 929		if (ret < 0)
 930			goto out;
 931	}
 932	ret = 0;
 933out:
 934	session->s_cap_iterator = NULL;
 935	spin_unlock(&session->s_cap_lock);
 936
 937	if (last_inode)
 938		iput(last_inode);
 939	if (old_cap)
 940		ceph_put_cap(session->s_mdsc, old_cap);
 941
 942	return ret;
 943}
 944
 945static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
 946				  void *arg)
 947{
 948	struct ceph_inode_info *ci = ceph_inode(inode);
 949	int drop = 0;
 950
 951	dout("removing cap %p, ci is %p, inode is %p\n",
 952	     cap, ci, &ci->vfs_inode);
 953	spin_lock(&inode->i_lock);
 954	__ceph_remove_cap(cap);
 955	if (!__ceph_is_any_real_caps(ci)) {
 956		struct ceph_mds_client *mdsc =
 957			ceph_sb_to_client(inode->i_sb)->mdsc;
 958
 959		spin_lock(&mdsc->cap_dirty_lock);
 960		if (!list_empty(&ci->i_dirty_item)) {
 961			pr_info(" dropping dirty %s state for %p %lld\n",
 962				ceph_cap_string(ci->i_dirty_caps),
 963				inode, ceph_ino(inode));
 964			ci->i_dirty_caps = 0;
 965			list_del_init(&ci->i_dirty_item);
 966			drop = 1;
 967		}
 968		if (!list_empty(&ci->i_flushing_item)) {
 969			pr_info(" dropping dirty+flushing %s state for %p %lld\n",
 970				ceph_cap_string(ci->i_flushing_caps),
 971				inode, ceph_ino(inode));
 972			ci->i_flushing_caps = 0;
 973			list_del_init(&ci->i_flushing_item);
 974			mdsc->num_cap_flushing--;
 975			drop = 1;
 976		}
 977		if (drop && ci->i_wrbuffer_ref) {
 978			pr_info(" dropping dirty data for %p %lld\n",
 979				inode, ceph_ino(inode));
 980			ci->i_wrbuffer_ref = 0;
 981			ci->i_wrbuffer_ref_head = 0;
 982			drop++;
 983		}
 984		spin_unlock(&mdsc->cap_dirty_lock);
 985	}
 986	spin_unlock(&inode->i_lock);
 987	while (drop--)
 988		iput(inode);
 989	return 0;
 990}
 991
 992/*
 993 * caller must hold session s_mutex
 994 */
 995static void remove_session_caps(struct ceph_mds_session *session)
 996{
 997	dout("remove_session_caps on %p\n", session);
 998	iterate_session_caps(session, remove_session_caps_cb, NULL);
 999	BUG_ON(session->s_nr_caps > 0);
1000	BUG_ON(!list_empty(&session->s_cap_flushing));
1001	cleanup_cap_releases(session);
1002}
1003
1004/*
1005 * wake up any threads waiting on this session's caps.  if the cap is
1006 * old (didn't get renewed on the client reconnect), remove it now.
1007 *
1008 * caller must hold s_mutex.
1009 */
1010static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1011			      void *arg)
1012{
1013	struct ceph_inode_info *ci = ceph_inode(inode);
1014
1015	wake_up_all(&ci->i_cap_wq);
1016	if (arg) {
1017		spin_lock(&inode->i_lock);
1018		ci->i_wanted_max_size = 0;
1019		ci->i_requested_max_size = 0;
1020		spin_unlock(&inode->i_lock);
1021	}
1022	return 0;
1023}
1024
1025static void wake_up_session_caps(struct ceph_mds_session *session,
1026				 int reconnect)
1027{
1028	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1029	iterate_session_caps(session, wake_up_session_cb,
1030			     (void *)(unsigned long)reconnect);
1031}
1032
1033/*
1034 * Send periodic message to MDS renewing all currently held caps.  The
1035 * ack will reset the expiration for all caps from this session.
1036 *
1037 * caller holds s_mutex
1038 */
1039static int send_renew_caps(struct ceph_mds_client *mdsc,
1040			   struct ceph_mds_session *session)
1041{
1042	struct ceph_msg *msg;
1043	int state;
1044
1045	if (time_after_eq(jiffies, session->s_cap_ttl) &&
1046	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1047		pr_info("mds%d caps stale\n", session->s_mds);
1048	session->s_renew_requested = jiffies;
1049
1050	/* do not try to renew caps until a recovering mds has reconnected
1051	 * with its clients. */
1052	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1053	if (state < CEPH_MDS_STATE_RECONNECT) {
1054		dout("send_renew_caps ignoring mds%d (%s)\n",
1055		     session->s_mds, ceph_mds_state_name(state));
1056		return 0;
1057	}
1058
1059	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1060		ceph_mds_state_name(state));
1061	msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1062				 ++session->s_renew_seq);
1063	if (!msg)
1064		return -ENOMEM;
1065	ceph_con_send(&session->s_con, msg);
1066	return 0;
1067}
1068
1069/*
1070 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1071 *
1072 * Called under session->s_mutex
1073 */
1074static void renewed_caps(struct ceph_mds_client *mdsc,
1075			 struct ceph_mds_session *session, int is_renew)
1076{
1077	int was_stale;
1078	int wake = 0;
1079
1080	spin_lock(&session->s_cap_lock);
1081	was_stale = is_renew && (session->s_cap_ttl == 0 ||
1082				 time_after_eq(jiffies, session->s_cap_ttl));
1083
1084	session->s_cap_ttl = session->s_renew_requested +
1085		mdsc->mdsmap->m_session_timeout*HZ;
1086
1087	if (was_stale) {
1088		if (time_before(jiffies, session->s_cap_ttl)) {
1089			pr_info("mds%d caps renewed\n", session->s_mds);
1090			wake = 1;
1091		} else {
1092			pr_info("mds%d caps still stale\n", session->s_mds);
1093		}
1094	}
1095	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1096	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1097	     time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1098	spin_unlock(&session->s_cap_lock);
1099
1100	if (wake)
1101		wake_up_session_caps(session, 0);
1102}
1103
1104/*
1105 * send a session close request
1106 */
1107static int request_close_session(struct ceph_mds_client *mdsc,
1108				 struct ceph_mds_session *session)
1109{
1110	struct ceph_msg *msg;
1111
1112	dout("request_close_session mds%d state %s seq %lld\n",
1113	     session->s_mds, session_state_name(session->s_state),
1114	     session->s_seq);
1115	msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1116	if (!msg)
1117		return -ENOMEM;
1118	ceph_con_send(&session->s_con, msg);
1119	return 0;
1120}
1121
1122/*
1123 * Called with s_mutex held.
1124 */
1125static int __close_session(struct ceph_mds_client *mdsc,
1126			 struct ceph_mds_session *session)
1127{
1128	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1129		return 0;
1130	session->s_state = CEPH_MDS_SESSION_CLOSING;
1131	return request_close_session(mdsc, session);
1132}
1133
1134/*
1135 * Trim old(er) caps.
1136 *
1137 * Because we can't cache an inode without one or more caps, we do
1138 * this indirectly: if a cap is unused, we prune its aliases, at which
1139 * point the inode will hopefully get dropped too.
1140 *
1141 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1142 * memory pressure from the MDS, though, so it needn't be perfect.
1143 */
1144static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1145{
1146	struct ceph_mds_session *session = arg;
1147	struct ceph_inode_info *ci = ceph_inode(inode);
1148	int used, oissued, mine;
1149
1150	if (session->s_trim_caps <= 0)
1151		return -1;
1152
1153	spin_lock(&inode->i_lock);
1154	mine = cap->issued | cap->implemented;
1155	used = __ceph_caps_used(ci);
1156	oissued = __ceph_caps_issued_other(ci, cap);
1157
1158	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
1159	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1160	     ceph_cap_string(used));
1161	if (ci->i_dirty_caps)
1162		goto out;   /* dirty caps */
1163	if ((used & ~oissued) & mine)
1164		goto out;   /* we need these caps */
1165
1166	session->s_trim_caps--;
1167	if (oissued) {
1168		/* we aren't the only cap.. just remove us */
1169		__ceph_remove_cap(cap);
1170	} else {
1171		/* try to drop referring dentries */
1172		spin_unlock(&inode->i_lock);
1173		d_prune_aliases(inode);
1174		dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
1175		     inode, cap, atomic_read(&inode->i_count));
1176		return 0;
1177	}
1178
1179out:
1180	spin_unlock(&inode->i_lock);
1181	return 0;
1182}
1183
1184/*
1185 * Trim session cap count down to some max number.
1186 */
1187static int trim_caps(struct ceph_mds_client *mdsc,
1188		     struct ceph_mds_session *session,
1189		     int max_caps)
1190{
1191	int trim_caps = session->s_nr_caps - max_caps;
1192
1193	dout("trim_caps mds%d start: %d / %d, trim %d\n",
1194	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1195	if (trim_caps > 0) {
1196		session->s_trim_caps = trim_caps;
1197		iterate_session_caps(session, trim_caps_cb, session);
1198		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1199		     session->s_mds, session->s_nr_caps, max_caps,
1200			trim_caps - session->s_trim_caps);
1201		session->s_trim_caps = 0;
1202	}
1203	return 0;
1204}
1205
1206/*
1207 * Allocate cap_release messages.  If there is a partially full message
1208 * in the queue, try to allocate enough to cover its remainder, so that
1209 * we can send it immediately.
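 * s_num_cap_releases counts preallocated (still unused) release slots;
 * each newly allocated message accounts for CEPH_CAPS_PER_RELEASE of them.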
1210 *
1211 * Called under s_mutex.
1212 */
1213int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
1214			  struct ceph_mds_session *session)
1215{
1216	struct ceph_msg *msg, *partial = NULL;
1217	struct ceph_mds_cap_release *head;
1218	int err = -ENOMEM;
1219	int extra = mdsc->fsc->mount_options->cap_release_safety;
1220	int num;
1221
1222	dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
1223	     extra);
1224
1225	spin_lock(&session->s_cap_lock);
1226
1227	if (!list_empty(&session->s_cap_releases)) {
1228		msg = list_first_entry(&session->s_cap_releases,
1229				       struct ceph_msg,
1230				 list_head);
1231		head = msg->front.iov_base;
1232		num = le32_to_cpu(head->num);
1233		if (num) {
1234			dout(" partial %p with (%d/%d)\n", msg, num,
1235			     (int)CEPH_CAPS_PER_RELEASE);
1236			extra += CEPH_CAPS_PER_RELEASE - num;
1237			partial = msg;
1238		}
1239	}
1240	while (session->s_num_cap_releases < session->s_nr_caps + extra) {
1241		spin_unlock(&session->s_cap_lock);
1242		msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
1243				   GFP_NOFS);
1244		if (!msg)
1245			goto out_unlocked;
1246		dout("add_cap_releases %p msg %p now %d\n", session, msg,
1247		     (int)msg->front.iov_len);
1248		head = msg->front.iov_base;
1249		head->num = cpu_to_le32(0);
1250		msg->front.iov_len = sizeof(*head);
1251		spin_lock(&session->s_cap_lock);
1252		list_add(&msg->list_head, &session->s_cap_releases);
1253		session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
1254	}
1255
1256	if (partial) {
1257		head = partial->front.iov_base;
1258		num = le32_to_cpu(head->num);
1259		dout(" queueing partial %p with %d/%d\n", partial, num,
1260		     (int)CEPH_CAPS_PER_RELEASE);
1261		list_move_tail(&partial->list_head,
1262			       &session->s_cap_releases_done);
1263		session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
1264	}
1265	err = 0;
1266	spin_unlock(&session->s_cap_lock);
1267out_unlocked:
1268	return err;
1269}
1270
1271/*
1272 * flush all dirty inode data to disk.
1273 *
1274 * returns true if we've flushed through want_flush_seq
1275 */
1276static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
1277{
1278	int mds, ret = 1;
1279
1280	dout("check_cap_flush want %lld\n", want_flush_seq);
1281	mutex_lock(&mdsc->mutex);
1282	for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
1283		struct ceph_mds_session *session = mdsc->sessions[mds];
1284
1285		if (!session)
1286			continue;
1287		get_session(session);
1288		mutex_unlock(&mdsc->mutex);
1289
1290		mutex_lock(&session->s_mutex);
1291		if (!list_empty(&session->s_cap_flushing)) {
1292			struct ceph_inode_info *ci =
1293				list_entry(session->s_cap_flushing.next,
1294					   struct ceph_inode_info,
1295					   i_flushing_item);
1296			struct inode *inode = &ci->vfs_inode;
1297
1298			spin_lock(&inode->i_lock);
1299			if (ci->i_cap_flush_seq <= want_flush_seq) {
1300				dout("check_cap_flush still flushing %p "
1301				     "seq %lld <= %lld to mds%d\n", inode,
1302				     ci->i_cap_flush_seq, want_flush_seq,
1303				     session->s_mds);
1304				ret = 0;
1305			}
1306			spin_unlock(&inode->i_lock);
1307		}
1308		mutex_unlock(&session->s_mutex);
1309		ceph_put_mds_session(session);
1310
1311		if (!ret)
1312			return ret;
1313		mutex_lock(&mdsc->mutex);
1314	}
1315
1316	mutex_unlock(&mdsc->mutex);
1317	dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
1318	return ret;
1319}
1320
1321/*
1322 * called under s_mutex
1323 */
1324void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1325			    struct ceph_mds_session *session)
1326{
1327	struct ceph_msg *msg;
1328
1329	dout("send_cap_releases mds%d\n", session->s_mds);
1330	spin_lock(&session->s_cap_lock);
1331	while (!list_empty(&session->s_cap_releases_done)) {
1332		msg = list_first_entry(&session->s_cap_releases_done,
1333				 struct ceph_msg, list_head);
1334		list_del_init(&msg->list_head);
1335		spin_unlock(&session->s_cap_lock);
1336		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1337		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1338		ceph_con_send(&session->s_con, msg);
1339		spin_lock(&session->s_cap_lock);
1340	}
1341	spin_unlock(&session->s_cap_lock);
1342}
1343
1344static void discard_cap_releases(struct ceph_mds_client *mdsc,
1345				 struct ceph_mds_session *session)
1346{
1347	struct ceph_msg *msg;
1348	struct ceph_mds_cap_release *head;
1349	unsigned num;
1350
1351	dout("discard_cap_releases mds%d\n", session->s_mds);
1352	spin_lock(&session->s_cap_lock);
1353
1354	/* zero out the in-progress message */
1355	msg = list_first_entry(&session->s_cap_releases,
1356			       struct ceph_msg, list_head);
1357	head = msg->front.iov_base;
1358	num = le32_to_cpu(head->num);
1359	dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
1360	head->num = cpu_to_le32(0);
1361	session->s_num_cap_releases += num;
1362
1363	/* requeue completed messages */
1364	while (!list_empty(&session->s_cap_releases_done)) {
1365		msg = list_first_entry(&session->s_cap_releases_done,
1366				 struct ceph_msg, list_head);
1367		list_del_init(&msg->list_head);
1368
1369		head = msg->front.iov_base;
1370		num = le32_to_cpu(head->num);
1371		dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
1372		     num);
1373		session->s_num_cap_releases += num;
1374		head->num = cpu_to_le32(0);
1375		msg->front.iov_len = sizeof(*head);
1376		list_add(&msg->list_head, &session->s_cap_releases);
1377	}
1378
1379	spin_unlock(&session->s_cap_lock);
1380}
1381
1382/*
1383 * requests
1384 */
1385
1386/*
1387 * Create an mds request.
1388 */
1389struct ceph_mds_request *
1390ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1391{
1392	struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1393
1394	if (!req)
1395		return ERR_PTR(-ENOMEM);
1396
1397	mutex_init(&req->r_fill_mutex);
1398	req->r_mdsc = mdsc;
1399	req->r_started = jiffies;
1400	req->r_resend_mds = -1;
1401	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1402	req->r_fmode = -1;
1403	kref_init(&req->r_kref);
1404	INIT_LIST_HEAD(&req->r_wait);
1405	init_completion(&req->r_completion);
1406	init_completion(&req->r_safe_completion);
1407	INIT_LIST_HEAD(&req->r_unsafe_item);
1408
1409	req->r_op = op;
1410	req->r_direct_mode = mode;
1411	return req;
1412}
1413
1414/*
1415 * return the oldest (lowest tid) request in the request tree, or NULL if none.
1416 *
1417 * called under mdsc->mutex.
1418 */
1419static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1420{
1421	if (RB_EMPTY_ROOT(&mdsc->request_tree))
1422		return NULL;
1423	return rb_entry(rb_first(&mdsc->request_tree),
1424			struct ceph_mds_request, r_node);
1425}
1426
1427static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1428{
1429	struct ceph_mds_request *req = __get_oldest_req(mdsc);
1430
1431	if (req)
1432		return req->r_tid;
1433	return 0;
1434}
1435
1436/*
1437 * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1438 * on build_path_from_dentry in fs/cifs/dir.c.
1439 *
1440 * If @stop_on_nosnap, generate path relative to the first non-snapped
1441 * inode.
1442 *
1443 * Encode hidden .snap dirs as a double /, i.e.
1444 *   foo/.snap/bar -> foo//bar
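 *
 * The path is assembled from the leaf back toward the root under
 * rename_lock; if read_seqretry() detects a concurrent rename, the buffer
 * is freed and the whole walk is retried.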
1445 */
1446char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1447			   int stop_on_nosnap)
1448{
1449	struct dentry *temp;
1450	char *path;
1451	int len, pos;
1452	unsigned seq;
1453
1454	if (dentry == NULL)
1455		return ERR_PTR(-EINVAL);
1456
1457retry:
1458	len = 0;
1459	seq = read_seqbegin(&rename_lock);
1460	rcu_read_lock();
1461	for (temp = dentry; !IS_ROOT(temp);) {
1462		struct inode *inode = temp->d_inode;
1463		if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1464			len++;  /* slash only */
1465		else if (stop_on_nosnap && inode &&
1466			 ceph_snap(inode) == CEPH_NOSNAP)
1467			break;
1468		else
1469			len += 1 + temp->d_name.len;
1470		temp = temp->d_parent;
1471		if (temp == NULL) {
1472			rcu_read_unlock();
1473			pr_err("build_path corrupt dentry %p\n", dentry);
1474			return ERR_PTR(-EINVAL);
1475		}
1476	}
1477	rcu_read_unlock();
1478	if (len)
1479		len--;  /* no leading '/' */
1480
1481	path = kmalloc(len+1, GFP_NOFS);
1482	if (path == NULL)
1483		return ERR_PTR(-ENOMEM);
1484	pos = len;
1485	path[pos] = 0;	/* trailing null */
1486	rcu_read_lock();
1487	for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1488		struct inode *inode;
1489
1490		spin_lock(&temp->d_lock);
1491		inode = temp->d_inode;
1492		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1493			dout("build_path path+%d: %p SNAPDIR\n",
1494			     pos, temp);
1495		} else if (stop_on_nosnap && inode &&
1496			   ceph_snap(inode) == CEPH_NOSNAP) {
1497			break;
1498		} else {
1499			pos -= temp->d_name.len;
1500			if (pos < 0) {
1501				spin_unlock(&temp->d_lock);
1502				break;
1503			}
1504			strncpy(path + pos, temp->d_name.name,
1505				temp->d_name.len);
1506		}
1507		spin_unlock(&temp->d_lock);
1508		if (pos)
1509			path[--pos] = '/';
1510		temp = temp->d_parent;
1511		if (temp == NULL) {
1512			rcu_read_unlock();
1513			pr_err("build_path corrupt dentry\n");
1514			kfree(path);
1515			return ERR_PTR(-EINVAL);
1516		}
1517	}
1518	rcu_read_unlock();
1519	if (pos != 0 || read_seqretry(&rename_lock, seq)) {
1520		pr_err("build_path did not end path lookup where "
1521		       "expected, namelen is %d, pos is %d\n", len, pos);
1522		/* presumably this is only possible if racing with a
1523		   rename of one of the parent directories (we can not
1524		   lock the dentries above us to prevent this, but
1525		   retrying should be harmless) */
1526		kfree(path);
1527		goto retry;
1528	}
1529
1530	*base = ceph_ino(temp->d_inode);
1531	*plen = len;
1532	dout("build_path on %p %d built %llx '%.*s'\n",
1533	     dentry, dentry->d_count, *base, len, path);
1534	return path;
1535}
1536
1537static int build_dentry_path(struct dentry *dentry,
1538			     const char **ppath, int *ppathlen, u64 *pino,
1539			     int *pfreepath)
1540{
1541	char *path;
1542
1543	if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
1544		*pino = ceph_ino(dentry->d_parent->d_inode);
1545		*ppath = dentry->d_name.name;
1546		*ppathlen = dentry->d_name.len;
1547		return 0;
1548	}
1549	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1550	if (IS_ERR(path))
1551		return PTR_ERR(path);
1552	*ppath = path;
1553	*pfreepath = 1;
1554	return 0;
1555}
1556
1557static int build_inode_path(struct inode *inode,
1558			    const char **ppath, int *ppathlen, u64 *pino,
1559			    int *pfreepath)
1560{
1561	struct dentry *dentry;
1562	char *path;
1563
1564	if (ceph_snap(inode) == CEPH_NOSNAP) {
1565		*pino = ceph_ino(inode);
1566		*ppathlen = 0;
1567		return 0;
1568	}
1569	dentry = d_find_alias(inode);
1570	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1571	dput(dentry);
1572	if (IS_ERR(path))
1573		return PTR_ERR(path);
1574	*ppath = path;
1575	*pfreepath = 1;
1576	return 0;
1577}
1578
1579/*
1580 * request arguments may be specified via an inode *, a dentry *, or
1581 * an explicit ino+path.
1582 */
1583static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1584				  const char *rpath, u64 rino,
1585				  const char **ppath, int *pathlen,
1586				  u64 *ino, int *freepath)
1587{
1588	int r = 0;
1589
1590	if (rinode) {
1591		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1592		dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1593		     ceph_snap(rinode));
1594	} else if (rdentry) {
1595		r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1596		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1597		     *ppath);
1598	} else if (rpath || rino) {
1599		*ino = rino;
1600		*ppath = rpath;
1601		*pathlen = strlen(rpath);
1602		dout(" path %.*s\n", *pathlen, rpath);
1603	}
1604
1605	return r;
1606}
1607
1608/*
1609 * called under mdsc->mutex
1610 */
1611static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1612					       struct ceph_mds_request *req,
1613					       int mds)
1614{
1615	struct ceph_msg *msg;
1616	struct ceph_mds_request_head *head;
1617	const char *path1 = NULL;
1618	const char *path2 = NULL;
1619	u64 ino1 = 0, ino2 = 0;
1620	int pathlen1 = 0, pathlen2 = 0;
1621	int freepath1 = 0, freepath2 = 0;
1622	int len;
1623	u16 releases;
1624	void *p, *end;
1625	int ret;
1626
1627	ret = set_request_path_attr(req->r_inode, req->r_dentry,
1628			      req->r_path1, req->r_ino1.ino,
1629			      &path1, &pathlen1, &ino1, &freepath1);
1630	if (ret < 0) {
1631		msg = ERR_PTR(ret);
1632		goto out;
1633	}
1634
1635	ret = set_request_path_attr(NULL, req->r_old_dentry,
1636			      req->r_path2, req->r_ino2.ino,
1637			      &path2, &pathlen2, &ino2, &freepath2);
1638	if (ret < 0) {
1639		msg = ERR_PTR(ret);
1640		goto out_free1;
1641	}
1642
1643	len = sizeof(*head) +
1644		pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));
1645
1646	/* calculate (max) length for cap releases */
1647	len += sizeof(struct ceph_mds_request_release) *
1648		(!!req->r_inode_drop + !!req->r_dentry_drop +
1649		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
1650	if (req->r_dentry_drop)
1651		len += req->r_dentry->d_name.len;
1652	if (req->r_old_dentry_drop)
1653		len += req->r_old_dentry->d_name.len;
1654
1655	msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS);
1656	if (!msg) {
1657		msg = ERR_PTR(-ENOMEM);
1658		goto out_free2;
1659	}
1660
1661	msg->hdr.tid = cpu_to_le64(req->r_tid);
1662
1663	head = msg->front.iov_base;
1664	p = msg->front.iov_base + sizeof(*head);
1665	end = msg->front.iov_base + msg->front.iov_len;
1666
1667	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1668	head->op = cpu_to_le32(req->r_op);
1669	head->caller_uid = cpu_to_le32(req->r_uid);
1670	head->caller_gid = cpu_to_le32(req->r_gid);
1671	head->args = req->r_args;
1672
1673	ceph_encode_filepath(&p, end, ino1, path1);
1674	ceph_encode_filepath(&p, end, ino2, path2);
1675
1676	/* make note of release offset, in case we need to replay */
1677	req->r_request_release_offset = p - msg->front.iov_base;
1678
1679	/* cap releases */
1680	releases = 0;
1681	if (req->r_inode_drop)
1682		releases += ceph_encode_inode_release(&p,
1683		      req->r_inode ? req->r_inode : req->r_dentry->d_inode,
1684		      mds, req->r_inode_drop, req->r_inode_unless, 0);
1685	if (req->r_dentry_drop)
1686		releases += ceph_encode_dentry_release(&p, req->r_dentry,
1687		       mds, req->r_dentry_drop, req->r_dentry_unless);
1688	if (req->r_old_dentry_drop)
1689		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
1690		       mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
1691	if (req->r_old_inode_drop)
1692		releases += ceph_encode_inode_release(&p,
1693		      req->r_old_dentry->d_inode,
1694		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
1695	head->num_releases = cpu_to_le16(releases);
1696
1697	BUG_ON(p > end);
1698	msg->front.iov_len = p - msg->front.iov_base;
1699	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1700
1701	msg->pages = req->r_pages;
1702	msg->nr_pages = req->r_num_pages;
1703	msg->hdr.data_len = cpu_to_le32(req->r_data_len);
1704	msg->hdr.data_off = cpu_to_le16(0);
1705
1706out_free2:
1707	if (freepath2)
1708		kfree((char *)path2);
1709out_free1:
1710	if (freepath1)
1711		kfree((char *)path1);
1712out:
1713	return msg;
1714}
1715
1716/*
1717 * called under mdsc->mutex if error, under no mutex if
1718 * success.
1719 */
1720static void complete_request(struct ceph_mds_client *mdsc,
1721			     struct ceph_mds_request *req)
1722{
1723	if (req->r_callback)
1724		req->r_callback(mdsc, req);
1725	else
1726		complete_all(&req->r_completion);
1727}
1728
1729/*
1730 * called under mdsc->mutex
1731 */
1732static int __prepare_send_request(struct ceph_mds_client *mdsc,
1733				  struct ceph_mds_request *req,
1734				  int mds)
1735{
1736	struct ceph_mds_request_head *rhead;
1737	struct ceph_msg *msg;
1738	int flags = 0;
1739
1740	req->r_attempts++;
1741	if (req->r_inode) {
1742		struct ceph_cap *cap =
1743			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
1744
1745		if (cap)
1746			req->r_sent_on_mseq = cap->mseq;
1747		else
1748			req->r_sent_on_mseq = -1;
1749	}
1750	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
1751	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
1752
1753	if (req->r_got_unsafe) {
1754		/*
1755		 * Replay.  Do not regenerate message (and rebuild
1756		 * paths, etc.); just use the original message.
1757		 * Rebuilding paths will break for renames because
1758		 * d_move mangles the src name.
1759		 */
1760		msg = req->r_request;
1761		rhead = msg->front.iov_base;
1762
1763		flags = le32_to_cpu(rhead->flags);
1764		flags |= CEPH_MDS_FLAG_REPLAY;
1765		rhead->flags = cpu_to_le32(flags);
1766
1767		if (req->r_target_inode)
1768			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
1769
1770		rhead->num_retry = req->r_attempts - 1;
1771
1772		/* remove cap/dentry releases from message */
1773		rhead->num_releases = 0;
1774		msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset);
1775		msg->front.iov_len = req->r_request_release_offset;
1776		return 0;
1777	}
1778
1779	if (req->r_request) {
1780		ceph_msg_put(req->r_request);
1781		req->r_request = NULL;
1782	}
1783	msg = create_request_message(mdsc, req, mds);
1784	if (IS_ERR(msg)) {
1785		req->r_err = PTR_ERR(msg);
1786		complete_request(mdsc, req);
1787		return PTR_ERR(msg);
1788	}
1789	req->r_request = msg;
1790
1791	rhead = msg->front.iov_base;
1792	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
1793	if (req->r_got_unsafe)
1794		flags |= CEPH_MDS_FLAG_REPLAY;
1795	if (req->r_locked_dir)
1796		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
1797	rhead->flags = cpu_to_le32(flags);
1798	rhead->num_fwd = req->r_num_fwd;
1799	rhead->num_retry = req->r_attempts - 1;
1800	rhead->ino = 0;
1801
1802	dout(" r_locked_dir = %p\n", req->r_locked_dir);
1803	return 0;
1804}
1805
1806/*
1807 * send request, or put it on the appropriate wait list.
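 *
 * If no suitable mds is available the request is parked on
 * mdsc->waiting_for_map until a usable mdsmap arrives; if the session is
 * not yet open it is parked on the session's s_waiting list and
 * resubmitted via __wake_requests once the session opens.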
1808 */
1809static int __do_request(struct ceph_mds_client *mdsc,
1810			struct ceph_mds_request *req)
1811{
1812	struct ceph_mds_session *session = NULL;
1813	int mds = -1;
1814	int err = -EAGAIN;
1815
1816	if (req->r_err || req->r_got_result)
1817		goto out;
1818
1819	if (req->r_timeout &&
1820	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
1821		dout("do_request timed out\n");
1822		err = -EIO;
1823		goto finish;
1824	}
1825
1826	put_request_session(req);
1827
1828	mds = __choose_mds(mdsc, req);
1829	if (mds < 0 ||
1830	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
1831		dout("do_request no mds or not active, waiting for map\n");
1832		list_add(&req->r_wait, &mdsc->waiting_for_map);
1833		goto out;
1834	}
1835
1836	/* get, open session */
1837	session = __ceph_lookup_mds_session(mdsc, mds);
1838	if (!session) {
1839		session = register_session(mdsc, mds);
1840		if (IS_ERR(session)) {
1841			err = PTR_ERR(session);
1842			goto finish;
1843		}
1844	}
1845	req->r_session = get_session(session);
1846
1847	dout("do_request mds%d session %p state %s\n", mds, session,
1848	     session_state_name(session->s_state));
1849	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
1850	    session->s_state != CEPH_MDS_SESSION_HUNG) {
1851		if (session->s_state == CEPH_MDS_SESSION_NEW ||
1852		    session->s_state == CEPH_MDS_SESSION_CLOSING)
1853			__open_session(mdsc, session);
1854		list_add(&req->r_wait, &session->s_waiting);
1855		goto out_session;
1856	}
1857
1858	/* send request */
1859	req->r_resend_mds = -1;   /* forget any previous mds hint */
1860
1861	if (req->r_request_started == 0)   /* note request start time */
1862		req->r_request_started = jiffies;
1863
1864	err = __prepare_send_request(mdsc, req, mds);
1865	if (!err) {
1866		ceph_msg_get(req->r_request);
1867		ceph_con_send(&session->s_con, req->r_request);
1868	}
1869
1870out_session:
1871	ceph_put_mds_session(session);
1872out:
1873	return err;
1874
1875finish:
1876	req->r_err = err;
1877	complete_request(mdsc, req);
1878	goto out;
1879}
1880
1881/*
1882 * called under mdsc->mutex
1883 */
1884static void __wake_requests(struct ceph_mds_client *mdsc,
1885			    struct list_head *head)
1886{
1887	struct ceph_mds_request *req, *nreq;
1888
1889	list_for_each_entry_safe(req, nreq, head, r_wait) {
1890		list_del_init(&req->r_wait);
1891		__do_request(mdsc, req);
1892	}
1893}
1894
1895/*
1896 * Wake up threads with requests pending for @mds, so that they can
1897 * resubmit their requests to a possibly different mds.
1898 */
1899static void kick_requests(struct ceph_mds_client *mdsc, int mds)
1900{
1901	struct ceph_mds_request *req;
1902	struct rb_node *p;
1903
1904	dout("kick_requests mds%d\n", mds);
1905	for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) {
1906		req = rb_entry(p, struct ceph_mds_request, r_node);
1907		if (req->r_got_unsafe)
1908			continue;
1909		if (req->r_session &&
1910		    req->r_session->s_mds == mds) {
1911			dout(" kicking tid %llu\n", req->r_tid);
1912			__do_request(mdsc, req);
1913		}
1914	}
1915}
1916
1917void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
1918			      struct ceph_mds_request *req)
1919{
1920	dout("submit_request on %p\n", req);
1921	mutex_lock(&mdsc->mutex);
1922	__register_request(mdsc, req, NULL);
1923	__do_request(mdsc, req);
1924	mutex_unlock(&mdsc->mutex);
1925}
1926
1927/*
1928 * Synchronously perform an mds request.  Take care of all of the
1929 * session setup, forwarding, retry details.
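 *
 * A minimal illustrative sketch of a caller (hypothetical; real callers,
 * e.g. in dir.c, set additional fields):
 *
 *	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUP, USE_ANY_MDS);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *	req->r_dentry = dget(dentry);
 *	req->r_num_caps = 2;
 *	err = ceph_mdsc_do_request(mdsc, NULL, req);
 *	ceph_mdsc_put_request(req);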
1930 */
1931int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
1932			 struct inode *dir,
1933			 struct ceph_mds_request *req)
1934{
1935	int err;
1936
1937	dout("do_request on %p\n", req);
1938
1939	/* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
1940	if (req->r_inode)
1941		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
1942	if (req->r_locked_dir)
1943		ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
1944	if (req->r_old_dentry)
1945		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
1946				  CEPH_CAP_PIN);
1947
1948	/* issue */
1949	mutex_lock(&mdsc->mutex);
1950	__register_request(mdsc, req, dir);
1951	__do_request(mdsc, req);
1952
1953	if (req->r_err) {
1954		err = req->r_err;
1955		__unregister_request(mdsc, req);
1956		dout("do_request early error %d\n", err);
1957		goto out;
1958	}
1959
1960	/* wait */
1961	mutex_unlock(&mdsc->mutex);
1962	dout("do_request waiting\n");
1963	if (req->r_timeout) {
1964		err = (long)wait_for_completion_killable_timeout(
1965			&req->r_completion, req->r_timeout);
1966		if (err == 0)
1967			err = -EIO;
1968	} else {
1969		err = wait_for_completion_killable(&req->r_completion);
1970	}
1971	dout("do_request waited, got %d\n", err);
1972	mutex_lock(&mdsc->mutex);
1973
1974	/* only abort if we didn't race with a real reply */
1975	if (req->r_got_result) {
1976		err = le32_to_cpu(req->r_reply_info.head->result);
1977	} else if (err < 0) {
1978		dout("aborted request %lld with %d\n", req->r_tid, err);
1979
1980		/*
1981		 * ensure we aren't running concurrently with
1982		 * ceph_fill_trace or ceph_readdir_prepopulate, which
1983		 * rely on locks (dir mutex) held by our caller.
1984		 */
1985		mutex_lock(&req->r_fill_mutex);
1986		req->r_err = err;
1987		req->r_aborted = true;
1988		mutex_unlock(&req->r_fill_mutex);
1989
1990		if (req->r_locked_dir &&
1991		    (req->r_op & CEPH_MDS_OP_WRITE))
1992			ceph_invalidate_dir_request(req);
1993	} else {
1994		err = req->r_err;
1995	}
1996
1997out:
1998	mutex_unlock(&mdsc->mutex);
1999	dout("do_request %p done, result %d\n", req, err);
2000	return err;
2001}
2002
2003/*
2004 * Invalidate dir I_COMPLETE, dentry lease state on an aborted MDS
2005 * namespace request.
2006 */
2007void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2008{
2009	struct inode *inode = req->r_locked_dir;
2010	struct ceph_inode_info *ci = ceph_inode(inode);
2011
2012	dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", inode);
2013	spin_lock(&inode->i_lock);
2014	ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
2015	ci->i_release_count++;
2016	spin_unlock(&inode->i_lock);
2017
2018	if (req->r_dentry)
2019		ceph_invalidate_dentry_lease(req->r_dentry);
2020	if (req->r_old_dentry)
2021		ceph_invalidate_dentry_lease(req->r_old_dentry);
2022}
2023
2024/*
2025 * Handle mds reply.
2026 *
2027 * We take the session mutex and parse and process the reply immediately.
2028 * This preserves the logical ordering of replies, capabilities, etc., sent
2029 * by the MDS as they are applied to our local cache.
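 *
 * The MDS may send an "unsafe" reply (applied right away) followed later
 * by a "safe" reply once the operation is journaled; the safe reply only
 * completes r_safe_completion and unregisters the request, since it
 * carries no new result information.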
2030 */
2031static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2032{
2033	struct ceph_mds_client *mdsc = session->s_mdsc;
2034	struct ceph_mds_request *req;
2035	struct ceph_mds_reply_head *head = msg->front.iov_base;
2036	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
2037	u64 tid;
2038	int err, result;
2039	int mds = session->s_mds;
2040
2041	if (msg->front.iov_len < sizeof(*head)) {
2042		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2043		ceph_msg_dump(msg);
2044		return;
2045	}
2046
2047	/* get request, session */
2048	tid = le64_to_cpu(msg->hdr.tid);
2049	mutex_lock(&mdsc->mutex);
2050	req = __lookup_request(mdsc, tid);
2051	if (!req) {
2052		dout("handle_reply on unknown tid %llu\n", tid);
2053		mutex_unlock(&mdsc->mutex);
2054		return;
2055	}
2056	dout("handle_reply %p\n", req);
2057
2058	/* correct session? */
2059	if (req->r_session != session) {
2060		pr_err("mdsc_handle_reply got %llu on session mds%d"
2061		       " not mds%d\n", tid, session->s_mds,
2062		       req->r_session ? req->r_session->s_mds : -1);
2063		mutex_unlock(&mdsc->mutex);
2064		goto out;
2065	}
2066
2067	/* dup? */
2068	if ((req->r_got_unsafe && !head->safe) ||
2069	    (req->r_got_safe && head->safe)) {
2070		pr_warning("got a dup %s reply on %llu from mds%d\n",
2071			   head->safe ? "safe" : "unsafe", tid, mds);
2072		mutex_unlock(&mdsc->mutex);
2073		goto out;
2074	}
2075	if (req->r_got_safe && !head->safe) {
2076		pr_warning("got unsafe after safe on %llu from mds%d\n",
2077			   tid, mds);
2078		mutex_unlock(&mdsc->mutex);
2079		goto out;
2080	}
2081
2082	result = le32_to_cpu(head->result);
2083
2084	/*
2085	 * Handle an ESTALE
2086	 * if we're not talking to the authority, send to them
2087	 * if the authority has changed while we weren't looking,
2088	 * send to new authority
2089	 * Otherwise we just have to return an ESTALE
2090	 */
2091	if (result == -ESTALE) {
2092		dout("got ESTALE on request %llu\n", req->r_tid);
2093		if (!req->r_inode) {
2094			/* do nothing; not an authority problem */
2095		} else if (req->r_direct_mode != USE_AUTH_MDS) {
2096			dout("not using auth, setting for that now\n");
2097			req->r_direct_mode = USE_AUTH_MDS;
2098			__do_request(mdsc, req);
2099			mutex_unlock(&mdsc->mutex);
2100			goto out;
2101		} else  {
2102			struct ceph_inode_info *ci = ceph_inode(req->r_inode);
2103			struct ceph_cap *cap = NULL;
2104
2105			if (req->r_session)
2106				cap = ceph_get_cap_for_mds(ci,
2107						   req->r_session->s_mds);
2108
2109			dout("already using auth\n");
2110			if ((!cap || cap != ci->i_auth_cap) ||
2111			    (cap->mseq != req->r_sent_on_mseq)) {
2112				dout("but cap changed, so resending\n");
2113				__do_request(mdsc, req);
2114				mutex_unlock(&mdsc->mutex);
2115				goto out;
2116			}
2117		}
2118		dout("have to return ESTALE on request %llu\n", req->r_tid);
2119	}
2120
2121
2122	if (head->safe) {
2123		req->r_got_safe = true;
2124		__unregister_request(mdsc, req);
2125		complete_all(&req->r_safe_completion);
2126
2127		if (req->r_got_unsafe) {
2128			/*
2129			 * We already handled the unsafe response, now do the
2130			 * cleanup.  No need to examine the response; the MDS
2131			 * doesn't include any result info in the safe
2132			 * response.  And even if it did, there is nothing
2133			 * useful we could do with a revised return value.
2134			 */
2135			dout("got safe reply %llu, mds%d\n", tid, mds);
2136			list_del_init(&req->r_unsafe_item);
2137
2138			/* last unsafe request during umount? */
2139			if (mdsc->stopping && !__get_oldest_req(mdsc))
2140				complete_all(&mdsc->safe_umount_waiters);
2141			mutex_unlock(&mdsc->mutex);
2142			goto out;
2143		}
2144	} else {
2145		req->r_got_unsafe = true;
2146		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2147	}
2148
2149	dout("handle_reply tid %lld result %d\n", tid, result);
2150	rinfo = &req->r_reply_info;
2151	err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2152	mutex_unlock(&mdsc->mutex);
2153
2154	mutex_lock(&session->s_mutex);
2155	if (err < 0) {
2156		pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2157		ceph_msg_dump(msg);
2158		goto out_err;
2159	}
2160
2161	/* snap trace */
2162	if (rinfo->snapblob_len) {
2163		down_write(&mdsc->snap_rwsem);
2164		ceph_update_snap_trace(mdsc, rinfo->snapblob,
2165			       rinfo->snapblob + rinfo->snapblob_len,
2166			       le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
2167		downgrade_write(&mdsc->snap_rwsem);
2168	} else {
2169		down_read(&mdsc->snap_rwsem);
2170	}
2171
2172	/* insert trace into our cache */
2173	mutex_lock(&req->r_fill_mutex);
2174	err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
2175	if (err == 0) {
2176		if (result == 0 && req->r_op != CEPH_MDS_OP_GETFILELOCK &&
2177		    rinfo->dir_nr)
2178			ceph_readdir_prepopulate(req, req->r_session);
2179		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2180	}
2181	mutex_unlock(&req->r_fill_mutex);
2182
2183	up_read(&mdsc->snap_rwsem);
2184out_err:
2185	mutex_lock(&mdsc->mutex);
2186	if (!req->r_aborted) {
2187		if (err) {
2188			req->r_err = err;
2189		} else {
2190			req->r_reply = msg;
2191			ceph_msg_get(msg);
2192			req->r_got_result = true;
2193		}
2194	} else {
2195		dout("reply arrived after request %lld was aborted\n", tid);
2196	}
2197	mutex_unlock(&mdsc->mutex);
2198
2199	ceph_add_cap_releases(mdsc, req->r_session);
2200	mutex_unlock(&session->s_mutex);
2201
2202	/* kick calling process */
2203	complete_request(mdsc, req);
2204out:
2205	ceph_mdsc_put_request(req);
2206	return;
2207}
2208
2209
2210
2211/*
2212 * handle mds notification that our request has been forwarded.
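 *
 * The message carries the target mds and a forward seq; if the seq is
 * newer than any forward we have already processed, we record the new mds
 * in r_resend_mds and resend the request ourselves.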
2213 */
2214static void handle_forward(struct ceph_mds_client *mdsc,
2215			   struct ceph_mds_session *session,
2216			   struct ceph_msg *msg)
2217{
2218	struct ceph_mds_request *req;
2219	u64 tid = le64_to_cpu(msg->hdr.tid);
2220	u32 next_mds;
2221	u32 fwd_seq;
2222	int err = -EINVAL;
2223	void *p = msg->front.iov_base;
2224	void *end = p + msg->front.iov_len;
2225
2226	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2227	next_mds = ceph_decode_32(&p);
2228	fwd_seq = ceph_decode_32(&p);
2229
2230	mutex_lock(&mdsc->mutex);
2231	req = __lookup_request(mdsc, tid);
2232	if (!req) {
2233		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2234		goto out;  /* dup reply? */
2235	}
2236
2237	if (req->r_aborted) {
2238		dout("forward tid %llu aborted, unregistering\n", tid);
2239		__unregister_request(mdsc, req);
2240	} else if (fwd_seq <= req->r_num_fwd) {
2241		dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2242		     tid, next_mds, req->r_num_fwd, fwd_seq);
2243	} else {
2244		/* resend. forward race not possible; mds would drop */
2245		dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2246		BUG_ON(req->r_err);
2247		BUG_ON(req->r_got_result);
2248		req->r_num_fwd = fwd_seq;
2249		req->r_resend_mds = next_mds;
2250		put_request_session(req);
2251		__do_request(mdsc, req);
2252	}
2253	ceph_mdsc_put_request(req);
2254out:
2255	mutex_unlock(&mdsc->mutex);
2256	return;
2257
2258bad:
2259	pr_err("mdsc_handle_forward decode error err=%d\n", err);
2260}
2261
2262/*
2263 * handle a mds session control message
2264 */
2265static void handle_session(struct ceph_mds_session *session,
2266			   struct ceph_msg *msg)
2267{
2268	struct ceph_mds_client *mdsc = session->s_mdsc;
2269	u32 op;
2270	u64 seq;
2271	int mds = session->s_mds;
2272	struct ceph_mds_session_head *h = msg->front.iov_base;
2273	int wake = 0;
2274
2275	/* decode */
2276	if (msg->front.iov_len != sizeof(*h))
2277		goto bad;
2278	op = le32_to_cpu(h->op);
2279	seq = le64_to_cpu(h->seq);
2280
2281	mutex_lock(&mdsc->mutex);
2282	if (op == CEPH_SESSION_CLOSE)
2283		__unregister_session(mdsc, session);
2284	/* FIXME: this ttl calculation is generous */
2285	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
2286	mutex_unlock(&mdsc->mutex);
2287
2288	mutex_lock(&session->s_mutex);
2289
2290	dout("handle_session mds%d %s %p state %s seq %llu\n",
2291	     mds, ceph_session_op_name(op), session,
2292	     session_state_name(session->s_state), seq);
2293
2294	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
2295		session->s_state = CEPH_MDS_SESSION_OPEN;
2296		pr_info("mds%d came back\n", session->s_mds);
2297	}
2298
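	/*
	 * OPEN, RENEWCAPS and CLOSE are typically replies to our own
	 * session requests (CLOSE can also be mds-initiated, e.g. when a
	 * reconnect is denied); STALE and RECALL_STATE are unsolicited and
	 * ask us to renew or trim our capabilities.
	 */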
2299	switch (op) {
2300	case CEPH_SESSION_OPEN:
2301		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2302			pr_info("mds%d reconnect success\n", session->s_mds);
2303		session->s_state = CEPH_MDS_SESSION_OPEN;
2304		renewed_caps(mdsc, session, 0);
2305		wake = 1;
2306		if (mdsc->stopping)
2307			__close_session(mdsc, session);
2308		break;
2309
2310	case CEPH_SESSION_RENEWCAPS:
2311		if (session->s_renew_seq == seq)
2312			renewed_caps(mdsc, session, 1);
2313		break;
2314
2315	case CEPH_SESSION_CLOSE:
2316		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2317			pr_info("mds%d reconnect denied\n", session->s_mds);
2318		remove_session_caps(session);
2319		wake = 1; /* for good measure */
2320		wake_up_all(&mdsc->session_close_wq);
2321		kick_requests(mdsc, mds);
2322		break;
2323
2324	case CEPH_SESSION_STALE:
2325		pr_info("mds%d caps went stale, renewing\n",
2326			session->s_mds);
2327		spin_lock(&session->s_cap_lock);
2328		session->s_cap_gen++;
2329		session->s_cap_ttl = 0;
2330		spin_unlock(&session->s_cap_lock);
2331		send_renew_caps(mdsc, session);
2332		break;
2333
2334	case CEPH_SESSION_RECALL_STATE:
2335		trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2336		break;
2337
2338	default:
2339		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2340		WARN_ON(1);
2341	}
2342
2343	mutex_unlock(&session->s_mutex);
2344	if (wake) {
2345		mutex_lock(&mdsc->mutex);
2346		__wake_requests(mdsc, &session->s_waiting);
2347		mutex_unlock(&mdsc->mutex);
2348	}
2349	return;
2350
2351bad:
2352	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
2353	       (int)msg->front.iov_len);
2354	ceph_msg_dump(msg);
2355	return;
2356}
2357
2358
2359/*
2360 * called under session->s_mutex.
2361 */
2362static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2363				   struct ceph_mds_session *session)
2364{
2365	struct ceph_mds_request *req, *nreq;
2366	int err;
2367
2368	dout("replay_unsafe_requests mds%d\n", session->s_mds);
2369
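	/*
	 * Requests on s_unsafe received only an unsafe reply, i.e. the MDS
	 * answered before committing the operation to stable storage.
	 * Resend them so the recovering MDS can replay them instead of
	 * losing them.
	 */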
2370	mutex_lock(&mdsc->mutex);
2371	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2372		err = __prepare_send_request(mdsc, req, session->s_mds);
2373		if (!err) {
2374			ceph_msg_get(req->r_request);
2375			ceph_con_send(&session->s_con, req->r_request);
2376		}
2377	}
2378	mutex_unlock(&mdsc->mutex);
2379}
2380
2381/*
2382 * Encode information about a cap for a reconnect with the MDS.
2383 */
2384static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2385			  void *arg)
2386{
2387	union {
2388		struct ceph_mds_cap_reconnect v2;
2389		struct ceph_mds_cap_reconnect_v1 v1;
2390	} rec;
2391	size_t reclen;
2392	struct ceph_inode_info *ci;
2393	struct ceph_reconnect_state *recon_state = arg;
2394	struct ceph_pagelist *pagelist = recon_state->pagelist;
2395	char *path;
2396	int pathlen, err;
2397	u64 pathbase;
2398	struct dentry *dentry;
2399
2400	ci = cap->ci;
2401
2402	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2403	     inode, ceph_vinop(inode), cap, cap->cap_id,
2404	     ceph_cap_string(cap->issued));
2405	err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2406	if (err)
2407		return err;
2408
2409	dentry = d_find_alias(inode);
2410	if (dentry) {
2411		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2412		if (IS_ERR(path)) {
2413			err = PTR_ERR(path);
2414			goto out_dput;
2415		}
2416	} else {
2417		path = NULL;
2418		pathlen = 0;
2419	}
2420	err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2421	if (err)
2422		goto out_free;
2423
2424	spin_lock(&inode->i_lock);
2425	cap->seq = 0;        /* reset cap seq */
2426	cap->issue_seq = 0;  /* and issue_seq */
2427
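	/*
	 * Build the cap record in whichever format the peer understands:
	 * the v2 record (FLOCK-capable peers) carries a flock_len for the
	 * lock state appended below, while the older v1 record carries
	 * size, mtime and atime instead.
	 */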
2428	if (recon_state->flock) {
2429		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2430		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2431		rec.v2.issued = cpu_to_le32(cap->issued);
2432		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2433		rec.v2.pathbase = cpu_to_le64(pathbase);
2434		rec.v2.flock_len = 0;
2435		reclen = sizeof(rec.v2);
2436	} else {
2437		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2438		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2439		rec.v1.issued = cpu_to_le32(cap->issued);
2440		rec.v1.size = cpu_to_le64(inode->i_size);
2441		ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2442		ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2443		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2444		rec.v1.pathbase = cpu_to_le64(pathbase);
2445		reclen = sizeof(rec.v1);
2446	}
2447	spin_unlock(&inode->i_lock);
2448
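	/*
	 * The lock counting and the lock encoding happen under separate
	 * lock_flocks() sections, so the set of locks can change in
	 * between.  If ceph_encode_locks() finds more locks than we
	 * reserved space for it returns -ENOSPC; truncate the pagelist
	 * back to where the cap record started and try again.
	 */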
2449	if (recon_state->flock) {
2450		int num_fcntl_locks, num_flock_locks;
2451		struct ceph_pagelist_cursor trunc_point;
2452
2453		ceph_pagelist_set_cursor(pagelist, &trunc_point);
2454		do {
2455			lock_flocks();
2456			ceph_count_locks(inode, &num_fcntl_locks,
2457					 &num_flock_locks);
2458			rec.v2.flock_len = (2*sizeof(u32) +
2459					    (num_fcntl_locks+num_flock_locks) *
2460					    sizeof(struct ceph_filelock));
2461			unlock_flocks();
2462
2463			/* pre-alloc pagelist */
2464			ceph_pagelist_truncate(pagelist, &trunc_point);
2465			err = ceph_pagelist_append(pagelist, &rec, reclen);
2466			if (!err)
2467				err = ceph_pagelist_reserve(pagelist,
2468							    rec.v2.flock_len);
2469
2470			/* encode locks */
2471			if (!err) {
2472				lock_flocks();
2473				err = ceph_encode_locks(inode,
2474							pagelist,
2475							num_fcntl_locks,
2476							num_flock_locks);
2477				unlock_flocks();
2478			}
2479		} while (err == -ENOSPC);
2480	} else {
2481		err = ceph_pagelist_append(pagelist, &rec, reclen);
2482	}
2483
2484out_free:
2485	kfree(path);
2486out_dput:
2487	dput(dentry);
2488	return err;
2489}
2490
2491
2492/*
2493 * If an MDS fails and recovers, clients need to reconnect in order to
2494 * reestablish shared state.  This includes all caps issued through
2495 * this session _and_ the snap_realm hierarchy.  Because it's not
2496 * clear which snap realms the mds cares about, we send everything we
2497 * know about; that ensures we'll then get any new info the
2498 * recovering MDS might have.
2499 *
2500 * This is a relatively heavyweight operation, but it's rare.
2501 *
2502 * called with mdsc->mutex held.
2503 */
2504static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2505			       struct ceph_mds_session *session)
2506{
2507	struct ceph_msg *reply;
2508	struct rb_node *p;
2509	int mds = session->s_mds;
2510	int err = -ENOMEM;
2511	struct ceph_pagelist *pagelist;
2512	struct ceph_reconnect_state recon_state;
2513
2514	pr_info("mds%d reconnect start\n", mds);
2515
2516	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2517	if (!pagelist)
2518		goto fail_nopagelist;
2519	ceph_pagelist_init(pagelist);
2520
2521	reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS);
2522	if (!reply)
2523		goto fail_nomsg;
2524
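	/*
	 * The reconnect payload is assembled in the pagelist: a 32-bit cap
	 * count, then one entry per cap (ino, path and cap record, plus the
	 * file lock state for FLOCK-capable peers), followed by one
	 * ceph_mds_snaprealm_reconnect record per snap realm we know about.
	 */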
2525	mutex_lock(&session->s_mutex);
2526	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2527	session->s_seq = 0;
2528
2529	ceph_con_open(&session->s_con,
2530		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2531
2532	/* replay unsafe requests */
2533	replay_unsafe_requests(mdsc, session);
2534
2535	down_read(&mdsc->snap_rwsem);
2536
2537	dout("session %p state %s\n", session,
2538	     session_state_name(session->s_state));
2539
2540	/* drop old cap expires; we're about to reestablish that state */
2541	discard_cap_releases(mdsc, session);
2542
2543	/* traverse this session's caps */
2544	err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
2545	if (err)
2546		goto fail;
2547
2548	recon_state.pagelist = pagelist;
2549	recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
2550	err = iterate_session_caps(session, encode_caps_cb, &recon_state);
2551	if (err < 0)
2552		goto fail;
2553
2554	/*
2555	 * snaprealms.  we provide mds with the ino, seq (version), and
2556	 * parent for all of our realms.  If the mds has any newer info,
2557	 * it will tell us.
2558	 */
2559	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
2560		struct ceph_snap_realm *realm =
2561			rb_entry(p, struct ceph_snap_realm, node);
2562		struct ceph_mds_snaprealm_reconnect sr_rec;
2563
2564		dout(" adding snap realm %llx seq %lld parent %llx\n",
2565		     realm->ino, realm->seq, realm->parent_ino);
2566		sr_rec.ino = cpu_to_le64(realm->ino);
2567		sr_rec.seq = cpu_to_le64(realm->seq);
2568		sr_rec.parent = cpu_to_le64(realm->parent_ino);
2569		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
2570		if (err)
2571			goto fail;
2572	}
2573
2574	reply->pagelist = pagelist;
2575	if (recon_state.flock)
2576		reply->hdr.version = cpu_to_le16(2);
2577	reply->hdr.data_len = cpu_to_le32(pagelist->length);
2578	reply->nr_pages = calc_pages_for(0, pagelist->length);
2579	ceph_con_send(&session->s_con, reply);
2580
2581	mutex_unlock(&session->s_mutex);
2582
2583	mutex_lock(&mdsc->mutex);
2584	__wake_requests(mdsc, &session->s_waiting);
2585	mutex_unlock(&mdsc->mutex);
2586
2587	up_read(&mdsc->snap_rwsem);
2588	return;
2589
2590fail:
2591	ceph_msg_put(reply);
2592	up_read(&mdsc->snap_rwsem);
2593	mutex_unlock(&session->s_mutex);
2594fail_nomsg:
2595	ceph_pagelist_release(pagelist);
2596	kfree(pagelist);
2597fail_nopagelist:
2598	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
2599	return;
2600}
2601
2602
2603/*
2604 * compare old and new mdsmaps, kicking requests
2605 * and closing out old connections as necessary
2606 *
2607 * called under mdsc->mutex.
2608 */
2609static void check_new_map(struct ceph_mds_client *mdsc,
2610			  struct ceph_mdsmap *newmap,
2611			  struct ceph_mdsmap *oldmap)
2612{
2613	int i;
2614	int oldstate, newstate;
2615	struct ceph_mds_session *s;
2616
2617	dout("check_new_map new %u old %u\n",
2618	     newmap->m_epoch, oldmap->m_epoch);
2619
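	/*
	 * First loop: for every rank we have a session with, compare the
	 * mds address and state between the two maps.  An address change
	 * means the daemon was replaced, so the session is closed (or
	 * dropped outright if it never finished opening) and its waiting
	 * requests are kicked; a rank that reaches the reconnect or active
	 * state gets a reconnect and/or its requests and flushing caps
	 * kicked.  The second loop opens export-target sessions for any
	 * mds the new map marks laggy.
	 */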
2620	for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
2621		if (mdsc->sessions[i] == NULL)
2622			continue;
2623		s = mdsc->sessions[i];
2624		oldstate = ceph_mdsmap_get_state(oldmap, i);
2625		newstate = ceph_mdsmap_get_state(newmap, i);
2626
2627		dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
2628		     i, ceph_mds_state_name(oldstate),
2629		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
2630		     ceph_mds_state_name(newstate),
2631		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
2632		     session_state_name(s->s_state));
2633
2634		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
2635			   ceph_mdsmap_get_addr(newmap, i),
2636			   sizeof(struct ceph_entity_addr))) {
2637			if (s->s_state == CEPH_MDS_SESSION_OPENING) {
2638				/* the session never opened, just close it
2639				 * out now */
2640				__wake_requests(mdsc, &s->s_waiting);
2641				__unregister_session(mdsc, s);
2642			} else {
2643				/* just close it */
2644				mutex_unlock(&mdsc->mutex);
2645				mutex_lock(&s->s_mutex);
2646				mutex_lock(&mdsc->mutex);
2647				ceph_con_close(&s->s_con);
2648				mutex_unlock(&s->s_mutex);
2649				s->s_state = CEPH_MDS_SESSION_RESTARTING;
2650			}
2651
2652			/* kick any requests waiting on the recovering mds */
2653			kick_requests(mdsc, i);
2654		} else if (oldstate == newstate) {
2655			continue;  /* nothing new with this mds */
2656		}
2657
2658		/*
2659		 * send reconnect?
2660		 */
2661		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
2662		    newstate >= CEPH_MDS_STATE_RECONNECT) {
2663			mutex_unlock(&mdsc->mutex);
2664			send_mds_reconnect(mdsc, s);
2665			mutex_lock(&mdsc->mutex);
2666		}
2667
2668		/*
2669		 * kick requests on any mds that has gone active.
2670		 */
2671		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
2672		    newstate >= CEPH_MDS_STATE_ACTIVE) {
2673			if (oldstate != CEPH_MDS_STATE_CREATING &&
2674			    oldstate != CEPH_MDS_STATE_STARTING)
2675				pr_info("mds%d recovery completed\n", s->s_mds);
2676			kick_requests(mdsc, i);
2677			ceph_kick_flushing_caps(mdsc, s);
2678			wake_up_session_caps(s, 1);
2679		}
2680	}
2681
2682	for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
2683		s = mdsc->sessions[i];
2684		if (!s)
2685			continue;
2686		if (!ceph_mdsmap_is_laggy(newmap, i))
2687			continue;
2688		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2689		    s->s_state == CEPH_MDS_SESSION_HUNG ||
2690		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
2691			dout(" connecting to export targets of laggy mds%d\n",
2692			     i);
2693			__open_export_target_sessions(mdsc, s);
2694		}
2695	}
2696}
2697
2698
2699
2700/*
2701 * leases
2702 */
2703
2704/*
2705 * caller must hold session s_mutex, dentry->d_lock
2706 */
2707void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
2708{
2709	struct ceph_dentry_info *di = ceph_dentry(dentry);
2710
2711	ceph_put_mds_session(di->lease_session);
2712	di->lease_session = NULL;
2713}
2714
2715static void handle_lease(struct ceph_mds_client *mdsc,
2716			 struct ceph_mds_session *session,
2717			 struct ceph_msg *msg)
2718{
2719	struct super_block *sb = mdsc->fsc->sb;
2720	struct inode *inode;
2721	struct dentry *parent, *dentry;
2722	struct ceph_dentry_info *di;
2723	int mds = session->s_mds;
2724	struct ceph_mds_lease *h = msg->front.iov_base;
2725	u32 seq;
2726	struct ceph_vino vino;
2727	struct qstr dname;
2728	int release = 0;
2729
2730	dout("handle_lease from mds%d\n", mds);
2731
2732	/* decode */
2733	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
2734		goto bad;
2735	vino.ino = le64_to_cpu(h->ino);
2736	vino.snap = CEPH_NOSNAP;
2737	seq = le32_to_cpu(h->seq);
2738	dname.name = (void *)h + sizeof(*h) + sizeof(u32);
2739	dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
2740	if (dname.len != get_unaligned_le32(h+1))
2741		goto bad;
2742
2743	mutex_lock(&session->s_mutex);
2744	session->s_seq++;
2745
2746	/* lookup inode */
2747	inode = ceph_find_inode(sb, vino);
2748	dout("handle_lease %s, ino %llx %p %.*s\n",
2749	     ceph_lease_op_name(h->action), vino.ino, inode,
2750	     dname.len, dname.name);
2751	if (inode == NULL) {
2752		dout("handle_lease no inode %llx\n", vino.ino);
2753		goto release;
2754	}
2755
2756	/* dentry */
2757	parent = d_find_alias(inode);
2758	if (!parent) {
2759		dout("no parent dentry on inode %p\n", inode);
2760		WARN_ON(1);
2761		goto release;  /* hrm... */
2762	}
2763	dname.hash = full_name_hash(dname.name, dname.len);
2764	dentry = d_lookup(parent, &dname);
2765	dput(parent);
2766	if (!dentry)
2767		goto release;
2768
2769	spin_lock(&dentry->d_lock);
2770	di = ceph_dentry(dentry);
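	/*
	 * REVOKE: drop our lease if it came from this session and ack below.
	 * RENEW: the mds granted a renewal we asked for; record the new
	 * lease seq and recompute d_time from the granted duration_ms.
	 */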
2771	switch (h->action) {
2772	case CEPH_MDS_LEASE_REVOKE:
2773		if (di && di->lease_session == session) {
2774			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
2775				h->seq = cpu_to_le32(di->lease_seq);
2776			__ceph_mdsc_drop_dentry_lease(dentry);
2777		}
2778		release = 1;
2779		break;
2780
2781	case CEPH_MDS_LEASE_RENEW:
2782		if (di && di->lease_session == session &&
2783		    di->lease_gen == session->s_cap_gen &&
2784		    di->lease_renew_from &&
2785		    di->lease_renew_after == 0) {
2786			unsigned long duration =
2787				le32_to_cpu(h->duration_ms) * HZ / 1000;
2788
2789			di->lease_seq = seq;
2790			dentry->d_time = di->lease_renew_from + duration;
2791			di->lease_renew_after = di->lease_renew_from +
2792				(duration >> 1);
2793			di->lease_renew_from = 0;
2794		}
2795		break;
2796	}
2797	spin_unlock(&dentry->d_lock);
2798	dput(dentry);
2799
2800	if (!release)
2801		goto out;
2802
2803release:
2804	/* let's just reuse the same message */
2805	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
2806	ceph_msg_get(msg);
2807	ceph_con_send(&session->s_con, msg);
2808
2809out:
2810	iput(inode);
2811	mutex_unlock(&session->s_mutex);
2812	return;
2813
2814bad:
2815	pr_err("corrupt lease message\n");
2816	ceph_msg_dump(msg);
2817}
2818
2819void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2820			      struct inode *inode,
2821			      struct dentry *dentry, char action,
2822			      u32 seq)
2823{
2824	struct ceph_msg *msg;
2825	struct ceph_mds_lease *lease;
2826	int len = sizeof(*lease) + sizeof(u32);
2827	int dnamelen = 0;
2828
2829	dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
2830	     inode, dentry, ceph_lease_op_name(action), session->s_mds);
2831	dnamelen = dentry->d_name.len;
2832	len += dnamelen;
2833
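	/*
	 * Message layout: a ceph_mds_lease header, a 32-bit little-endian
	 * name length, then the dentry name bytes.
	 */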
2834	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS);
2835	if (!msg)
2836		return;
2837	lease = msg->front.iov_base;
2838	lease->action = action;
2839	lease->ino = cpu_to_le64(ceph_vino(inode).ino);
2840	lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
2841	lease->seq = cpu_to_le32(seq);
2842	put_unaligned_le32(dnamelen, lease + 1);
2843	memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
2844
2845	/*
2846	 * if this is a preemptive lease RELEASE, no need to
2847	 * flush request stream, since the actual request will
2848	 * soon follow.
2849	 */
2850	msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
2851
2852	ceph_con_send(&session->s_con, msg);
2853}
2854
2855/*
2856 * Preemptively release a lease we expect to invalidate anyway.
2857 * Both @inode and @dentry are required.
2858 */
2859void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
2860			     struct dentry *dentry)
2861{
2862	struct ceph_dentry_info *di;
2863	struct ceph_mds_session *session;
2864	u32 seq;
2865
2866	BUG_ON(inode == NULL);
2867	BUG_ON(dentry == NULL);
2868
2869	/* is dentry lease valid? */
2870	spin_lock(&dentry->d_lock);
2871	di = ceph_dentry(dentry);
2872	if (!di || !di->lease_session ||
2873	    di->lease_session->s_mds < 0 ||
2874	    di->lease_gen != di->lease_session->s_cap_gen ||
2875	    !time_before(jiffies, dentry->d_time)) {
2876		dout("lease_release inode %p dentry %p -- "
2877		     "no lease\n",
2878		     inode, dentry);
2879		spin_unlock(&dentry->d_lock);
2880		return;
2881	}
2882
2883	/* we do have a lease on this dentry; note mds and seq */
2884	session = ceph_get_mds_session(di->lease_session);
2885	seq = di->lease_seq;
2886	__ceph_mdsc_drop_dentry_lease(dentry);
2887	spin_unlock(&dentry->d_lock);
2888
2889	dout("lease_release inode %p dentry %p to mds%d\n",
2890	     inode, dentry, session->s_mds);
2891	ceph_mdsc_lease_send_msg(session, inode, dentry,
2892				 CEPH_MDS_LEASE_RELEASE, seq);
2893	ceph_put_mds_session(session);
2894}
2895
2896/*
2897 * drop all leases (and dentry refs) in preparation for umount
2898 */
2899static void drop_leases(struct ceph_mds_client *mdsc)
2900{
2901	int i;
2902
2903	dout("drop_leases\n");
2904	mutex_lock(&mdsc->mutex);
2905	for (i = 0; i < mdsc->max_sessions; i++) {
2906		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2907		if (!s)
2908			continue;
2909		mutex_unlock(&mdsc->mutex);
2910		mutex_lock(&s->s_mutex);
2911		mutex_unlock(&s->s_mutex);
2912		ceph_put_mds_session(s);
2913		mutex_lock(&mdsc->mutex);
2914	}
2915	mutex_unlock(&mdsc->mutex);
2916}
2917
2918
2919
2920/*
2921 * delayed work -- periodically trim expired leases, renew caps with mds
2922 */
2923static void schedule_delayed(struct ceph_mds_client *mdsc)
2924{
2925	int delay = 5;
2926	unsigned hz = round_jiffies_relative(HZ * delay);
2927	schedule_delayed_work(&mdsc->delayed_work, hz);
2928}
2929
2930static void delayed_work(struct work_struct *work)
2931{
2932	int i;
2933	struct ceph_mds_client *mdsc =
2934		container_of(work, struct ceph_mds_client, delayed_work.work);
2935	int renew_interval;
2936	int renew_caps;
2937
2938	dout("mdsc delayed_work\n");
2939	ceph_check_delayed_caps(mdsc);
2940
2941	mutex_lock(&mdsc->mutex);
2942	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
2943	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
2944				   mdsc->last_renew_caps);
2945	if (renew_caps)
2946		mdsc->last_renew_caps = jiffies;
2947
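	/*
	 * Per-session housekeeping: re-request a close for sessions stuck
	 * in CLOSING, mark an OPEN session HUNG once its ttl expires, and
	 * for live sessions either renew caps (every session_timeout/4) or
	 * just send a keepalive, then push out any pending cap releases.
	 */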
2948	for (i = 0; i < mdsc->max_sessions; i++) {
2949		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2950		if (s == NULL)
2951			continue;
2952		if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
2953			dout("resending session close request for mds%d\n",
2954			     s->s_mds);
2955			request_close_session(mdsc, s);
2956			ceph_put_mds_session(s);
2957			continue;
2958		}
2959		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
2960			if (s->s_state == CEPH_MDS_SESSION_OPEN) {
2961				s->s_state = CEPH_MDS_SESSION_HUNG;
2962				pr_info("mds%d hung\n", s->s_mds);
2963			}
2964		}
2965		if (s->s_state < CEPH_MDS_SESSION_OPEN) {
2966			/* this mds is failed or recovering, just wait */
2967			ceph_put_mds_session(s);
2968			continue;
2969		}
2970		mutex_unlock(&mdsc->mutex);
2971
2972		mutex_lock(&s->s_mutex);
2973		if (renew_caps)
2974			send_renew_caps(mdsc, s);
2975		else
2976			ceph_con_keepalive(&s->s_con);
2977		ceph_add_cap_releases(mdsc, s);
2978		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2979		    s->s_state == CEPH_MDS_SESSION_HUNG)
2980			ceph_send_cap_releases(mdsc, s);
2981		mutex_unlock(&s->s_mutex);
2982		ceph_put_mds_session(s);
2983
2984		mutex_lock(&mdsc->mutex);
2985	}
2986	mutex_unlock(&mdsc->mutex);
2987
2988	schedule_delayed(mdsc);
2989}
2990
2991int ceph_mdsc_init(struct ceph_fs_client *fsc)
2992
2993{
2994	struct ceph_mds_client *mdsc;
2995
2996	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
2997	if (!mdsc)
2998		return -ENOMEM;
2999	mdsc->fsc = fsc;
3000	fsc->mdsc = mdsc;
3001	mutex_init(&mdsc->mutex);
3002	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
3003	if (mdsc->mdsmap == NULL)
3004		return -ENOMEM;
3005
3006	init_completion(&mdsc->safe_umount_waiters);
3007	init_waitqueue_head(&mdsc->session_close_wq);
3008	INIT_LIST_HEAD(&mdsc->waiting_for_map);
3009	mdsc->sessions = NULL;
3010	mdsc->max_sessions = 0;
3011	mdsc->stopping = 0;
3012	init_rwsem(&mdsc->snap_rwsem);
3013	mdsc->snap_realms = RB_ROOT;
3014	INIT_LIST_HEAD(&mdsc->snap_empty);
3015	spin_lock_init(&mdsc->snap_empty_lock);
3016	mdsc->last_tid = 0;
3017	mdsc->request_tree = RB_ROOT;
3018	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
3019	mdsc->last_renew_caps = jiffies;
3020	INIT_LIST_HEAD(&mdsc->cap_delay_list);
3021	spin_lock_init(&mdsc->cap_delay_lock);
3022	INIT_LIST_HEAD(&mdsc->snap_flush_list);
3023	spin_lock_init(&mdsc->snap_flush_lock);
3024	mdsc->cap_flush_seq = 0;
3025	INIT_LIST_HEAD(&mdsc->cap_dirty);
3026	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
3027	mdsc->num_cap_flushing = 0;
3028	spin_lock_init(&mdsc->cap_dirty_lock);
3029	init_waitqueue_head(&mdsc->cap_flushing_wq);
3030	spin_lock_init(&mdsc->dentry_lru_lock);
3031	INIT_LIST_HEAD(&mdsc->dentry_lru);
3032
3033	ceph_caps_init(mdsc);
3034	ceph_adjust_min_caps(mdsc, fsc->min_caps);
3035
3036	return 0;
3037}
3038
3039/*
3040 * Wait for safe replies on open mds requests.  If we time out, drop
3041 * all requests from the tree to avoid dangling dentry refs.
3042 */
3043static void wait_requests(struct ceph_mds_client *mdsc)
3044{
3045	struct ceph_mds_request *req;
3046	struct ceph_fs_client *fsc = mdsc->fsc;
3047
3048	mutex_lock(&mdsc->mutex);
3049	if (__get_oldest_req(mdsc)) {
3050		mutex_unlock(&mdsc->mutex);
3051
3052		dout("wait_requests waiting for requests\n");
3053		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
3054				    fsc->client->options->mount_timeout * HZ);
3055
3056		/* tear down remaining requests */
3057		mutex_lock(&mdsc->mutex);
3058		while ((req = __get_oldest_req(mdsc))) {
3059			dout("wait_requests timed out on tid %llu\n",
3060			     req->r_tid);
3061			__unregister_request(mdsc, req);
3062		}
3063	}
3064	mutex_unlock(&mdsc->mutex);
3065	dout("wait_requests done\n");
3066}
3067
3068/*
3069 * called before mount is ro, and before dentries are torn down.
3070 * (hmm, does this still race with new lookups?)
3071 */
3072void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
3073{
3074	dout("pre_umount\n");
3075	mdsc->stopping = 1;
3076
3077	drop_leases(mdsc);
3078	ceph_flush_dirty_caps(mdsc);
3079	wait_requests(mdsc);
3080
3081	/*
3082	 * wait for reply handlers to drop their request refs and
3083	 * their inode/dcache refs
3084	 */
3085	ceph_msgr_flush();
3086}
3087
3088/*
3089 * wait for all write mds requests to flush.
3090 */
3091static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
3092{
3093	struct ceph_mds_request *req = NULL, *nextreq;
3094	struct rb_node *n;
3095
3096	mutex_lock(&mdsc->mutex);
3097	dout("wait_unsafe_requests want %lld\n", want_tid);
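	/*
	 * Walk the request tree in tid order and wait for the safe reply
	 * on every write op with tid <= want_tid.  mdsc->mutex is dropped
	 * while waiting, so both the current and the next request are
	 * pinned with a reference; if the next one was unregistered in the
	 * meantime, restart the walk from the oldest request.
	 */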
3098restart:
3099	req = __get_oldest_req(mdsc);
3100	while (req && req->r_tid <= want_tid) {
3101		/* find next request */
3102		n = rb_next(&req->r_node);
3103		if (n)
3104			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
3105		else
3106			nextreq = NULL;
3107		if ((req->r_op & CEPH_MDS_OP_WRITE)) {
3108			/* write op */
3109			ceph_mdsc_get_request(req);
3110			if (nextreq)
3111				ceph_mdsc_get_request(nextreq);
3112			mutex_unlock(&mdsc->mutex);
3113			dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
3114			     req->r_tid, want_tid);
3115			wait_for_completion(&req->r_safe_completion);
3116			mutex_lock(&mdsc->mutex);
3117			ceph_mdsc_put_request(req);
3118			if (!nextreq)
3119				break;  /* next dne before, so we're done! */
3120			if (RB_EMPTY_NODE(&nextreq->r_node)) {
3121				/* next request was removed from tree */
3122				ceph_mdsc_put_request(nextreq);
3123				goto restart;
3124			}
3125			ceph_mdsc_put_request(nextreq);  /* won't go away */
3126		}
3127		req = nextreq;
3128	}
3129	mutex_unlock(&mdsc->mutex);
3130	dout("wait_unsafe_requests done\n");
3131}
3132
3133void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3134{
3135	u64 want_tid, want_flush;
3136
3137	if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3138		return;
3139
3140	dout("sync\n");
3141	mutex_lock(&mdsc->mutex);
3142	want_tid = mdsc->last_tid;
3143	want_flush = mdsc->cap_flush_seq;
3144	mutex_unlock(&mdsc->mutex);
3145	dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
3146
3147	ceph_flush_dirty_caps(mdsc);
3148
3149	wait_unsafe_requests(mdsc, want_tid);
3150	wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
3151}
3152
3153/*
3154 * true if all sessions are closed, or we force unmount
3155 */
3156 static bool done_closing_sessions(struct ceph_mds_client *mdsc)
3157{
3158	int i, n = 0;
3159
3160	if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3161		return true;
3162
3163	mutex_lock(&mdsc->mutex);
3164	for (i = 0; i < mdsc->max_sessions; i++)
3165		if (mdsc->sessions[i])
3166			n++;
3167	mutex_unlock(&mdsc->mutex);
3168	return n == 0;
3169}
3170
3171/*
3172 * called after sb is ro.
3173 */
3174void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3175{
3176	struct ceph_mds_session *session;
3177	int i;
3178	struct ceph_fs_client *fsc = mdsc->fsc;
3179	unsigned long timeout = fsc->client->options->mount_timeout * HZ;
3180
3181	dout("close_sessions\n");
3182
3183	/* close sessions */
3184	mutex_lock(&mdsc->mutex);
3185	for (i = 0; i < mdsc->max_sessions; i++) {
3186		session = __ceph_lookup_mds_session(mdsc, i);
3187		if (!session)
3188			continue;
3189		mutex_unlock(&mdsc->mutex);
3190		mutex_lock(&session->s_mutex);
3191		__close_session(mdsc, session);
3192		mutex_unlock(&session->s_mutex);
3193		ceph_put_mds_session(session);
3194		mutex_lock(&mdsc->mutex);
3195	}
3196	mutex_unlock(&mdsc->mutex);
3197
3198	dout("waiting for sessions to close\n");
3199	wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
3200			   timeout);
3201
3202	/* tear down remaining sessions */
3203	mutex_lock(&mdsc->mutex);
3204	for (i = 0; i < mdsc->max_sessions; i++) {
3205		if (mdsc->sessions[i]) {
3206			session = get_session(mdsc->sessions[i]);
3207			__unregister_session(mdsc, session);
3208			mutex_unlock(&mdsc->mutex);
3209			mutex_lock(&session->s_mutex);
3210			remove_session_caps(session);
3211			mutex_unlock(&session->s_mutex);
3212			ceph_put_mds_session(session);
3213			mutex_lock(&mdsc->mutex);
3214		}
3215	}
3216	WARN_ON(!list_empty(&mdsc->cap_delay_list));
3217	mutex_unlock(&mdsc->mutex);
3218
3219	ceph_cleanup_empty_realms(mdsc);
3220
3221	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3222
3223	dout("stopped\n");
3224}
3225
3226static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3227{
3228	dout("stop\n");
3229	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3230	if (mdsc->mdsmap)
3231		ceph_mdsmap_destroy(mdsc->mdsmap);
3232	kfree(mdsc->sessions);
3233	ceph_caps_finalize(mdsc);
3234}
3235
3236void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3237{
3238	struct ceph_mds_client *mdsc = fsc->mdsc;
3239
3240	dout("mdsc_destroy %p\n", mdsc);
3241	ceph_mdsc_stop(mdsc);
3242
3243	/* flush out any connection work with references to us */
3244	ceph_msgr_flush();
3245
3246	fsc->mdsc = NULL;
3247	kfree(mdsc);
3248	dout("mdsc_destroy %p done\n", mdsc);
3249}
3250
3251
3252/*
3253 * handle mds map update.
3254 */
3255void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3256{
3257	u32 epoch;
3258	u32 maplen;
3259	void *p = msg->front.iov_base;
3260	void *end = p + msg->front.iov_len;
3261	struct ceph_mdsmap *newmap, *oldmap;
3262	struct ceph_fsid fsid;
3263	int err = -EINVAL;
3264
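	/*
	 * The front payload carries the cluster fsid, the map epoch and the
	 * encoded map length, followed by the mdsmap itself.  Maps at or
	 * below our current epoch are ignored.
	 */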
3265	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3266	ceph_decode_copy(&p, &fsid, sizeof(fsid));
3267	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
3268		return;
3269	epoch = ceph_decode_32(&p);
3270	maplen = ceph_decode_32(&p);
3271	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3272
3273	/* do we need it? */
3274	ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
3275	mutex_lock(&mdsc->mutex);
3276	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3277		dout("handle_map epoch %u <= our %u\n",
3278		     epoch, mdsc->mdsmap->m_epoch);
3279		mutex_unlock(&mdsc->mutex);
3280		return;
3281	}
3282
3283	newmap = ceph_mdsmap_decode(&p, end);
3284	if (IS_ERR(newmap)) {
3285		err = PTR_ERR(newmap);
3286		goto bad_unlock;
3287	}
3288
3289	/* swap into place */
3290	if (mdsc->mdsmap) {
3291		oldmap = mdsc->mdsmap;
3292		mdsc->mdsmap = newmap;
3293		check_new_map(mdsc, newmap, oldmap);
3294		ceph_mdsmap_destroy(oldmap);
3295	} else {
3296		mdsc->mdsmap = newmap;  /* first mds map */
3297	}
3298	mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3299
3300	__wake_requests(mdsc, &mdsc->waiting_for_map);
3301
3302	mutex_unlock(&mdsc->mutex);
3303	schedule_delayed(mdsc);
3304	return;
3305
3306bad_unlock:
3307	mutex_unlock(&mdsc->mutex);
3308bad:
3309	pr_err("error decoding mdsmap %d\n", err);
3310	return;
3311}
3312
3313static struct ceph_connection *con_get(struct ceph_connection *con)
3314{
3315	struct ceph_mds_session *s = con->private;
3316
3317	if (get_session(s)) {
3318		dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
3319		return con;
3320	}
3321	dout("mdsc con_get %p FAIL\n", s);
3322	return NULL;
3323}
3324
3325static void con_put(struct ceph_connection *con)
3326{
3327	struct ceph_mds_session *s = con->private;
3328
3329	dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
3330	ceph_put_mds_session(s);
3331}
3332
3333/*
3334 * if the client is unresponsive for long enough, the mds will kill
3335 * the session entirely.
3336 */
3337static void peer_reset(struct ceph_connection *con)
3338{
3339	struct ceph_mds_session *s = con->private;
3340	struct ceph_mds_client *mdsc = s->s_mdsc;
3341
3342	pr_warning("mds%d closed our session\n", s->s_mds);
3343	send_mds_reconnect(mdsc, s);
3344}
3345
3346static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3347{
3348	struct ceph_mds_session *s = con->private;
3349	struct ceph_mds_client *mdsc = s->s_mdsc;
3350	int type = le16_to_cpu(msg->hdr.type);
3351
3352	mutex_lock(&mdsc->mutex);
3353	if (__verify_registered_session(mdsc, s) < 0) {
3354		mutex_unlock(&mdsc->mutex);
3355		goto out;
3356	}
3357	mutex_unlock(&mdsc->mutex);
3358
3359	switch (type) {
3360	case CEPH_MSG_MDS_MAP:
3361		ceph_mdsc_handle_map(mdsc, msg);
3362		break;
3363	case CEPH_MSG_CLIENT_SESSION:
3364		handle_session(s, msg);
3365		break;
3366	case CEPH_MSG_CLIENT_REPLY:
3367		handle_reply(s, msg);
3368		break;
3369	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
3370		handle_forward(mdsc, s, msg);
3371		break;
3372	case CEPH_MSG_CLIENT_CAPS:
3373		ceph_handle_caps(s, msg);
3374		break;
3375	case CEPH_MSG_CLIENT_SNAP:
3376		ceph_handle_snap(mdsc, s, msg);
3377		break;
3378	case CEPH_MSG_CLIENT_LEASE:
3379		handle_lease(mdsc, s, msg);
3380		break;
3381
3382	default:
3383		pr_err("received unknown message type %d %s\n", type,
3384		       ceph_msg_type_name(type));
3385	}
3386out:
3387	ceph_msg_put(msg);
3388}
3389
3390/*
3391 * authentication
3392 */
3393static int get_authorizer(struct ceph_connection *con,
3394			  void **buf, int *len, int *proto,
3395			  void **reply_buf, int *reply_len, int force_new)
3396{
3397	struct ceph_mds_session *s = con->private;
3398	struct ceph_mds_client *mdsc = s->s_mdsc;
3399	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3400	int ret = 0;
3401
3402	if (force_new && s->s_authorizer) {
3403		ac->ops->destroy_authorizer(ac, s->s_authorizer);
3404		s->s_authorizer = NULL;
3405	}
3406	if (s->s_authorizer == NULL) {
3407		if (ac->ops->create_authorizer) {
3408			ret = ac->ops->create_authorizer(
3409				ac, CEPH_ENTITY_TYPE_MDS,
3410				&s->s_authorizer,
3411				&s->s_authorizer_buf,
3412				&s->s_authorizer_buf_len,
3413				&s->s_authorizer_reply_buf,
3414				&s->s_authorizer_reply_buf_len);
3415			if (ret)
3416				return ret;
3417		}
3418	}
3419
3420	*proto = ac->protocol;
3421	*buf = s->s_authorizer_buf;
3422	*len = s->s_authorizer_buf_len;
3423	*reply_buf = s->s_authorizer_reply_buf;
3424	*reply_len = s->s_authorizer_reply_buf_len;
3425	return 0;
3426}
3427
3428
3429static int verify_authorizer_reply(struct ceph_connection *con, int len)
3430{
3431	struct ceph_mds_session *s = con->private;
3432	struct ceph_mds_client *mdsc = s->s_mdsc;
3433	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3434
3435	return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len);
3436}
3437
3438static int invalidate_authorizer(struct ceph_connection *con)
3439{
3440	struct ceph_mds_session *s = con->private;
3441	struct ceph_mds_client *mdsc = s->s_mdsc;
3442	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3443
3444	if (ac->ops->invalidate_authorizer)
3445		ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3446
3447	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
3448}
3449
3450static const struct ceph_connection_operations mds_con_ops = {
3451	.get = con_get,
3452	.put = con_put,
3453	.dispatch = dispatch,
3454	.get_authorizer = get_authorizer,
3455	.verify_authorizer_reply = verify_authorizer_reply,
3456	.invalidate_authorizer = invalidate_authorizer,
3457	.peer_reset = peer_reset,
3458};
3459
3460/* eof */