#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>

#define OSD_OP_FRONT_LEN	4096
#define OSD_OPREPLY_FRONT_LEN	512

static const struct ceph_connection_operations osd_con_ops;

static void send_queued(struct ceph_osd_client *osdc);
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static void __register_request(struct ceph_osd_client *osdc,
			       struct ceph_osd_request *req);
static void __unregister_linger_request(struct ceph_osd_client *osdc,
					struct ceph_osd_request *req);
static void __send_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req);

static int op_needs_trail(int op)
{
	switch (op) {
	case CEPH_OSD_OP_GETXATTR:
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY:
		return 1;
	default:
		return 0;
	}
}

static int op_has_extent(int op)
{
	return (op == CEPH_OSD_OP_READ ||
		op == CEPH_OSD_OP_WRITE);
}

void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
			struct ceph_file_layout *layout,
			u64 snapid,
			u64 off, u64 *plen, u64 *bno,
			struct ceph_osd_request *req,
			struct ceph_osd_req_op *op)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	u64 orig_len = *plen;
	u64 objoff, objlen;    /* extent in object */

	reqhead->snapid = cpu_to_le64(snapid);

	/* object extent? */
	ceph_calc_file_object_mapping(layout, off, plen, bno,
				      &objoff, &objlen);
	if (*plen < orig_len)
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);

	if (op_has_extent(op->op)) {
		op->extent.offset = objoff;
		op->extent.length = objlen;
	}
	req->r_num_pages = calc_pages_for(off, *plen);
	req->r_page_alignment = off & ~PAGE_MASK;
	if (op->op == CEPH_OSD_OP_WRITE)
		op->payload_len = *plen;

	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
	     *bno, objoff, objlen, req->r_num_pages);
}
EXPORT_SYMBOL(ceph_calc_raw_layout);

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */

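/*
 * Illustrative call flow for a simple synchronous consumer of this
 * client (a sketch assembled from the helpers defined below, not an
 * exhaustive contract):
 *
 *	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, ...);
 *	rc = ceph_osdc_start_request(osdc, req, false);
 *	if (!rc)
 *		rc = ceph_osdc_wait_request(osdc, req);
 *	ceph_osdc_put_request(req);
 *
 * ceph_osdc_readpages() and ceph_osdc_writepages() at the end of this
 * file follow exactly this pattern.
 */
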
/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static void calc_layout(struct ceph_osd_client *osdc,
			struct ceph_vino vino,
			struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			struct ceph_osd_request *req,
			struct ceph_osd_req_op *op)
{
	u64 bno;

	ceph_calc_raw_layout(osdc, layout, vino.snap, off,
			     plen, &bno, req, op);

	snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
	req->r_oid_len = strlen(req->r_oid);
}
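
/*
 * For example (illustrative only), inode 0x10000000000 and object
 * number 4 yield the oid "10000000000.00000004" under the
 * "%llx.%08llx" format used above.
 */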

/*
 * requests
 */
void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
						    struct ceph_osd_request,
						    r_kref);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_con_filling_msg) {
		dout("release_request revoking pages %p from con %p\n",
		     req->r_pages, req->r_con_filling_msg);
		ceph_con_revoke_message(req->r_con_filling_msg,
				      req->r_reply);
		req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
	}
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_own_pages)
		ceph_release_page_vector(req->r_pages,
					 req->r_num_pages);
#ifdef CONFIG_BLOCK
	if (req->r_bio)
		bio_put(req->r_bio);
#endif
	ceph_put_snap_context(req->r_snapc);
	if (req->r_trail) {
		ceph_pagelist_release(req->r_trail);
		kfree(req->r_trail);
	}
	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else
		kfree(req);
}
EXPORT_SYMBOL(ceph_osdc_release_request);

static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
{
	int i = 0;

	if (needs_trail)
		*needs_trail = 0;
	while (ops[i].op) {
		if (needs_trail && op_needs_trail(ops[i].op))
			*needs_trail = 1;
		i++;
	}

	return i;
}

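/*
 * The ops arrays handled above and below are zero-terminated; a plain
 * read, for instance, is described as (sketch):
 *
 *	struct ceph_osd_req_op ops[2];
 *
 *	ops[0].op = CEPH_OSD_OP_READ;
 *	ops[1].op = 0;
 *
 * for which get_num_ops() returns 1 and reports that no trail pagelist
 * is needed.
 */
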
struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
					       int flags,
					       struct ceph_snap_context *snapc,
					       struct ceph_osd_req_op *ops,
					       bool use_mempool,
					       gfp_t gfp_flags,
					       struct page **pages,
					       struct bio *bio)
{
	struct ceph_osd_request *req;
	struct ceph_msg *msg;
	int needs_trail;
	int num_op = get_num_ops(ops, &needs_trail);
	size_t msg_size = sizeof(struct ceph_osd_request_head);

	msg_size += num_op*sizeof(struct ceph_osd_op);

	if (use_mempool) {
		req = mempool_alloc(osdc->req_mempool, gfp_flags);
		memset(req, 0, sizeof(*req));
	} else {
		req = kzalloc(sizeof(*req), gfp_flags);
	}
	if (req == NULL)
		return NULL;

	req->r_osdc = osdc;
	req->r_mempool = use_mempool;

	kref_init(&req->r_kref);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	INIT_LIST_HEAD(&req->r_unsafe_item);
	INIT_LIST_HEAD(&req->r_linger_item);
	INIT_LIST_HEAD(&req->r_linger_osd);
	INIT_LIST_HEAD(&req->r_req_lru_item);
	req->r_flags = flags;

	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);

	/* create reply message */
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
				   OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}
	req->r_reply = msg;

	/* allocate space for the trailing data */
	if (needs_trail) {
		req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
		if (!req->r_trail) {
			ceph_osdc_put_request(req);
			return NULL;
		}
		ceph_pagelist_init(req->r_trail);
	}
	/* create request message; allow space for oid */
	msg_size += MAX_OBJ_NAME_SIZE;
	if (snapc)
		msg_size += sizeof(u64) * snapc->num_snaps;
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}

	msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
	memset(msg->front.iov_base, 0, msg->front.iov_len);

	req->r_request = msg;
	req->r_pages = pages;
#ifdef CONFIG_BLOCK
	if (bio) {
		req->r_bio = bio;
		bio_get(req->r_bio);
	}
#endif

	return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);

static void osd_req_encode_op(struct ceph_osd_request *req,
			      struct ceph_osd_op *dst,
			      struct ceph_osd_req_op *src)
{
	dst->op = cpu_to_le16(src->op);

	switch (src->op) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
		dst->extent.offset =
			cpu_to_le64(src->extent.offset);
		dst->extent.length =
			cpu_to_le64(src->extent.length);
		dst->extent.truncate_size =
			cpu_to_le64(src->extent.truncate_size);
		dst->extent.truncate_seq =
			cpu_to_le32(src->extent.truncate_seq);
		break;

	case CEPH_OSD_OP_GETXATTR:
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		BUG_ON(!req->r_trail);

		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
		dst->xattr.cmp_op = src->xattr.cmp_op;
		dst->xattr.cmp_mode = src->xattr.cmp_mode;
		ceph_pagelist_append(req->r_trail, src->xattr.name,
				     src->xattr.name_len);
		ceph_pagelist_append(req->r_trail, src->xattr.val,
				     src->xattr.value_len);
		break;
	case CEPH_OSD_OP_CALL:
		BUG_ON(!req->r_trail);

		dst->cls.class_len = src->cls.class_len;
		dst->cls.method_len = src->cls.method_len;
		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);

		ceph_pagelist_append(req->r_trail, src->cls.class_name,
				     src->cls.class_len);
		ceph_pagelist_append(req->r_trail, src->cls.method_name,
				     src->cls.method_len);
		ceph_pagelist_append(req->r_trail, src->cls.indata,
				     src->cls.indata_len);
		break;
	case CEPH_OSD_OP_ROLLBACK:
		dst->snap.snapid = cpu_to_le64(src->snap.snapid);
		break;
	case CEPH_OSD_OP_STARTSYNC:
		break;
	case CEPH_OSD_OP_NOTIFY:
		{
			__le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
			__le32 timeout = cpu_to_le32(src->watch.timeout);

			BUG_ON(!req->r_trail);

			ceph_pagelist_append(req->r_trail,
						&prot_ver, sizeof(prot_ver));
			ceph_pagelist_append(req->r_trail,
						&timeout, sizeof(timeout));
		}
		/* fall through: NOTIFY also carries cookie/ver/flag */
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
		dst->watch.ver = cpu_to_le64(src->watch.ver);
		dst->watch.flag = src->watch.flag;
		break;
	default:
		pr_err("unrecognized osd opcode %d\n", src->op);
		WARN_ON(1);
		break;
	}
	dst->payload_len = cpu_to_le32(src->payload_len);
}

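/*
 * Note that osd_req_encode_op() converts all multi-byte fields to
 * little-endian wire order in place, while variable-length payloads
 * (xattr names and values, class call data, notify arguments) go to
 * the request's trail pagelist rather than the message front.
 */
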
/*
 * build new request AND message
 */
void ceph_osdc_build_request(struct ceph_osd_request *req,
			     u64 off, u64 *plen,
			     struct ceph_osd_req_op *src_ops,
			     struct ceph_snap_context *snapc,
			     struct timespec *mtime,
			     const char *oid,
			     int oid_len)
{
	struct ceph_msg *msg = req->r_request;
	struct ceph_osd_request_head *head;
	struct ceph_osd_req_op *src_op;
	struct ceph_osd_op *op;
	void *p;
	int num_op = get_num_ops(src_ops, NULL);
	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
	int flags = req->r_flags;
	u64 data_len = 0;
	int i;

	head = msg->front.iov_base;
	op = (void *)(head + 1);
	p = (void *)(op + num_op);

	req->r_snapc = ceph_get_snap_context(snapc);

	head->client_inc = cpu_to_le32(1); /* always, for now. */
	head->flags = cpu_to_le32(flags);
	if (flags & CEPH_OSD_FLAG_WRITE)
		ceph_encode_timespec(&head->mtime, mtime);
	head->num_ops = cpu_to_le16(num_op);

	/* fill in oid */
	head->object_len = cpu_to_le32(oid_len);
	memcpy(p, oid, oid_len);
	p += oid_len;

	src_op = src_ops;
	while (src_op->op) {
		osd_req_encode_op(req, op, src_op);
		src_op++;
		op++;
	}

	if (req->r_trail)
		data_len += req->r_trail->length;

	if (snapc) {
		head->snap_seq = cpu_to_le64(snapc->seq);
		head->num_snaps = cpu_to_le32(snapc->num_snaps);
		for (i = 0; i < snapc->num_snaps; i++) {
			put_unaligned_le64(snapc->snaps[i], p);
			p += sizeof(u64);
		}
	}

	if (flags & CEPH_OSD_FLAG_WRITE) {
		req->r_request->hdr.data_off = cpu_to_le16(off);
		req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
	} else if (data_len) {
		req->r_request->hdr.data_off = 0;
		req->r_request->hdr.data_len = cpu_to_le32(data_len);
	}

	req->r_request->page_alignment = req->r_page_alignment;

	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
	msg_size = p - msg->front.iov_base;
	msg->front.iov_len = msg_size;
	msg->hdr.front_len = cpu_to_le32(msg_size);
	return;
}
EXPORT_SYMBOL(ceph_osdc_build_request);

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 *
 * if @do_sync, include a 'startsync' command so that the osd will flush
 * data quickly.
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
					       struct ceph_file_layout *layout,
					       struct ceph_vino vino,
					       u64 off, u64 *plen,
					       int opcode, int flags,
					       struct ceph_snap_context *snapc,
					       int do_sync,
					       u32 truncate_seq,
					       u64 truncate_size,
					       struct timespec *mtime,
					       bool use_mempool, int num_reply,
					       int page_align)
{
	struct ceph_osd_req_op ops[3];
	struct ceph_osd_request *req;

	ops[0].op = opcode;
	ops[0].extent.truncate_seq = truncate_seq;
	ops[0].extent.truncate_size = truncate_size;
	ops[0].payload_len = 0;

	if (do_sync) {
		ops[1].op = CEPH_OSD_OP_STARTSYNC;
		ops[1].payload_len = 0;
		ops[2].op = 0;
	} else
		ops[1].op = 0;

	req = ceph_osdc_alloc_request(osdc, flags,
					 snapc, ops,
					 use_mempool,
					 GFP_NOFS, NULL, NULL);
	if (!req)
		return NULL;

	/* calculate max write size */
	calc_layout(osdc, vino, layout, off, plen, req, ops);
	req->r_file_layout = *layout;  /* keep a copy */

	/* in case it differs from natural (file) alignment that
	   calc_layout filled in for us */
	req->r_num_pages = calc_pages_for(page_align, *plen);
	req->r_page_alignment = page_align;

	ceph_osdc_build_request(req, off, plen, ops,
				snapc,
				mtime,
				req->r_oid, req->r_oid_len);

	return req;
}
EXPORT_SYMBOL(ceph_osdc_new_request);

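/*
 * A minimal write setup via the helper above might look like this
 * (sketch; error handling elided):
 *
 *	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
 *				    CEPH_OSD_OP_WRITE,
 *				    CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
 *				    snapc, 0, truncate_seq, truncate_size,
 *				    mtime, false, 1, page_align);
 *
 * ceph_osdc_writepages() below wraps this exact call.
 */
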
/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
static void __insert_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *new)
{
	struct rb_node **p = &osdc->requests.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_request *req = NULL;

	while (*p) {
		parent = *p;
		req = rb_entry(parent, struct ceph_osd_request, r_node);
		if (new->r_tid < req->r_tid)
			p = &(*p)->rb_left;
		else if (new->r_tid > req->r_tid)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->r_node, parent, p);
	rb_insert_color(&new->r_node, &osdc->requests);
}

static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
						 u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid)
			n = n->rb_left;
		else if (tid > req->r_tid)
			n = n->rb_right;
		else
			return req;
	}
	return NULL;
}

static struct ceph_osd_request *
__lookup_request_ge(struct ceph_osd_client *osdc,
		    u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid) {
			if (!n->rb_left)
				return req;
			n = n->rb_left;
		} else if (tid > req->r_tid) {
			n = n->rb_right;
		} else {
			return req;
		}
	}
	return NULL;
}

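/*
 * __lookup_request_ge() returns the registered request with the
 * smallest tid greater than or equal to the given tid;
 * ceph_osdc_sync() relies on it to walk all in-flight tids without
 * holding request_mutex across each wait.
 */
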
/*
 * Resubmit requests pending on the given osd.
 */
static void __kick_osd_requests(struct ceph_osd_client *osdc,
				struct ceph_osd *osd)
{
	struct ceph_osd_request *req, *nreq;
	int err;

	dout("__kick_osd_requests osd%d\n", osd->o_osd);
	err = __reset_osd(osdc, osd);
	if (err == -EAGAIN)
		return;

	list_for_each_entry(req, &osd->o_requests, r_osd_item) {
		list_move(&req->r_req_lru_item, &osdc->req_unsent);
		dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
		     osd->o_osd);
		if (!req->r_linger)
			req->r_flags |= CEPH_OSD_FLAG_RETRY;
	}

	list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
				 r_linger_osd) {
		/*
		 * reregister request prior to unregistering linger so
		 * that r_osd is preserved.
		 */
		BUG_ON(!list_empty(&req->r_req_lru_item));
		__register_request(osdc, req);
		list_add(&req->r_req_lru_item, &osdc->req_unsent);
		list_add(&req->r_osd_item, &req->r_osd->o_requests);
		__unregister_linger_request(osdc, req);
		dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
		     osd->o_osd);
	}
}

static void kick_osd_requests(struct ceph_osd_client *osdc,
			      struct ceph_osd *kickosd)
{
	mutex_lock(&osdc->request_mutex);
	__kick_osd_requests(osdc, kickosd);
	mutex_unlock(&osdc->request_mutex);
}

/*
 * If the osd connection drops, we need to resubmit all requests.
 */
static void osd_reset(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;

	if (!osd)
		return;
	dout("osd_reset osd%d\n", osd->o_osd);
	osdc = osd->o_osdc;
	down_read(&osdc->map_sem);
	kick_osd_requests(osdc, osd);
	send_queued(osdc);
	up_read(&osdc->map_sem);
}

/*
 * Track open sessions with osds.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
{
	struct ceph_osd *osd;

	osd = kzalloc(sizeof(*osd), GFP_NOFS);
	if (!osd)
		return NULL;

	atomic_set(&osd->o_ref, 1);
	osd->o_osdc = osdc;
	INIT_LIST_HEAD(&osd->o_requests);
	INIT_LIST_HEAD(&osd->o_linger_requests);
	INIT_LIST_HEAD(&osd->o_osd_lru);
	osd->o_incarnation = 1;

	ceph_con_init(osdc->client->msgr, &osd->o_con);
	osd->o_con.private = osd;
	osd->o_con.ops = &osd_con_ops;
	osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;

	INIT_LIST_HEAD(&osd->o_keepalive_item);
	return osd;
}

static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
	if (atomic_inc_not_zero(&osd->o_ref)) {
		dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
		     atomic_read(&osd->o_ref));
		return osd;
	} else {
		dout("get_osd %p FAIL\n", osd);
		return NULL;
	}
}

static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
	     atomic_read(&osd->o_ref) - 1);
	if (atomic_dec_and_test(&osd->o_ref)) {
		struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;

		/* free the osd even when there is no authorizer to
		 * destroy; gating the kfree on the authorizer leaked
		 * the struct */
		if (osd->o_auth.authorizer &&
		    ac->ops && ac->ops->destroy_authorizer)
			ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer);
		kfree(osd);
	}
}

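/*
 * Session lifetime sketch: the osds rbtree holds one reference
 * (dropped by __remove_osd() via put_osd()), and get_osd() only
 * succeeds while at least one reference remains, so a session cannot
 * be revived once its last reference is gone.
 */
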
/*
 * remove an osd from our map
 */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	dout("__remove_osd %p\n", osd);
	BUG_ON(!list_empty(&osd->o_requests));
	rb_erase(&osd->o_node, &osdc->osds);
	list_del_init(&osd->o_osd_lru);
	ceph_con_close(&osd->o_con);
	put_osd(osd);
}

static void remove_all_osds(struct ceph_osd_client *osdc)
{
	dout("remove_all_osds %p\n", osdc);
	mutex_lock(&osdc->request_mutex);
	while (!RB_EMPTY_ROOT(&osdc->osds)) {
		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
						struct ceph_osd, o_node);
		__remove_osd(osdc, osd);
	}
	mutex_unlock(&osdc->request_mutex);
}

static void __move_osd_to_lru(struct ceph_osd_client *osdc,
			      struct ceph_osd *osd)
{
	dout("__move_osd_to_lru %p\n", osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));
	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
	dout("__remove_osd_from_lru %p\n", osd);
	if (!list_empty(&osd->o_osd_lru))
		list_del_init(&osd->o_osd_lru);
}

static void remove_old_osds(struct ceph_osd_client *osdc)
{
	struct ceph_osd *osd, *nosd;

	dout("remove_old_osds %p\n", osdc);
	mutex_lock(&osdc->request_mutex);
	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
		if (time_before(jiffies, osd->lru_ttl))
			break;
		__remove_osd(osdc, osd);
	}
	mutex_unlock(&osdc->request_mutex);
}

/*
 * reset osd connect
 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	struct ceph_osd_request *req;
	int ret = 0;

	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
	if (list_empty(&osd->o_requests) &&
	    list_empty(&osd->o_linger_requests)) {
		__remove_osd(osdc, osd);
	} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
			  &osd->o_con.peer_addr,
			  sizeof(osd->o_con.peer_addr)) == 0 &&
		   !ceph_con_opened(&osd->o_con)) {
		dout(" osd addr hasn't changed and connection never opened,"
		     " letting msgr retry\n");
		/* touch each r_stamp for handle_timeout()'s benefit */
		list_for_each_entry(req, &osd->o_requests, r_osd_item)
			req->r_stamp = jiffies;
		ret = -EAGAIN;
	} else {
		ceph_con_close(&osd->o_con);
		ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
		osd->o_incarnation++;
	}
	return ret;
}

static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
{
	struct rb_node **p = &osdc->osds.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd *osd = NULL;

	dout("__insert_osd %p osd%d\n", new, new->o_osd);
	while (*p) {
		parent = *p;
		osd = rb_entry(parent, struct ceph_osd, o_node);
		if (new->o_osd < osd->o_osd)
			p = &(*p)->rb_left;
		else if (new->o_osd > osd->o_osd)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->o_node, parent, p);
	rb_insert_color(&new->o_node, &osdc->osds);
}

static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
{
	struct ceph_osd *osd;
	struct rb_node *n = osdc->osds.rb_node;

	while (n) {
		osd = rb_entry(n, struct ceph_osd, o_node);
		if (o < osd->o_osd)
			n = n->rb_left;
		else if (o > osd->o_osd)
			n = n->rb_right;
		else
			return osd;
	}
	return NULL;
}

static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
	schedule_delayed_work(&osdc->timeout_work,
			osdc->client->options->osd_keepalive_timeout * HZ);
}

static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
	cancel_delayed_work(&osdc->timeout_work);
}

/*
 * Register request, assign tid.  If this is the first request, set up
 * the timeout event.
 */
static void __register_request(struct ceph_osd_client *osdc,
			       struct ceph_osd_request *req)
{
	req->r_tid = ++osdc->last_tid;
	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	__insert_request(osdc, req);
	ceph_osdc_get_request(req);
	osdc->num_requests++;
	if (osdc->num_requests == 1) {
		dout(" first request, scheduling timeout\n");
		__schedule_osd_timeout(osdc);
	}
}

static void register_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *req)
{
	mutex_lock(&osdc->request_mutex);
	__register_request(osdc, req);
	mutex_unlock(&osdc->request_mutex);
}

/*
 * called under osdc->request_mutex
 */
static void __unregister_request(struct ceph_osd_client *osdc,
				 struct ceph_osd_request *req)
{
	if (RB_EMPTY_NODE(&req->r_node)) {
		dout("__unregister_request %p tid %lld not registered\n",
			req, req->r_tid);
		return;
	}

	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
	rb_erase(&req->r_node, &osdc->requests);
	osdc->num_requests--;

	if (req->r_osd) {
		/* make sure the original request isn't in flight. */
		ceph_con_revoke(&req->r_osd->o_con, req->r_request);

		list_del_init(&req->r_osd_item);
		if (list_empty(&req->r_osd->o_requests) &&
		    list_empty(&req->r_osd->o_linger_requests)) {
			dout("moving osd %p to lru\n", req->r_osd);
			__move_osd_to_lru(osdc, req->r_osd);
		}
		if (list_empty(&req->r_linger_item))
			req->r_osd = NULL;
	}

	ceph_osdc_put_request(req);

	list_del_init(&req->r_req_lru_item);
	if (osdc->num_requests == 0) {
		dout(" no requests, canceling timeout\n");
		__cancel_osd_timeout(osdc);
	}
}

/*
 * Cancel a previously queued request message
 */
static void __cancel_request(struct ceph_osd_request *req)
{
	if (req->r_sent && req->r_osd) {
		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
		req->r_sent = 0;
	}
}

static void __register_linger_request(struct ceph_osd_client *osdc,
				    struct ceph_osd_request *req)
{
	dout("__register_linger_request %p\n", req);
	list_add_tail(&req->r_linger_item, &osdc->req_linger);
	list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests);
}

static void __unregister_linger_request(struct ceph_osd_client *osdc,
					struct ceph_osd_request *req)
{
	dout("__unregister_linger_request %p\n", req);
	if (req->r_osd) {
		list_del_init(&req->r_linger_item);
		list_del_init(&req->r_linger_osd);

		if (list_empty(&req->r_osd->o_requests) &&
		    list_empty(&req->r_osd->o_linger_requests)) {
			dout("moving osd %p to lru\n", req->r_osd);
			__move_osd_to_lru(osdc, req->r_osd);
		}
		if (list_empty(&req->r_osd_item))
			req->r_osd = NULL;
	}
}

void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
					 struct ceph_osd_request *req)
{
	mutex_lock(&osdc->request_mutex);
	if (req->r_linger) {
		__unregister_linger_request(osdc, req);
		ceph_osdc_put_request(req);
	}
	mutex_unlock(&osdc->request_mutex);
}
EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);

void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
				  struct ceph_osd_request *req)
{
	if (!req->r_linger) {
		dout("set_request_linger %p\n", req);
		req->r_linger = 1;
		/*
		 * caller is now responsible for calling
		 * unregister_linger_request
		 */
		ceph_osdc_get_request(req);
	}
}
EXPORT_SYMBOL(ceph_osdc_set_request_linger);

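/*
 * Linger lifecycle, roughly (a sketch, error handling elided): a
 * caller that wants a persistent request marks it before submission
 * and must later drop the extra reference:
 *
 *	ceph_osdc_set_request_linger(osdc, req);
 *	ceph_osdc_start_request(osdc, req, false);
 *	...
 *	ceph_osdc_unregister_linger_request(osdc, req);
 */
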
/*
 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 * (as needed), and set the request r_osd appropriately.  If there is
 * no up osd, set r_osd to NULL.  Move the request to the appropriate list
 * (unsent, homeless) or leave it on the in-flight lru.
 *
 * Return 0 if unchanged, 1 if changed, or negative on error.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static int __map_request(struct ceph_osd_client *osdc,
			 struct ceph_osd_request *req, int force_resend)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	struct ceph_pg pgid;
	int acting[CEPH_PG_MAX_SIZE];
	int o = -1, num = 0;
	int err;

	dout("map_request %p tid %lld\n", req, req->r_tid);
	err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
				      &req->r_file_layout, osdc->osdmap);
	if (err) {
		list_move(&req->r_req_lru_item, &osdc->req_notarget);
		return err;
	}
	pgid = reqhead->layout.ol_pgid;
	req->r_pgid = pgid;

	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
	if (err > 0) {
		o = acting[0];
		num = err;
	}

	if ((!force_resend &&
	     req->r_osd && req->r_osd->o_osd == o &&
	     req->r_sent >= req->r_osd->o_incarnation &&
	     req->r_num_pg_osds == num &&
	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
	    (req->r_osd == NULL && o == -1))
		return 0;  /* no change */

	dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
	     req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
	     req->r_osd ? req->r_osd->o_osd : -1);

	/* record full pg acting set */
	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
	req->r_num_pg_osds = num;

	if (req->r_osd) {
		__cancel_request(req);
		list_del_init(&req->r_osd_item);
		req->r_osd = NULL;
	}

	req->r_osd = __lookup_osd(osdc, o);
	if (!req->r_osd && o >= 0) {
		err = -ENOMEM;
		req->r_osd = create_osd(osdc);
		if (!req->r_osd) {
			list_move(&req->r_req_lru_item, &osdc->req_notarget);
			goto out;
		}

		dout("map_request osd %p is osd%d\n", req->r_osd, o);
		req->r_osd->o_osd = o;
		req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
		__insert_osd(osdc, req->r_osd);

		ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
	}

	if (req->r_osd) {
		__remove_osd_from_lru(req->r_osd);
		list_add(&req->r_osd_item, &req->r_osd->o_requests);
		list_move(&req->r_req_lru_item, &osdc->req_unsent);
	} else {
		list_move(&req->r_req_lru_item, &osdc->req_notarget);
	}
	err = 1;   /* osd or pg changed */

out:
	return err;
}

/*
 * caller should hold map_sem (for read) and request_mutex
 */
static void __send_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead;

	dout("send_request %p tid %llu to osd%d flags %d\n",
	     req, req->r_tid, req->r_osd->o_osd, req->r_flags);

	reqhead = req->r_request->front.iov_base;
	reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
	reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
	reqhead->reassert_version = req->r_reassert_version;

	req->r_stamp = jiffies;
	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);

	ceph_msg_get(req->r_request); /* send consumes a ref */
	ceph_con_send(&req->r_osd->o_con, req->r_request);
	req->r_sent = req->r_osd->o_incarnation;
}

/*
 * Send any requests in the queue (req_unsent).
 */
static void send_queued(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req, *tmp;

	dout("send_queued\n");
	mutex_lock(&osdc->request_mutex);
	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
		__send_request(osdc, req);
	}
	mutex_unlock(&osdc->request_mutex);
}

/*
 * Timeout callback, called every N seconds when 1 or more osd
 * requests have been active for more than N seconds.  When this
 * happens, we ping all OSDs with requests that have timed out to
 * ensure any communications channel reset is detected.  Reset the
 * request timeouts another N seconds in the future as we go.
 * Reschedule the timeout event another N seconds in future (unless
 * there are no open requests).
 */
static void handle_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client, timeout_work.work);
	struct ceph_osd_request *req, *last_req = NULL;
	struct ceph_osd *osd;
	unsigned long timeout = osdc->client->options->osd_timeout * HZ;
	unsigned long keepalive =
		osdc->client->options->osd_keepalive_timeout * HZ;
	unsigned long last_stamp = 0;
	struct list_head slow_osds;

	dout("timeout\n");
	down_read(&osdc->map_sem);

	ceph_monc_request_next_osdmap(&osdc->client->monc);

	mutex_lock(&osdc->request_mutex);

	/*
	 * reset osds that appear to be _really_ unresponsive.  this
	 * is a failsafe measure; we really shouldn't be getting to
	 * this point if the system is working properly.  the monitors
	 * should mark the osd as failed and we should find out about
	 * it from an updated osd map.
	 */
	while (timeout && !list_empty(&osdc->req_lru)) {
		req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
				 r_req_lru_item);

		/* hasn't been long enough since we sent it? */
		if (time_before(jiffies, req->r_stamp + timeout))
			break;

		/* hasn't been long enough since it was acked? */
		if (req->r_request->ack_stamp == 0 ||
		    time_before(jiffies, req->r_request->ack_stamp + timeout))
			break;

		BUG_ON(req == last_req && req->r_stamp == last_stamp);
		last_req = req;
		last_stamp = req->r_stamp;

		osd = req->r_osd;
		BUG_ON(!osd);
		pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
			   req->r_tid, osd->o_osd);
		__kick_osd_requests(osdc, osd);
	}

	/*
	 * ping osds that are a bit slow.  this ensures that if there
	 * is a break in the TCP connection we will notice, and reopen
	 * a connection with that osd (from the fault callback).
	 */
	INIT_LIST_HEAD(&slow_osds);
	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
		if (time_before(jiffies, req->r_stamp + keepalive))
			break;

		osd = req->r_osd;
		BUG_ON(!osd);
		dout(" tid %llu is slow, will send keepalive on osd%d\n",
		     req->r_tid, osd->o_osd);
		list_move_tail(&osd->o_keepalive_item, &slow_osds);
	}
	while (!list_empty(&slow_osds)) {
		osd = list_entry(slow_osds.next, struct ceph_osd,
				 o_keepalive_item);
		list_del_init(&osd->o_keepalive_item);
		ceph_con_keepalive(&osd->o_con);
	}

	__schedule_osd_timeout(osdc);
	mutex_unlock(&osdc->request_mutex);
	send_queued(osdc);
	up_read(&osdc->map_sem);
}

static void handle_osds_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client,
			     osds_timeout_work.work);
	unsigned long delay =
		osdc->client->options->osd_idle_ttl * HZ >> 2;

	dout("osds timeout\n");
	down_read(&osdc->map_sem);
	remove_old_osds(osdc);
	up_read(&osdc->map_sem);

	schedule_delayed_work(&osdc->osds_timeout_work,
			      round_jiffies_relative(delay));
}

static void complete_request(struct ceph_osd_request *req)
{
	if (req->r_safe_callback)
		req->r_safe_callback(req, NULL);
	complete_all(&req->r_safe_completion);  /* fsync waiter */
}

/*
 * handle osd op reply.  either call the callback if it is specified,
 * or do the completion to wake up the waiting thread.
 */
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
			 struct ceph_connection *con)
{
	struct ceph_osd_reply_head *rhead = msg->front.iov_base;
	struct ceph_osd_request *req;
	u64 tid;
	int numops, object_len, flags;
	s32 result;

	tid = le64_to_cpu(msg->hdr.tid);
	if (msg->front.iov_len < sizeof(*rhead))
		goto bad;
	numops = le32_to_cpu(rhead->num_ops);
	object_len = le32_to_cpu(rhead->object_len);
	result = le32_to_cpu(rhead->result);
	if (msg->front.iov_len != sizeof(*rhead) + object_len +
	    numops * sizeof(struct ceph_osd_op))
		goto bad;
	dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
	/* lookup */
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (req == NULL) {
		dout("handle_reply tid %llu dne\n", tid);
		mutex_unlock(&osdc->request_mutex);
		return;
	}
	ceph_osdc_get_request(req);
	flags = le32_to_cpu(rhead->flags);

	/*
	 * if this connection filled our message, drop our reference now, to
	 * avoid a (safe but slower) revoke later.
	 */
	if (req->r_con_filling_msg == con && req->r_reply == msg) {
		dout(" dropping con_filling_msg ref %p\n", con);
		req->r_con_filling_msg = NULL;
		con->ops->put(con);
	}

	if (!req->r_got_reply) {
		unsigned int bytes;

		req->r_result = le32_to_cpu(rhead->result);
		bytes = le32_to_cpu(msg->hdr.data_len);
		dout("handle_reply result %d bytes %d\n", req->r_result,
		     bytes);
		if (req->r_result == 0)
			req->r_result = bytes;

		/* in case this is a write and we need to replay, */
		req->r_reassert_version = rhead->reassert_version;

		req->r_got_reply = 1;
	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
		dout("handle_reply tid %llu dup ack\n", tid);
		mutex_unlock(&osdc->request_mutex);
		goto done;
	}

	dout("handle_reply tid %llu flags %d\n", tid, flags);

	if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
		__register_linger_request(osdc, req);

	/* either this is a read, or we got the safe response */
	if (result < 0 ||
	    (flags & CEPH_OSD_FLAG_ONDISK) ||
	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
		__unregister_request(osdc, req);

	mutex_unlock(&osdc->request_mutex);

	if (req->r_callback)
		req->r_callback(req, msg);
	else
		complete_all(&req->r_completion);

	if (flags & CEPH_OSD_FLAG_ONDISK)
		complete_request(req);

done:
	dout("req=%p req->r_linger=%d\n", req, req->r_linger);
	ceph_osdc_put_request(req);
	return;

bad:
	pr_err("corrupt osd_op_reply got %d %d expected %d\n",
	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
	       (int)sizeof(*rhead));
	ceph_msg_dump(msg);
}

static void reset_changed_osds(struct ceph_osd_client *osdc)
{
	struct rb_node *p, *n;

	for (p = rb_first(&osdc->osds); p; p = n) {
		struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);

		n = rb_next(p);
		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
		    memcmp(&osd->o_con.peer_addr,
			   ceph_osd_addr(osdc->osdmap,
					 osd->o_osd),
			   sizeof(struct ceph_entity_addr)) != 0)
			__reset_osd(osdc, osd);
	}
}

/*
 * Requeue requests whose mapping to an OSD has changed.  If requests map to
 * no osd, request a new map.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
{
	struct ceph_osd_request *req, *nreq;
	struct rb_node *p;
	int needmap = 0;
	int err;

	dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
	mutex_lock(&osdc->request_mutex);
	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_osd_request, r_node);
		err = __map_request(osdc, req, force_resend);
		if (err < 0)
			continue;  /* error */
		if (req->r_osd == NULL) {
			dout("%p tid %llu maps to no osd\n", req, req->r_tid);
			needmap++;  /* request a newer map */
		} else if (err > 0) {
			dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
			     req->r_osd ? req->r_osd->o_osd : -1);
			if (!req->r_linger)
				req->r_flags |= CEPH_OSD_FLAG_RETRY;
		}
	}

	list_for_each_entry_safe(req, nreq, &osdc->req_linger,
				 r_linger_item) {
		dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);

		err = __map_request(osdc, req, force_resend);
		if (err == 0)
			continue;  /* no change and no osd was specified */
		if (err < 0)
			continue;  /* hrm! */
		if (req->r_osd == NULL) {
			dout("tid %llu maps to no valid osd\n", req->r_tid);
			needmap++;  /* request a newer map */
			continue;
		}

		dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
		     req->r_osd ? req->r_osd->o_osd : -1);
		__unregister_linger_request(osdc, req);
		__register_request(osdc, req);
	}
	mutex_unlock(&osdc->request_mutex);

	if (needmap) {
		dout("%d requests for down osds, need new map\n", needmap);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
	}
}

/*
 * Process updated osd map.
 *
 * The message contains any number of incremental and full maps, normally
 * indicating some sort of topology change in the cluster.  Kick requests
 * off to different OSDs as needed.
 */
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p, *end, *next;
	u32 nr_maps, maplen;
	u32 epoch;
	struct ceph_osdmap *newmap = NULL, *oldmap;
	int err;
	struct ceph_fsid fsid;

	dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	/* verify fsid */
	ceph_decode_need(&p, end, sizeof(fsid), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(osdc->client, &fsid) < 0)
		return;

	down_write(&osdc->map_sem);

	/* incremental maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d inc maps\n", nr_maps);
	while (nr_maps > 0) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		next = p + maplen;
		if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
			dout("applying incremental map %u len %d\n",
			     epoch, maplen);
			newmap = osdmap_apply_incremental(&p, next,
							  osdc->osdmap,
							  osdc->client->msgr);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			if (newmap != osdc->osdmap) {
				ceph_osdmap_destroy(osdc->osdmap);
				osdc->osdmap = newmap;
			}
			kick_requests(osdc, 0);
			reset_changed_osds(osdc);
		} else {
			dout("ignoring incremental map %u len %d\n",
			     epoch, maplen);
		}
		p = next;
		nr_maps--;
	}
	if (newmap)
		goto done;

	/* full maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d full maps\n", nr_maps);
	while (nr_maps) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		if (nr_maps > 1) {
			dout("skipping non-latest full map %u len %d\n",
			     epoch, maplen);
		} else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
			dout("skipping full map %u len %d, "
			     "older than our %u\n", epoch, maplen,
			     osdc->osdmap->epoch);
		} else {
			int skipped_map = 0;

			dout("taking full map %u len %d\n", epoch, maplen);
			newmap = osdmap_decode(&p, p+maplen);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			oldmap = osdc->osdmap;
			osdc->osdmap = newmap;
			if (oldmap) {
				if (oldmap->epoch + 1 < newmap->epoch)
					skipped_map = 1;
				ceph_osdmap_destroy(oldmap);
			}
			kick_requests(osdc, skipped_map);
		}
		p += maplen;
		nr_maps--;
	}

done:
	downgrade_write(&osdc->map_sem);
	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);

	/*
	 * subscribe to subsequent osdmap updates if full to ensure
	 * we find out when we are no longer full and stop returning
	 * ENOSPC.
	 */
	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
		ceph_monc_request_next_osdmap(&osdc->client->monc);

	send_queued(osdc);
	up_read(&osdc->map_sem);
	wake_up_all(&osdc->client->auth_wq);
	return;

bad:
	pr_err("osdc handle_map corrupt msg\n");
	ceph_msg_dump(msg);
	up_write(&osdc->map_sem);
	return;
}

/*
 * watch/notify callback event infrastructure
 *
 * These callbacks are used both for watch and notify operations.
 */
static void __release_event(struct kref *kref)
{
	struct ceph_osd_event *event =
		container_of(kref, struct ceph_osd_event, kref);

	dout("__release_event %p\n", event);
	kfree(event);
}

static void get_event(struct ceph_osd_event *event)
{
	kref_get(&event->kref);
}

void ceph_osdc_put_event(struct ceph_osd_event *event)
{
	kref_put(&event->kref, __release_event);
}
EXPORT_SYMBOL(ceph_osdc_put_event);

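/*
 * Events are refcounted much like requests: ceph_osdc_create_event()
 * takes one reference for the event tree and one for the caller; the
 * caller's reference is dropped by ceph_osdc_wait_event() or
 * ceph_osdc_cancel_event().
 */
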
static void __insert_event(struct ceph_osd_client *osdc,
			     struct ceph_osd_event *new)
{
	struct rb_node **p = &osdc->event_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_event *event = NULL;

	while (*p) {
		parent = *p;
		event = rb_entry(parent, struct ceph_osd_event, node);
		if (new->cookie < event->cookie)
			p = &(*p)->rb_left;
		else if (new->cookie > event->cookie)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->node, parent, p);
	rb_insert_color(&new->node, &osdc->event_tree);
}

static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
					        u64 cookie)
{
	struct rb_node **p = &osdc->event_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_event *event = NULL;

	while (*p) {
		parent = *p;
		event = rb_entry(parent, struct ceph_osd_event, node);
		if (cookie < event->cookie)
			p = &(*p)->rb_left;
		else if (cookie > event->cookie)
			p = &(*p)->rb_right;
		else
			return event;
	}
	return NULL;
}

static void __remove_event(struct ceph_osd_event *event)
{
	struct ceph_osd_client *osdc = event->osdc;

	if (!RB_EMPTY_NODE(&event->node)) {
		dout("__remove_event removed %p\n", event);
		rb_erase(&event->node, &osdc->event_tree);
		ceph_osdc_put_event(event);
	} else {
		dout("__remove_event didn't remove %p\n", event);
	}
}

int ceph_osdc_create_event(struct ceph_osd_client *osdc,
			   void (*event_cb)(u64, u64, u8, void *),
			   int one_shot, void *data,
			   struct ceph_osd_event **pevent)
{
	struct ceph_osd_event *event;

	event = kmalloc(sizeof(*event), GFP_NOIO);
	if (!event)
		return -ENOMEM;

	dout("create_event %p\n", event);
	event->cb = event_cb;
	event->one_shot = one_shot;
	event->data = data;
	event->osdc = osdc;
	INIT_LIST_HEAD(&event->osd_node);
	kref_init(&event->kref);   /* one ref for us */
	kref_get(&event->kref);    /* one ref for the caller */
	init_completion(&event->completion);

	spin_lock(&osdc->event_lock);
	event->cookie = ++osdc->event_count;
	__insert_event(osdc, event);
	spin_unlock(&osdc->event_lock);

	*pevent = event;
	return 0;
}
EXPORT_SYMBOL(ceph_osdc_create_event);

void ceph_osdc_cancel_event(struct ceph_osd_event *event)
{
	struct ceph_osd_client *osdc = event->osdc;

	dout("cancel_event %p\n", event);
	spin_lock(&osdc->event_lock);
	__remove_event(event);
	spin_unlock(&osdc->event_lock);
	ceph_osdc_put_event(event); /* caller's */
}
EXPORT_SYMBOL(ceph_osdc_cancel_event);

static void do_event_work(struct work_struct *work)
{
	struct ceph_osd_event_work *event_work =
		container_of(work, struct ceph_osd_event_work, work);
	struct ceph_osd_event *event = event_work->event;
	u64 ver = event_work->ver;
	u64 notify_id = event_work->notify_id;
	u8 opcode = event_work->opcode;

	dout("do_event_work completing %p\n", event);
	event->cb(ver, notify_id, opcode, event->data);
	complete(&event->completion);
	dout("do_event_work completed %p\n", event);
	ceph_osdc_put_event(event);
	kfree(event_work);
}

/*
 * Process osd watch notifications
 */
void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p, *end;
	u8 proto_ver;
	u64 cookie, ver, notify_id;
	u8 opcode;
	struct ceph_osd_event *event;
	struct ceph_osd_event_work *event_work;

	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	ceph_decode_8_safe(&p, end, proto_ver, bad);
	ceph_decode_8_safe(&p, end, opcode, bad);
	ceph_decode_64_safe(&p, end, cookie, bad);
	ceph_decode_64_safe(&p, end, ver, bad);
	ceph_decode_64_safe(&p, end, notify_id, bad);

	spin_lock(&osdc->event_lock);
	event = __find_event(osdc, cookie);
	if (event) {
		get_event(event);
		if (event->one_shot)
			__remove_event(event);
	}
	spin_unlock(&osdc->event_lock);
	dout("handle_watch_notify cookie %lld ver %lld event %p\n",
	     cookie, ver, event);
	if (event) {
		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
		if (!event_work) {
			dout("ERROR: could not allocate event_work\n");
			goto done_err;
		}
		INIT_WORK(&event_work->work, do_event_work);
		event_work->event = event;
		event_work->ver = ver;
		event_work->notify_id = notify_id;
		event_work->opcode = opcode;
		if (!queue_work(osdc->notify_wq, &event_work->work)) {
			dout("WARNING: failed to queue notify event work\n");
			goto done_err;
		}
	}

	return;

done_err:
	complete(&event->completion);
	ceph_osdc_put_event(event);
	return;

bad:
	pr_err("osdc handle_watch_notify corrupt msg\n");
	return;
}

int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
{
	int err;

	dout("wait_event %p\n", event);
	err = wait_for_completion_interruptible_timeout(&event->completion,
							timeout * HZ);
	ceph_osdc_put_event(event);
	if (err > 0)
		err = 0;
	dout("wait_event %p returns %d\n", event, err);
	return err;
}
EXPORT_SYMBOL(ceph_osdc_wait_event);

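/*
 * A caller (e.g. the rbd driver) might tie the event machinery to a
 * watch request roughly like this (sketch; the watch request itself
 * is built by the caller with event->cookie as the watch cookie):
 *
 *	ceph_osdc_create_event(osdc, my_notify_cb, 0, data, &event);
 *	... submit a CEPH_OSD_OP_WATCH request using event->cookie ...
 *	ceph_osdc_wait_event(event, timeout);
 *
 * my_notify_cb is hypothetical; any callback matching the prototype
 * accepted by ceph_osdc_create_event() works.
 */
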
/*
 * Register request, send initial attempt.
 */
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
			    struct ceph_osd_request *req,
			    bool nofail)
{
	int rc = 0;

	req->r_request->pages = req->r_pages;
	req->r_request->nr_pages = req->r_num_pages;
#ifdef CONFIG_BLOCK
	req->r_request->bio = req->r_bio;
#endif
	req->r_request->trail = req->r_trail;

	register_request(osdc, req);

	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	/*
	 * a racing kick_requests() may have sent the message for us
	 * while we dropped request_mutex above, so only send now if
	 * the request hasn't been touched yet.
	 */
	if (req->r_sent == 0) {
		rc = __map_request(osdc, req, 0);
		if (rc < 0) {
			if (nofail) {
				dout("osdc_start_request failed map, "
				     "will retry %lld\n", req->r_tid);
1726				rc = 0;
1727			}
1728			goto out_unlock;
1729		}
1730		if (req->r_osd == NULL) {
1731			dout("send_request %p no up osds in pg\n", req);
1732			ceph_monc_request_next_osdmap(&osdc->client->monc);
1733		} else {
1734			__send_request(osdc, req);
1735		}
1736		rc = 0;
1737	}
1738
1739out_unlock:
1740	mutex_unlock(&osdc->request_mutex);
1741	up_read(&osdc->map_sem);
1742	return rc;
1743}
1744EXPORT_SYMBOL(ceph_osdc_start_request);
1745
1746/*
1747 * wait for a request to complete
1748 */
1749int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1750			   struct ceph_osd_request *req)
1751{
1752	int rc;
1753
1754	rc = wait_for_completion_interruptible(&req->r_completion);
1755	if (rc < 0) {
1756		mutex_lock(&osdc->request_mutex);
1757		__cancel_request(req);
1758		__unregister_request(osdc, req);
1759		mutex_unlock(&osdc->request_mutex);
1760		complete_request(req);
1761		dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
1762		return rc;
1763	}
1764
1765	dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1766	return req->r_result;
1767}
1768EXPORT_SYMBOL(ceph_osdc_wait_request);
1769
1770/*
1771 * sync - wait for all in-flight requests to flush.  avoid starvation.
1772 */
1773void ceph_osdc_sync(struct ceph_osd_client *osdc)
1774{
1775	struct ceph_osd_request *req;
1776	u64 last_tid, next_tid = 0;
1777
1778	mutex_lock(&osdc->request_mutex);
1779	last_tid = osdc->last_tid;
1780	while (1) {
1781		req = __lookup_request_ge(osdc, next_tid);
1782		if (!req)
1783			break;
1784		if (req->r_tid > last_tid)
1785			break;
1786
1787		next_tid = req->r_tid + 1;
1788		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
1789			continue;
1790
1791		ceph_osdc_get_request(req);
1792		mutex_unlock(&osdc->request_mutex);
1793		dout("sync waiting on tid %llu (last is %llu)\n",
1794		     req->r_tid, last_tid);
1795		wait_for_completion(&req->r_safe_completion);
1796		mutex_lock(&osdc->request_mutex);
1797		ceph_osdc_put_request(req);
1798	}
1799	mutex_unlock(&osdc->request_mutex);
1800	dout("sync done (thru tid %llu)\n", last_tid);
1801}
1802EXPORT_SYMBOL(ceph_osdc_sync);

/*
 * init, shutdown
 */
int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
{
	int err;

	dout("init\n");
	osdc->client = client;
	osdc->osdmap = NULL;
	init_rwsem(&osdc->map_sem);
	init_completion(&osdc->map_waiters);
	osdc->last_requested_map = 0;
	mutex_init(&osdc->request_mutex);
	osdc->last_tid = 0;
	osdc->osds = RB_ROOT;
	INIT_LIST_HEAD(&osdc->osd_lru);
	osdc->requests = RB_ROOT;
	INIT_LIST_HEAD(&osdc->req_lru);
	INIT_LIST_HEAD(&osdc->req_unsent);
	INIT_LIST_HEAD(&osdc->req_notarget);
	INIT_LIST_HEAD(&osdc->req_linger);
	osdc->num_requests = 0;
	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
	spin_lock_init(&osdc->event_lock);
	osdc->event_tree = RB_ROOT;
	osdc->event_count = 0;

	schedule_delayed_work(&osdc->osds_timeout_work,
	   round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));

	err = -ENOMEM;
	osdc->req_mempool = mempool_create_kmalloc_pool(10,
					sizeof(struct ceph_osd_request));
	if (!osdc->req_mempool)
		goto out;

	err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
				"osd_op");
	if (err < 0)
		goto out_mempool;
	err = ceph_msgpool_init(&osdc->msgpool_op_reply,
				OSD_OPREPLY_FRONT_LEN, 10, true,
				"osd_op_reply");
	if (err < 0)
		goto out_msgpool;

	/* create_singlethread_workqueue() returns NULL on failure, not an
	 * ERR_PTR, so test for NULL and pick our own error code */
	osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
	if (!osdc->notify_wq) {
		err = -ENOMEM;
		goto out_msgpool_reply;
	}
	return 0;

out_msgpool_reply:
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
out_msgpool:
	ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool:
	mempool_destroy(osdc->req_mempool);
out:
	return err;
}
EXPORT_SYMBOL(ceph_osdc_init);

void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
	flush_workqueue(osdc->notify_wq);
	destroy_workqueue(osdc->notify_wq);
	cancel_delayed_work_sync(&osdc->timeout_work);
	cancel_delayed_work_sync(&osdc->osds_timeout_work);
	if (osdc->osdmap) {
		ceph_osdmap_destroy(osdc->osdmap);
		osdc->osdmap = NULL;
	}
	remove_all_osds(osdc);
	mempool_destroy(osdc->req_mempool);
	ceph_msgpool_destroy(&osdc->msgpool_op);
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
EXPORT_SYMBOL(ceph_osdc_stop);
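
/*
 * Illustrative sketch, not part of the original file: the init/stop
 * pairing as a caller would use it.  ceph_osdc_init() unwinds any
 * partially constructed state on failure, so the caller only calls
 * ceph_osdc_stop() after a successful init.  The function name is
 * hypothetical.
 */
static int __maybe_unused example_osdc_lifecycle(struct ceph_client *client)
{
	struct ceph_osd_client *osdc = &client->osdc;
	int err;

	err = ceph_osdc_init(osdc, client);
	if (err < 0)
		return err;	/* pools already torn down by init */

	/* ... submit requests ... */

	ceph_osdc_stop(osdc);
	return 0;
}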

/*
 * Read some contiguous pages.  If we cross a stripe boundary, shorten
 * *plen.  Return number of bytes read, or error.
 */
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
			struct ceph_vino vino, struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			u32 truncate_seq, u64 truncate_size,
			struct page **pages, int num_pages, int page_align)
{
	struct ceph_osd_request *req;
	int rc = 0;

	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
	     vino.snap, off, *plen);
	req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, 0, truncate_seq, truncate_size, NULL,
				    false, 1, page_align);
	if (!req)
		return -ENOMEM;

	/* it may be a short read due to an object boundary */
	req->r_pages = pages;

	dout("readpages final extent is %llu~%llu (%d pages align %d)\n",
	     off, *plen, req->r_num_pages, page_align);

	rc = ceph_osdc_start_request(osdc, req, false);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	dout("readpages result %d\n", rc);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_readpages);
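
/*
 * Illustrative sketch, not part of the original file: reading one file
 * extent into freshly allocated pages.  calc_pages_for() and the page
 * vector helpers come from libceph; *plen may come back shortened if
 * the extent crosses an object boundary.  The function name is
 * hypothetical, and truncate_seq/truncate_size are simply zeroed.
 */
static int __maybe_unused example_read_extent(struct ceph_osd_client *osdc,
					      struct ceph_vino vino,
					      struct ceph_file_layout *layout,
					      u64 off, u64 len)
{
	int page_align = off & ~PAGE_MASK;
	int num_pages = calc_pages_for(page_align, len);
	struct page **pages;
	int rc;

	pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	rc = ceph_osdc_readpages(osdc, vino, layout, off, &len,
				 0, 0, pages, num_pages, page_align);

	/* rc is the byte count read on success, negative on error */
	ceph_release_page_vector(pages, num_pages);
	return rc;
}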

/*
 * Do a synchronous write on N pages.  Return number of bytes written
 * (len on success), or error.
 */
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
			 struct ceph_file_layout *layout,
			 struct ceph_snap_context *snapc,
			 u64 off, u64 len,
			 u32 truncate_seq, u64 truncate_size,
			 struct timespec *mtime,
			 struct page **pages, int num_pages,
			 int flags, int do_sync, bool nofail)
{
	struct ceph_osd_request *req;
	int rc = 0;
	int page_align = off & ~PAGE_MASK;

	BUG_ON(vino.snap != CEPH_NOSNAP);
	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
				    CEPH_OSD_OP_WRITE,
				    flags | CEPH_OSD_FLAG_ONDISK |
					    CEPH_OSD_FLAG_WRITE,
				    snapc, do_sync,
				    truncate_seq, truncate_size, mtime,
				    nofail, 1, page_align);
	if (!req)
		return -ENOMEM;

	/* it may be a short write due to an object boundary */
	req->r_pages = pages;
	dout("writepages %llu~%llu (%d pages)\n", off, len,
	     req->r_num_pages);

	rc = ceph_osdc_start_request(osdc, req, nofail);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	if (rc == 0)
		rc = len;
	dout("writepages result %d\n", rc);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_writepages);
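
/*
 * Illustrative sketch, not part of the original file: a synchronous,
 * on-disk write of a single page's worth of data at file offset "off"
 * (vino.snap must be CEPH_NOSNAP; writing to a snapshot is a BUG).
 * The function name is hypothetical; truncate_seq/truncate_size,
 * extra flags, and do_sync are zeroed for brevity.
 */
static int __maybe_unused example_write_page(struct ceph_osd_client *osdc,
					     struct ceph_vino vino,
					     struct ceph_file_layout *layout,
					     struct ceph_snap_context *snapc,
					     struct page *page,
					     u64 off, u64 len,
					     struct timespec *mtime)
{
	/* returns byte count written on success, negative on error */
	return ceph_osdc_writepages(osdc, vino, layout, snapc, off, len,
				    0, 0, mtime, &page, 1, 0, 0, false);
}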

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;
	int type = le16_to_cpu(msg->hdr.type);

	if (!osd)
		goto out;
	osdc = osd->o_osdc;

	switch (type) {
	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(osdc, msg);
		break;
	case CEPH_MSG_OSD_OPREPLY:
		handle_reply(osdc, msg, con);
		break;
	case CEPH_MSG_WATCH_NOTIFY:
		handle_watch_notify(osdc, msg);
		break;

	default:
		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
}

/*
 * lookup and return message for incoming reply.  set up reply message
 * pages.
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_msg *m;
	struct ceph_osd_request *req;
	int front = le32_to_cpu(hdr->front_len);
	int data_len = le32_to_cpu(hdr->data_len);
	u64 tid;

	tid = le64_to_cpu(hdr->tid);
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (!req) {
		*skip = 1;
		m = NULL;
		pr_info("get_reply unknown tid %llu from osd%d\n", tid,
			osd->o_osd);
		goto out;
	}

	if (req->r_con_filling_msg) {
		dout("get_reply revoking msg %p from old con %p\n",
		     req->r_reply, req->r_con_filling_msg);
		ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
		req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
		req->r_con_filling_msg = NULL;
	}

	if (front > req->r_reply->front.iov_len) {
		pr_warning("get_reply front %d > preallocated %d\n",
			   front, (int)req->r_reply->front.iov_len);
		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
		if (!m)
			goto out;
		ceph_msg_put(req->r_reply);
		req->r_reply = m;
	}
	m = ceph_msg_get(req->r_reply);

	if (data_len > 0) {
		int want = calc_pages_for(req->r_page_alignment, data_len);

		if (unlikely(req->r_num_pages < want)) {
			pr_warning("tid %llu reply has %d bytes in %d pages, "
				   "but only %d pages were ready\n", tid,
				   data_len, want, req->r_num_pages);
			*skip = 1;
			ceph_msg_put(m);
			m = NULL;
			goto out;
		}
		m->pages = req->r_pages;
		m->nr_pages = req->r_num_pages;
		m->page_alignment = req->r_page_alignment;
#ifdef CONFIG_BLOCK
		m->bio = req->r_bio;
#endif
	}
	*skip = 0;
	req->r_con_filling_msg = con->ops->get(con);
	dout("get_reply tid %llu %p\n", tid, m);

out:
	mutex_unlock(&osdc->request_mutex);
	return m;
}

static struct ceph_msg *alloc_msg(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	int type = le16_to_cpu(hdr->type);
	int front = le32_to_cpu(hdr->front_len);

	switch (type) {
	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_WATCH_NOTIFY:
		return ceph_msg_new(type, front, GFP_NOFS, false);
	case CEPH_MSG_OSD_OPREPLY:
		return get_reply(con, hdr, skip);
	default:
		pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
			osd->o_osd);
		*skip = 1;
		return NULL;
	}
}

/*
 * Wrappers to refcount containing ceph_osd struct
 */
static struct ceph_connection *get_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	if (get_osd(osd))
		return con;
	return NULL;
}

static void put_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	put_osd(osd);
}

/*
 * authentication
 */
/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
					int *proto, int force_new)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;
	struct ceph_auth_handshake *auth = &o->o_auth;

	if (force_new && auth->authorizer) {
		if (ac->ops && ac->ops->destroy_authorizer)
			ac->ops->destroy_authorizer(ac, auth->authorizer);
		auth->authorizer = NULL;
	}
	if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
		int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
							auth);
		if (ret)
			return ERR_PTR(ret);
	}
	*proto = ac->protocol;

	return auth;
}

static int verify_authorizer_reply(struct ceph_connection *con, int len)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	/*
	 * Treat a missing verify op as success, consistent with how
	 * get_authorizer() and invalidate_authorizer() handle a NULL
	 * ac->ops: an auth method with nothing to verify has nothing
	 * to fail.
	 */
	if (!ac->ops || !ac->ops->verify_authorizer_reply)
		return 0;
	return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len);
}

static int invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	if (ac->ops && ac->ops->invalidate_authorizer)
		ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);

	return ceph_monc_validate_auth(&osdc->client->monc);
}

static const struct ceph_connection_operations osd_con_ops = {
	.get = get_osd_con,
	.put = put_osd_con,
	.dispatch = dispatch,
	.get_authorizer = get_authorizer,
	.verify_authorizer_reply = verify_authorizer_reply,
	.invalidate_authorizer = invalidate_authorizer,
	.alloc_msg = alloc_msg,
	.fault = osd_reset,
};