   1#include <linux/ceph/ceph_debug.h>
   2
   3#include <linux/module.h>
   4#include <linux/err.h>
   5#include <linux/highmem.h>
   6#include <linux/mm.h>
   7#include <linux/pagemap.h>
   8#include <linux/slab.h>
   9#include <linux/uaccess.h>
  10#ifdef CONFIG_BLOCK
  11#include <linux/bio.h>
  12#endif
  13
  14#include <linux/ceph/libceph.h>
  15#include <linux/ceph/osd_client.h>
  16#include <linux/ceph/messenger.h>
  17#include <linux/ceph/decode.h>
  18#include <linux/ceph/auth.h>
  19#include <linux/ceph/pagelist.h>
  20
  21#define OSD_OP_FRONT_LEN	4096
  22#define OSD_OPREPLY_FRONT_LEN	512
  23
  24static const struct ceph_connection_operations osd_con_ops;
  25
  26static void send_queued(struct ceph_osd_client *osdc);
  27static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
  28static void __register_request(struct ceph_osd_client *osdc,
  29			       struct ceph_osd_request *req);
  30static void __unregister_linger_request(struct ceph_osd_client *osdc,
  31					struct ceph_osd_request *req);
  32static void __send_request(struct ceph_osd_client *osdc,
  33			   struct ceph_osd_request *req);
  34
  35static int op_needs_trail(int op)
  36{
  37	switch (op) {
  38	case CEPH_OSD_OP_GETXATTR:
  39	case CEPH_OSD_OP_SETXATTR:
  40	case CEPH_OSD_OP_CMPXATTR:
  41	case CEPH_OSD_OP_CALL:
  42	case CEPH_OSD_OP_NOTIFY:
  43		return 1;
  44	default:
  45		return 0;
  46	}
  47}
  48
  49static int op_has_extent(int op)
  50{
  51	return (op == CEPH_OSD_OP_READ ||
  52		op == CEPH_OSD_OP_WRITE);
  53}
  54
  55void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
  56			struct ceph_file_layout *layout,
  57			u64 snapid,
  58			u64 off, u64 *plen, u64 *bno,
  59			struct ceph_osd_request *req,
  60			struct ceph_osd_req_op *op)
  61{
  62	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
  63	u64 orig_len = *plen;
  64	u64 objoff, objlen;    /* extent in object */
  65
  66	reqhead->snapid = cpu_to_le64(snapid);
  67
  68	/* object extent? */
  69	ceph_calc_file_object_mapping(layout, off, plen, bno,
  70				      &objoff, &objlen);
  71	if (*plen < orig_len)
  72		dout(" skipping last %llu, final file extent %llu~%llu\n",
  73		     orig_len - *plen, off, *plen);
  74
  75	if (op_has_extent(op->op)) {
  76		op->extent.offset = objoff;
  77		op->extent.length = objlen;
  78	}
  79	req->r_num_pages = calc_pages_for(off, *plen);
  80	req->r_page_alignment = off & ~PAGE_MASK;
  81	if (op->op == CEPH_OSD_OP_WRITE)
  82		op->payload_len = *plen;
  83
  84	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
  85	     *bno, objoff, objlen, req->r_num_pages);
  86
  87}
  88EXPORT_SYMBOL(ceph_calc_raw_layout);
  89
  90/*
  91 * Implement client access to distributed object storage cluster.
  92 *
  93 * All data objects are stored within a cluster/cloud of OSDs, or
  94 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
  95 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
  96 * remote daemons serving up and coordinating consistent and safe
  97 * access to storage.
  98 *
  99 * Cluster membership and the mapping of data objects onto storage devices
 100 * are described by the osd map.
 101 *
 102 * We keep track of pending OSD requests (read, write), resubmit
 103 * requests to different OSDs when the cluster topology/data layout
  104 * changes, or retry the affected requests when the communications
 105 * channel with an OSD is reset.
 106 */
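/*
 * Editorial sketch (not part of the original file): a minimal
 * caller-side synchronous read using the request API exported below
 * (ceph_osdc_new_request/start_request/wait_request).  It mirrors
 * ceph_osdc_readpages() later in this file; the function name and
 * parameters are hypothetical.
 */
#if 0	/* illustration only */
static int example_sync_read(struct ceph_osd_client *osdc,
			     struct ceph_vino vino,
			     struct ceph_file_layout *layout,
			     u64 off, u64 *plen,
			     struct page **pages, int page_align)
{
	struct ceph_osd_request *req;
	int rc;

	req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, 0, 0, 0, NULL, false, 1,
				    page_align);
	if (!req)
		return -ENOMEM;
	/* may come back short if the extent hits an object boundary */
	req->r_pages = pages;
	rc = ceph_osdc_start_request(osdc, req, false);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);
	ceph_osdc_put_request(req);
	return rc;
}
#endif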
 107
 108/*
 109 * calculate the mapping of a file extent onto an object, and fill out the
 110 * request accordingly.  shorten extent as necessary if it crosses an
 111 * object boundary.
 112 *
 113 * fill osd op in request message.
 114 */
 115static void calc_layout(struct ceph_osd_client *osdc,
 116			struct ceph_vino vino,
 117			struct ceph_file_layout *layout,
 118			u64 off, u64 *plen,
 119			struct ceph_osd_request *req,
 120			struct ceph_osd_req_op *op)
 121{
 122	u64 bno;
 123
 124	ceph_calc_raw_layout(osdc, layout, vino.snap, off,
 125			     plen, &bno, req, op);
 126
 127	snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
 128	req->r_oid_len = strlen(req->r_oid);
 129}
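/*
 * Worked example (editorial, assuming the default layout of 4 MB
 * objects with a single 4 MB stripe unit): a request with off=5 MB,
 * *plen=4 MB maps to bno=1 (object "<ino>.00000001"), objoff=1 MB,
 * and is shortened to *plen=objlen=3 MB so that it does not cross
 * the object boundary at 8 MB; the caller resubmits for the rest.
 */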
 130
 131/*
 132 * requests
 133 */
 134void ceph_osdc_release_request(struct kref *kref)
 135{
 136	struct ceph_osd_request *req = container_of(kref,
 137						    struct ceph_osd_request,
 138						    r_kref);
 139
 140	if (req->r_request)
 141		ceph_msg_put(req->r_request);
 142	if (req->r_con_filling_msg) {
 143		dout("release_request revoking pages %p from con %p\n",
 144		     req->r_pages, req->r_con_filling_msg);
 145		ceph_con_revoke_message(req->r_con_filling_msg,
 146				      req->r_reply);
 147		req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
 148	}
 149	if (req->r_reply)
 150		ceph_msg_put(req->r_reply);
 151	if (req->r_own_pages)
 152		ceph_release_page_vector(req->r_pages,
 153					 req->r_num_pages);
 154#ifdef CONFIG_BLOCK
 155	if (req->r_bio)
 156		bio_put(req->r_bio);
 157#endif
 158	ceph_put_snap_context(req->r_snapc);
 159	if (req->r_trail) {
 160		ceph_pagelist_release(req->r_trail);
 161		kfree(req->r_trail);
 162	}
 163	if (req->r_mempool)
 164		mempool_free(req, req->r_osdc->req_mempool);
 165	else
 166		kfree(req);
 167}
 168EXPORT_SYMBOL(ceph_osdc_release_request);
 169
 170static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
 171{
 172	int i = 0;
 173
 174	if (needs_trail)
 175		*needs_trail = 0;
 176	while (ops[i].op) {
 177		if (needs_trail && op_needs_trail(ops[i].op))
 178			*needs_trail = 1;
 179		i++;
 180	}
 181
 182	return i;
 183}
 184
 185struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 186					       int flags,
 187					       struct ceph_snap_context *snapc,
 188					       struct ceph_osd_req_op *ops,
 189					       bool use_mempool,
 190					       gfp_t gfp_flags,
 191					       struct page **pages,
 192					       struct bio *bio)
 193{
 194	struct ceph_osd_request *req;
 195	struct ceph_msg *msg;
 196	int needs_trail;
 197	int num_op = get_num_ops(ops, &needs_trail);
 198	size_t msg_size = sizeof(struct ceph_osd_request_head);
 199
 200	msg_size += num_op*sizeof(struct ceph_osd_op);
 201
 202	if (use_mempool) {
 203		req = mempool_alloc(osdc->req_mempool, gfp_flags);
 204		memset(req, 0, sizeof(*req));
 205	} else {
 206		req = kzalloc(sizeof(*req), gfp_flags);
 207	}
 208	if (req == NULL)
 209		return NULL;
 210
 211	req->r_osdc = osdc;
 212	req->r_mempool = use_mempool;
 213
 214	kref_init(&req->r_kref);
 215	init_completion(&req->r_completion);
 216	init_completion(&req->r_safe_completion);
 217	INIT_LIST_HEAD(&req->r_unsafe_item);
 218	INIT_LIST_HEAD(&req->r_linger_item);
 219	INIT_LIST_HEAD(&req->r_linger_osd);
 220	INIT_LIST_HEAD(&req->r_req_lru_item);
 221	req->r_flags = flags;
 222
 223	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
 224
 225	/* create reply message */
 226	if (use_mempool)
 227		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
 228	else
 229		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
 230				   OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
 231	if (!msg) {
 232		ceph_osdc_put_request(req);
 233		return NULL;
 234	}
 235	req->r_reply = msg;
 236
 237	/* allocate space for the trailing data */
 238	if (needs_trail) {
 239		req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
 240		if (!req->r_trail) {
 241			ceph_osdc_put_request(req);
 242			return NULL;
 243		}
 244		ceph_pagelist_init(req->r_trail);
 245	}
 246	/* create request message; allow space for oid */
 247	msg_size += MAX_OBJ_NAME_SIZE;
 248	if (snapc)
 249		msg_size += sizeof(u64) * snapc->num_snaps;
 250	if (use_mempool)
 251		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
 252	else
 253		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
 254	if (!msg) {
 255		ceph_osdc_put_request(req);
 256		return NULL;
 257	}
 258
 259	msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
 260	memset(msg->front.iov_base, 0, msg->front.iov_len);
 261
 262	req->r_request = msg;
 263	req->r_pages = pages;
 264#ifdef CONFIG_BLOCK
 265	if (bio) {
 266		req->r_bio = bio;
 267		bio_get(req->r_bio);
 268	}
 269#endif
 270
 271	return req;
 272}
 273EXPORT_SYMBOL(ceph_osdc_alloc_request);
 274
 275static void osd_req_encode_op(struct ceph_osd_request *req,
 276			      struct ceph_osd_op *dst,
 277			      struct ceph_osd_req_op *src)
 278{
 279	dst->op = cpu_to_le16(src->op);
 280
 281	switch (src->op) {
 282	case CEPH_OSD_OP_READ:
 283	case CEPH_OSD_OP_WRITE:
 284		dst->extent.offset =
 285			cpu_to_le64(src->extent.offset);
 286		dst->extent.length =
 287			cpu_to_le64(src->extent.length);
 288		dst->extent.truncate_size =
 289			cpu_to_le64(src->extent.truncate_size);
 290		dst->extent.truncate_seq =
 291			cpu_to_le32(src->extent.truncate_seq);
 292		break;
 293
 294	case CEPH_OSD_OP_GETXATTR:
 295	case CEPH_OSD_OP_SETXATTR:
 296	case CEPH_OSD_OP_CMPXATTR:
 297		BUG_ON(!req->r_trail);
 298
 299		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
 300		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
 301		dst->xattr.cmp_op = src->xattr.cmp_op;
 302		dst->xattr.cmp_mode = src->xattr.cmp_mode;
 303		ceph_pagelist_append(req->r_trail, src->xattr.name,
 304				     src->xattr.name_len);
 305		ceph_pagelist_append(req->r_trail, src->xattr.val,
 306				     src->xattr.value_len);
 307		break;
 308	case CEPH_OSD_OP_CALL:
 309		BUG_ON(!req->r_trail);
 310
 311		dst->cls.class_len = src->cls.class_len;
 312		dst->cls.method_len = src->cls.method_len;
 313		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
 314
 315		ceph_pagelist_append(req->r_trail, src->cls.class_name,
 316				     src->cls.class_len);
 317		ceph_pagelist_append(req->r_trail, src->cls.method_name,
 318				     src->cls.method_len);
 319		ceph_pagelist_append(req->r_trail, src->cls.indata,
 320				     src->cls.indata_len);
 321		break;
 322	case CEPH_OSD_OP_ROLLBACK:
 323		dst->snap.snapid = cpu_to_le64(src->snap.snapid);
 324		break;
 325	case CEPH_OSD_OP_STARTSYNC:
 326		break;
 327	case CEPH_OSD_OP_NOTIFY:
 328		{
 329			__le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
 330			__le32 timeout = cpu_to_le32(src->watch.timeout);
 331
 332			BUG_ON(!req->r_trail);
 333
 334			ceph_pagelist_append(req->r_trail,
 335						&prot_ver, sizeof(prot_ver));
 336			ceph_pagelist_append(req->r_trail,
 337						&timeout, sizeof(timeout));
  338		}			/* fall through */
 339	case CEPH_OSD_OP_NOTIFY_ACK:
 340	case CEPH_OSD_OP_WATCH:
 341		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
 342		dst->watch.ver = cpu_to_le64(src->watch.ver);
 343		dst->watch.flag = src->watch.flag;
 344		break;
 345	default:
  346		pr_err("unrecognized osd opcode %d\n", src->op);
 347		WARN_ON(1);
 348		break;
 349	}
 350	dst->payload_len = cpu_to_le32(src->payload_len);
 351}
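/*
 * Editorial sketch: how a caller builds the zero-terminated op array
 * that get_num_ops() walks and osd_req_encode_op() encodes.  The
 * helper below is hypothetical; the xattr fields match the ones
 * consumed above (name/name_len land in the trailing pagelist).
 */
#if 0	/* illustration only */
static void example_fill_getxattr(struct ceph_osd_req_op *ops,
				  const char *name)
{
	ops[0].op = CEPH_OSD_OP_GETXATTR;
	ops[0].xattr.name = name;
	ops[0].xattr.name_len = strlen(name);
	ops[0].xattr.val = NULL;
	ops[0].xattr.value_len = 0;
	ops[0].payload_len = 0;
	ops[1].op = 0;			/* terminator */
}
#endif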
 352
 353/*
 354 * build new request AND message
 355 *
 356 */
 357void ceph_osdc_build_request(struct ceph_osd_request *req,
 358			     u64 off, u64 *plen,
 359			     struct ceph_osd_req_op *src_ops,
 360			     struct ceph_snap_context *snapc,
 361			     struct timespec *mtime,
 362			     const char *oid,
 363			     int oid_len)
 364{
 365	struct ceph_msg *msg = req->r_request;
 366	struct ceph_osd_request_head *head;
 367	struct ceph_osd_req_op *src_op;
 368	struct ceph_osd_op *op;
 369	void *p;
 370	int num_op = get_num_ops(src_ops, NULL);
 371	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
 372	int flags = req->r_flags;
 373	u64 data_len = 0;
 374	int i;
 375
 376	head = msg->front.iov_base;
 377	op = (void *)(head + 1);
 378	p = (void *)(op + num_op);
 379
 380	req->r_snapc = ceph_get_snap_context(snapc);
 381
 382	head->client_inc = cpu_to_le32(1); /* always, for now. */
 383	head->flags = cpu_to_le32(flags);
 384	if (flags & CEPH_OSD_FLAG_WRITE)
 385		ceph_encode_timespec(&head->mtime, mtime);
 386	head->num_ops = cpu_to_le16(num_op);
 387
 388
 389	/* fill in oid */
 390	head->object_len = cpu_to_le32(oid_len);
 391	memcpy(p, oid, oid_len);
 392	p += oid_len;
 393
 394	src_op = src_ops;
 395	while (src_op->op) {
 396		osd_req_encode_op(req, op, src_op);
 397		src_op++;
 398		op++;
 399	}
 400
 401	if (req->r_trail)
 402		data_len += req->r_trail->length;
 403
 404	if (snapc) {
 405		head->snap_seq = cpu_to_le64(snapc->seq);
 406		head->num_snaps = cpu_to_le32(snapc->num_snaps);
 407		for (i = 0; i < snapc->num_snaps; i++) {
 408			put_unaligned_le64(snapc->snaps[i], p);
 409			p += sizeof(u64);
 410		}
 411	}
 412
 413	if (flags & CEPH_OSD_FLAG_WRITE) {
 414		req->r_request->hdr.data_off = cpu_to_le16(off);
 415		req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
 416	} else if (data_len) {
 417		req->r_request->hdr.data_off = 0;
 418		req->r_request->hdr.data_len = cpu_to_le32(data_len);
 419	}
 420
 421	req->r_request->page_alignment = req->r_page_alignment;
 422
 423	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
 424	msg_size = p - msg->front.iov_base;
 425	msg->front.iov_len = msg_size;
 426	msg->hdr.front_len = cpu_to_le32(msg_size);
 427	return;
 428}
 429EXPORT_SYMBOL(ceph_osdc_build_request);
 430
 431/*
 432 * build new request AND message, calculate layout, and adjust file
 433 * extent as needed.
 434 *
 435 * if the file was recently truncated, we include information about its
 436 * old and new size so that the object can be updated appropriately.  (we
 437 * avoid synchronously deleting truncated objects because it's slow.)
 438 *
 439 * if @do_sync, include a 'startsync' command so that the osd will flush
 440 * data quickly.
 441 */
 442struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 443					       struct ceph_file_layout *layout,
 444					       struct ceph_vino vino,
 445					       u64 off, u64 *plen,
 446					       int opcode, int flags,
 447					       struct ceph_snap_context *snapc,
 448					       int do_sync,
 449					       u32 truncate_seq,
 450					       u64 truncate_size,
 451					       struct timespec *mtime,
 452					       bool use_mempool, int num_reply,
 453					       int page_align)
 454{
 455	struct ceph_osd_req_op ops[3];
 456	struct ceph_osd_request *req;
 457
 458	ops[0].op = opcode;
 459	ops[0].extent.truncate_seq = truncate_seq;
 460	ops[0].extent.truncate_size = truncate_size;
 461	ops[0].payload_len = 0;
 462
 463	if (do_sync) {
 464		ops[1].op = CEPH_OSD_OP_STARTSYNC;
 465		ops[1].payload_len = 0;
 466		ops[2].op = 0;
 467	} else
 468		ops[1].op = 0;
 469
 470	req = ceph_osdc_alloc_request(osdc, flags,
 471					 snapc, ops,
 472					 use_mempool,
 473					 GFP_NOFS, NULL, NULL);
 474	if (!req)
 475		return NULL;
 476
 477	/* calculate max write size */
 478	calc_layout(osdc, vino, layout, off, plen, req, ops);
 479	req->r_file_layout = *layout;  /* keep a copy */
 480
 481	/* in case it differs from natural (file) alignment that
 482	   calc_layout filled in for us */
 483	req->r_num_pages = calc_pages_for(page_align, *plen);
 484	req->r_page_alignment = page_align;
 485
 486	ceph_osdc_build_request(req, off, plen, ops,
 487				snapc,
 488				mtime,
 489				req->r_oid, req->r_oid_len);
 490
 491	return req;
 492}
 493EXPORT_SYMBOL(ceph_osdc_new_request);
 494
 495/*
 496 * We keep osd requests in an rbtree, sorted by ->r_tid.
 497 */
 498static void __insert_request(struct ceph_osd_client *osdc,
 499			     struct ceph_osd_request *new)
 500{
 501	struct rb_node **p = &osdc->requests.rb_node;
 502	struct rb_node *parent = NULL;
 503	struct ceph_osd_request *req = NULL;
 504
 505	while (*p) {
 506		parent = *p;
 507		req = rb_entry(parent, struct ceph_osd_request, r_node);
 508		if (new->r_tid < req->r_tid)
 509			p = &(*p)->rb_left;
 510		else if (new->r_tid > req->r_tid)
 511			p = &(*p)->rb_right;
 512		else
 513			BUG();
 514	}
 515
 516	rb_link_node(&new->r_node, parent, p);
 517	rb_insert_color(&new->r_node, &osdc->requests);
 518}
 519
 520static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
 521						 u64 tid)
 522{
 523	struct ceph_osd_request *req;
 524	struct rb_node *n = osdc->requests.rb_node;
 525
 526	while (n) {
 527		req = rb_entry(n, struct ceph_osd_request, r_node);
 528		if (tid < req->r_tid)
 529			n = n->rb_left;
 530		else if (tid > req->r_tid)
 531			n = n->rb_right;
 532		else
 533			return req;
 534	}
 535	return NULL;
 536}
 537
 538static struct ceph_osd_request *
 539__lookup_request_ge(struct ceph_osd_client *osdc,
 540		    u64 tid)
 541{
 542	struct ceph_osd_request *req;
 543	struct rb_node *n = osdc->requests.rb_node;
 544
 545	while (n) {
 546		req = rb_entry(n, struct ceph_osd_request, r_node);
 547		if (tid < req->r_tid) {
 548			if (!n->rb_left)
 549				return req;
 550			n = n->rb_left;
 551		} else if (tid > req->r_tid) {
 552			n = n->rb_right;
 553		} else {
 554			return req;
 555		}
 556	}
 557	return NULL;
 558}
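/*
 * Editorial sketch: lookups in this tree are only valid under
 * request_mutex; a caller that needs the request after unlocking
 * takes a reference first (handle_reply() below follows the same
 * pattern).  The function name is hypothetical.
 */
#if 0	/* illustration only */
static struct ceph_osd_request *
example_get_request(struct ceph_osd_client *osdc, u64 tid)
{
	struct ceph_osd_request *req;

	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (req)
		ceph_osdc_get_request(req);
	mutex_unlock(&osdc->request_mutex);
	return req;
}
#endif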
 559
 560/*
 561 * Resubmit requests pending on the given osd.
 562 */
 563static void __kick_osd_requests(struct ceph_osd_client *osdc,
 564				struct ceph_osd *osd)
 565{
 566	struct ceph_osd_request *req, *nreq;
 567	int err;
 568
 569	dout("__kick_osd_requests osd%d\n", osd->o_osd);
 570	err = __reset_osd(osdc, osd);
 571	if (err == -EAGAIN)
 572		return;
 573
 574	list_for_each_entry(req, &osd->o_requests, r_osd_item) {
 575		list_move(&req->r_req_lru_item, &osdc->req_unsent);
 576		dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
 577		     osd->o_osd);
 578		if (!req->r_linger)
 579			req->r_flags |= CEPH_OSD_FLAG_RETRY;
 580	}
 581
 582	list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
 583				 r_linger_osd) {
 584		/*
 585		 * reregister request prior to unregistering linger so
 586		 * that r_osd is preserved.
 587		 */
 588		BUG_ON(!list_empty(&req->r_req_lru_item));
 589		__register_request(osdc, req);
 590		list_add(&req->r_req_lru_item, &osdc->req_unsent);
 591		list_add(&req->r_osd_item, &req->r_osd->o_requests);
 592		__unregister_linger_request(osdc, req);
 593		dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
 594		     osd->o_osd);
 595	}
 596}
 597
 598static void kick_osd_requests(struct ceph_osd_client *osdc,
 599			      struct ceph_osd *kickosd)
 600{
 601	mutex_lock(&osdc->request_mutex);
 602	__kick_osd_requests(osdc, kickosd);
 603	mutex_unlock(&osdc->request_mutex);
 604}
 605
 606/*
 607 * If the osd connection drops, we need to resubmit all requests.
 608 */
 609static void osd_reset(struct ceph_connection *con)
 610{
 611	struct ceph_osd *osd = con->private;
 612	struct ceph_osd_client *osdc;
 613
 614	if (!osd)
 615		return;
 616	dout("osd_reset osd%d\n", osd->o_osd);
 617	osdc = osd->o_osdc;
 618	down_read(&osdc->map_sem);
 619	kick_osd_requests(osdc, osd);
 620	send_queued(osdc);
 621	up_read(&osdc->map_sem);
 622}
 623
 624/*
 625 * Track open sessions with osds.
 626 */
 627static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
 628{
 629	struct ceph_osd *osd;
 630
 631	osd = kzalloc(sizeof(*osd), GFP_NOFS);
 632	if (!osd)
 633		return NULL;
 634
 635	atomic_set(&osd->o_ref, 1);
 636	osd->o_osdc = osdc;
 637	INIT_LIST_HEAD(&osd->o_requests);
 638	INIT_LIST_HEAD(&osd->o_linger_requests);
 639	INIT_LIST_HEAD(&osd->o_osd_lru);
 640	osd->o_incarnation = 1;
 641
 642	ceph_con_init(osdc->client->msgr, &osd->o_con);
 643	osd->o_con.private = osd;
 644	osd->o_con.ops = &osd_con_ops;
 645	osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
 646
 647	INIT_LIST_HEAD(&osd->o_keepalive_item);
 648	return osd;
 649}
 650
 651static struct ceph_osd *get_osd(struct ceph_osd *osd)
 652{
 653	if (atomic_inc_not_zero(&osd->o_ref)) {
 654		dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
 655		     atomic_read(&osd->o_ref));
 656		return osd;
 657	} else {
 658		dout("get_osd %p FAIL\n", osd);
 659		return NULL;
 660	}
 661}
 662
 663static void put_osd(struct ceph_osd *osd)
 664{
 665	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
 666	     atomic_read(&osd->o_ref) - 1);
  667	if (atomic_dec_and_test(&osd->o_ref)) {
  668		struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
  669
  670		if (osd->o_auth.authorizer && ac->ops && ac->ops->destroy_authorizer)
  671			ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer);
  672		kfree(osd);
  673	}
 674}
 675
 676/*
 677 * remove an osd from our map
 678 */
 679static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 680{
 681	dout("__remove_osd %p\n", osd);
 682	BUG_ON(!list_empty(&osd->o_requests));
 683	rb_erase(&osd->o_node, &osdc->osds);
 684	list_del_init(&osd->o_osd_lru);
 685	ceph_con_close(&osd->o_con);
 686	put_osd(osd);
 687}
 688
 689static void remove_all_osds(struct ceph_osd_client *osdc)
 690{
  691	dout("remove_all_osds %p\n", osdc);
 692	mutex_lock(&osdc->request_mutex);
 693	while (!RB_EMPTY_ROOT(&osdc->osds)) {
 694		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
 695						struct ceph_osd, o_node);
 696		__remove_osd(osdc, osd);
 697	}
 698	mutex_unlock(&osdc->request_mutex);
 699}
 700
 701static void __move_osd_to_lru(struct ceph_osd_client *osdc,
 702			      struct ceph_osd *osd)
 703{
 704	dout("__move_osd_to_lru %p\n", osd);
 705	BUG_ON(!list_empty(&osd->o_osd_lru));
 706	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
 707	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
 708}
 709
 710static void __remove_osd_from_lru(struct ceph_osd *osd)
 711{
 712	dout("__remove_osd_from_lru %p\n", osd);
 713	if (!list_empty(&osd->o_osd_lru))
 714		list_del_init(&osd->o_osd_lru);
 715}
 716
 717static void remove_old_osds(struct ceph_osd_client *osdc)
 718{
 719	struct ceph_osd *osd, *nosd;
 720
 721	dout("__remove_old_osds %p\n", osdc);
 722	mutex_lock(&osdc->request_mutex);
 723	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
 724		if (time_before(jiffies, osd->lru_ttl))
 725			break;
 726		__remove_osd(osdc, osd);
 727	}
 728	mutex_unlock(&osdc->request_mutex);
 729}
 730
 731/*
 732 * reset osd connect
 733 */
 734static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 735{
 736	struct ceph_osd_request *req;
 737	int ret = 0;
 738
 739	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
 740	if (list_empty(&osd->o_requests) &&
 741	    list_empty(&osd->o_linger_requests)) {
 742		__remove_osd(osdc, osd);
 743	} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
 744			  &osd->o_con.peer_addr,
 745			  sizeof(osd->o_con.peer_addr)) == 0 &&
 746		   !ceph_con_opened(&osd->o_con)) {
 747		dout(" osd addr hasn't changed and connection never opened,"
  748	     " letting msgr retry\n");
  749	/* touch each r_stamp for handle_timeout()'s benefit */
 750		list_for_each_entry(req, &osd->o_requests, r_osd_item)
 751			req->r_stamp = jiffies;
 752		ret = -EAGAIN;
 753	} else {
 754		ceph_con_close(&osd->o_con);
 755		ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
 756		osd->o_incarnation++;
 757	}
 758	return ret;
 759}
 760
 761static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
 762{
 763	struct rb_node **p = &osdc->osds.rb_node;
 764	struct rb_node *parent = NULL;
 765	struct ceph_osd *osd = NULL;
 766
 767	dout("__insert_osd %p osd%d\n", new, new->o_osd);
 768	while (*p) {
 769		parent = *p;
 770		osd = rb_entry(parent, struct ceph_osd, o_node);
 771		if (new->o_osd < osd->o_osd)
 772			p = &(*p)->rb_left;
 773		else if (new->o_osd > osd->o_osd)
 774			p = &(*p)->rb_right;
 775		else
 776			BUG();
 777	}
 778
 779	rb_link_node(&new->o_node, parent, p);
 780	rb_insert_color(&new->o_node, &osdc->osds);
 781}
 782
 783static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
 784{
 785	struct ceph_osd *osd;
 786	struct rb_node *n = osdc->osds.rb_node;
 787
 788	while (n) {
 789		osd = rb_entry(n, struct ceph_osd, o_node);
 790		if (o < osd->o_osd)
 791			n = n->rb_left;
 792		else if (o > osd->o_osd)
 793			n = n->rb_right;
 794		else
 795			return osd;
 796	}
 797	return NULL;
 798}
 799
 800static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
 801{
 802	schedule_delayed_work(&osdc->timeout_work,
 803			osdc->client->options->osd_keepalive_timeout * HZ);
 804}
 805
 806static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
 807{
 808	cancel_delayed_work(&osdc->timeout_work);
 809}
 810
 811/*
 812 * Register request, assign tid.  If this is the first request, set up
 813 * the timeout event.
 814 */
 815static void __register_request(struct ceph_osd_client *osdc,
 816			       struct ceph_osd_request *req)
 817{
 818	req->r_tid = ++osdc->last_tid;
 819	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
 820	dout("__register_request %p tid %lld\n", req, req->r_tid);
 821	__insert_request(osdc, req);
 822	ceph_osdc_get_request(req);
 823	osdc->num_requests++;
 824	if (osdc->num_requests == 1) {
 825		dout(" first request, scheduling timeout\n");
 826		__schedule_osd_timeout(osdc);
 827	}
 828}
 829
 830static void register_request(struct ceph_osd_client *osdc,
 831			     struct ceph_osd_request *req)
 832{
 833	mutex_lock(&osdc->request_mutex);
 834	__register_request(osdc, req);
 835	mutex_unlock(&osdc->request_mutex);
 836}
 837
 838/*
 839 * called under osdc->request_mutex
 840 */
 841static void __unregister_request(struct ceph_osd_client *osdc,
 842				 struct ceph_osd_request *req)
 843{
 844	if (RB_EMPTY_NODE(&req->r_node)) {
 845		dout("__unregister_request %p tid %lld not registered\n",
 846			req, req->r_tid);
 847		return;
 848	}
 849
 850	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
 851	rb_erase(&req->r_node, &osdc->requests);
 852	osdc->num_requests--;
 853
 854	if (req->r_osd) {
 855		/* make sure the original request isn't in flight. */
 856		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
 857
 858		list_del_init(&req->r_osd_item);
 859		if (list_empty(&req->r_osd->o_requests) &&
 860		    list_empty(&req->r_osd->o_linger_requests)) {
 861			dout("moving osd to %p lru\n", req->r_osd);
 862			__move_osd_to_lru(osdc, req->r_osd);
 863		}
 864		if (list_empty(&req->r_linger_item))
 865			req->r_osd = NULL;
 866	}
 867
 868	ceph_osdc_put_request(req);
 869
 870	list_del_init(&req->r_req_lru_item);
 871	if (osdc->num_requests == 0) {
 872		dout(" no requests, canceling timeout\n");
 873		__cancel_osd_timeout(osdc);
 874	}
 875}
 876
 877/*
 878 * Cancel a previously queued request message
 879 */
 880static void __cancel_request(struct ceph_osd_request *req)
 881{
 882	if (req->r_sent && req->r_osd) {
 883		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
 884		req->r_sent = 0;
 885	}
 886}
 887
 888static void __register_linger_request(struct ceph_osd_client *osdc,
 889				    struct ceph_osd_request *req)
 890{
 891	dout("__register_linger_request %p\n", req);
 892	list_add_tail(&req->r_linger_item, &osdc->req_linger);
 893	list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests);
 894}
 895
 896static void __unregister_linger_request(struct ceph_osd_client *osdc,
 897					struct ceph_osd_request *req)
 898{
 899	dout("__unregister_linger_request %p\n", req);
 900	if (req->r_osd) {
 901		list_del_init(&req->r_linger_item);
 902		list_del_init(&req->r_linger_osd);
 903
 904		if (list_empty(&req->r_osd->o_requests) &&
 905		    list_empty(&req->r_osd->o_linger_requests)) {
 906			dout("moving osd to %p lru\n", req->r_osd);
 907			__move_osd_to_lru(osdc, req->r_osd);
 908		}
 909		if (list_empty(&req->r_osd_item))
 910			req->r_osd = NULL;
 911	}
 912}
 913
 914void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
 915					 struct ceph_osd_request *req)
 916{
 917	mutex_lock(&osdc->request_mutex);
 918	if (req->r_linger) {
 919		__unregister_linger_request(osdc, req);
 920		ceph_osdc_put_request(req);
 921	}
 922	mutex_unlock(&osdc->request_mutex);
 923}
 924EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
 925
 926void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
 927				  struct ceph_osd_request *req)
 928{
 929	if (!req->r_linger) {
 930		dout("set_request_linger %p\n", req);
 931		req->r_linger = 1;
 932		/*
 933		 * caller is now responsible for calling
 934		 * unregister_linger_request
 935		 */
 936		ceph_osdc_get_request(req);
 937	}
 938}
 939EXPORT_SYMBOL(ceph_osdc_set_request_linger);
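/*
 * Editorial sketch: the linger lifecycle as seen by a caller (such as
 * rbd's watch setup).  set_request_linger() takes an extra reference,
 * so the caller pairs it with unregister_linger_request() when done.
 * The function name is hypothetical.
 */
#if 0	/* illustration only */
static void example_linger_usage(struct ceph_osd_client *osdc,
				 struct ceph_osd_request *req)
{
	ceph_osdc_set_request_linger(osdc, req);
	/* ... start the request and use the watch ... */
	ceph_osdc_unregister_linger_request(osdc, req);
}
#endif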
 940
 941/*
 942 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 943 * (as needed), and set the request r_osd appropriately.  If there is
 944 * no up osd, set r_osd to NULL.  Move the request to the appropriate list
 945 * (unsent, homeless) or leave it on the in-flight lru.
 946 *
 947 * Return 0 if unchanged, 1 if changed, or negative on error.
 948 *
 949 * Caller should hold map_sem for read and request_mutex.
 950 */
 951static int __map_request(struct ceph_osd_client *osdc,
 952			 struct ceph_osd_request *req, int force_resend)
 953{
 954	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
 955	struct ceph_pg pgid;
 956	int acting[CEPH_PG_MAX_SIZE];
 957	int o = -1, num = 0;
 958	int err;
 959
 960	dout("map_request %p tid %lld\n", req, req->r_tid);
 961	err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
 962				      &req->r_file_layout, osdc->osdmap);
 963	if (err) {
 964		list_move(&req->r_req_lru_item, &osdc->req_notarget);
 965		return err;
 966	}
 967	pgid = reqhead->layout.ol_pgid;
 968	req->r_pgid = pgid;
 969
 970	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
 971	if (err > 0) {
 972		o = acting[0];
 973		num = err;
 974	}
 975
 976	if ((!force_resend &&
 977	     req->r_osd && req->r_osd->o_osd == o &&
 978	     req->r_sent >= req->r_osd->o_incarnation &&
 979	     req->r_num_pg_osds == num &&
 980	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
 981	    (req->r_osd == NULL && o == -1))
 982		return 0;  /* no change */
 983
 984	dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
 985	     req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
 986	     req->r_osd ? req->r_osd->o_osd : -1);
 987
 988	/* record full pg acting set */
 989	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
 990	req->r_num_pg_osds = num;
 991
 992	if (req->r_osd) {
 993		__cancel_request(req);
 994		list_del_init(&req->r_osd_item);
 995		req->r_osd = NULL;
 996	}
 997
 998	req->r_osd = __lookup_osd(osdc, o);
 999	if (!req->r_osd && o >= 0) {
1000		err = -ENOMEM;
1001		req->r_osd = create_osd(osdc);
1002		if (!req->r_osd) {
1003			list_move(&req->r_req_lru_item, &osdc->req_notarget);
1004			goto out;
1005		}
1006
1007		dout("map_request osd %p is osd%d\n", req->r_osd, o);
1008		req->r_osd->o_osd = o;
1009		req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
1010		__insert_osd(osdc, req->r_osd);
1011
1012		ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
1013	}
1014
1015	if (req->r_osd) {
1016		__remove_osd_from_lru(req->r_osd);
1017		list_add(&req->r_osd_item, &req->r_osd->o_requests);
1018		list_move(&req->r_req_lru_item, &osdc->req_unsent);
1019	} else {
1020		list_move(&req->r_req_lru_item, &osdc->req_notarget);
1021	}
1022	err = 1;   /* osd or pg changed */
1023
1024out:
1025	return err;
1026}
1027
1028/*
1029 * caller should hold map_sem (for read) and request_mutex
1030 */
1031static void __send_request(struct ceph_osd_client *osdc,
1032			   struct ceph_osd_request *req)
1033{
1034	struct ceph_osd_request_head *reqhead;
1035
1036	dout("send_request %p tid %llu to osd%d flags %d\n",
1037	     req, req->r_tid, req->r_osd->o_osd, req->r_flags);
1038
1039	reqhead = req->r_request->front.iov_base;
1040	reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
1041	reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
1042	reqhead->reassert_version = req->r_reassert_version;
1043
1044	req->r_stamp = jiffies;
1045	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
1046
1047	ceph_msg_get(req->r_request); /* send consumes a ref */
1048	ceph_con_send(&req->r_osd->o_con, req->r_request);
1049	req->r_sent = req->r_osd->o_incarnation;
1050}
1051
1052/*
1053 * Send any requests in the queue (req_unsent).
1054 */
1055static void send_queued(struct ceph_osd_client *osdc)
1056{
1057	struct ceph_osd_request *req, *tmp;
1058
1059	dout("send_queued\n");
1060	mutex_lock(&osdc->request_mutex);
1061	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
1062		__send_request(osdc, req);
1063	}
1064	mutex_unlock(&osdc->request_mutex);
1065}
1066
1067/*
1068 * Timeout callback, called every N seconds when 1 or more osd
1069 * requests have been active for more than N seconds.  When this
1070 * happens, we ping all OSDs with requests who have timed out to
1071 * ensure any communications channel reset is detected.  Reset the
1072 * request timeouts another N seconds in the future as we go.
1073 * Reschedule the timeout event another N seconds in future (unless
1074 * there are no open requests).
1075 */
1076static void handle_timeout(struct work_struct *work)
1077{
1078	struct ceph_osd_client *osdc =
1079		container_of(work, struct ceph_osd_client, timeout_work.work);
1080	struct ceph_osd_request *req, *last_req = NULL;
1081	struct ceph_osd *osd;
1082	unsigned long timeout = osdc->client->options->osd_timeout * HZ;
1083	unsigned long keepalive =
1084		osdc->client->options->osd_keepalive_timeout * HZ;
1085	unsigned long last_stamp = 0;
1086	struct list_head slow_osds;
1087	dout("timeout\n");
1088	down_read(&osdc->map_sem);
1089
1090	ceph_monc_request_next_osdmap(&osdc->client->monc);
1091
1092	mutex_lock(&osdc->request_mutex);
1093
1094	/*
1095	 * reset osds that appear to be _really_ unresponsive.  this
1096	 * is a failsafe measure.. we really shouldn't be getting to
1097	 * this point if the system is working properly.  the monitors
1098	 * should mark the osd as failed and we should find out about
1099	 * it from an updated osd map.
1100	 */
1101	while (timeout && !list_empty(&osdc->req_lru)) {
1102		req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
1103				 r_req_lru_item);
1104
1105		/* hasn't been long enough since we sent it? */
1106		if (time_before(jiffies, req->r_stamp + timeout))
1107			break;
1108
1109		/* hasn't been long enough since it was acked? */
1110		if (req->r_request->ack_stamp == 0 ||
1111		    time_before(jiffies, req->r_request->ack_stamp + timeout))
1112			break;
1113
1114		BUG_ON(req == last_req && req->r_stamp == last_stamp);
1115		last_req = req;
1116		last_stamp = req->r_stamp;
1117
1118		osd = req->r_osd;
1119		BUG_ON(!osd);
1120		pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
1121			   req->r_tid, osd->o_osd);
1122		__kick_osd_requests(osdc, osd);
1123	}
1124
1125	/*
1126	 * ping osds that are a bit slow.  this ensures that if there
1127	 * is a break in the TCP connection we will notice, and reopen
1128	 * a connection with that osd (from the fault callback).
1129	 */
1130	INIT_LIST_HEAD(&slow_osds);
1131	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
1132		if (time_before(jiffies, req->r_stamp + keepalive))
1133			break;
1134
1135		osd = req->r_osd;
1136		BUG_ON(!osd);
1137		dout(" tid %llu is slow, will send keepalive on osd%d\n",
1138		     req->r_tid, osd->o_osd);
1139		list_move_tail(&osd->o_keepalive_item, &slow_osds);
1140	}
1141	while (!list_empty(&slow_osds)) {
1142		osd = list_entry(slow_osds.next, struct ceph_osd,
1143				 o_keepalive_item);
1144		list_del_init(&osd->o_keepalive_item);
1145		ceph_con_keepalive(&osd->o_con);
1146	}
1147
1148	__schedule_osd_timeout(osdc);
1149	mutex_unlock(&osdc->request_mutex);
1150	send_queued(osdc);
1151	up_read(&osdc->map_sem);
1152}
1153
1154static void handle_osds_timeout(struct work_struct *work)
1155{
1156	struct ceph_osd_client *osdc =
1157		container_of(work, struct ceph_osd_client,
1158			     osds_timeout_work.work);
1159	unsigned long delay =
1160		osdc->client->options->osd_idle_ttl * HZ >> 2;
1161
1162	dout("osds timeout\n");
1163	down_read(&osdc->map_sem);
1164	remove_old_osds(osdc);
1165	up_read(&osdc->map_sem);
1166
1167	schedule_delayed_work(&osdc->osds_timeout_work,
1168			      round_jiffies_relative(delay));
1169}
1170
1171static void complete_request(struct ceph_osd_request *req)
1172{
1173	if (req->r_safe_callback)
1174		req->r_safe_callback(req, NULL);
1175	complete_all(&req->r_safe_completion);  /* fsync waiter */
1176}
1177
1178/*
1179 * handle osd op reply.  either call the callback if it is specified,
1180 * or do the completion to wake up the waiting thread.
1181 */
1182static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1183			 struct ceph_connection *con)
1184{
1185	struct ceph_osd_reply_head *rhead = msg->front.iov_base;
1186	struct ceph_osd_request *req;
1187	u64 tid;
1188	int numops, object_len, flags;
1189	s32 result;
1190
1191	tid = le64_to_cpu(msg->hdr.tid);
1192	if (msg->front.iov_len < sizeof(*rhead))
1193		goto bad;
1194	numops = le32_to_cpu(rhead->num_ops);
1195	object_len = le32_to_cpu(rhead->object_len);
1196	result = le32_to_cpu(rhead->result);
1197	if (msg->front.iov_len != sizeof(*rhead) + object_len +
1198	    numops * sizeof(struct ceph_osd_op))
1199		goto bad;
1200	dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
1201	/* lookup */
1202	mutex_lock(&osdc->request_mutex);
1203	req = __lookup_request(osdc, tid);
1204	if (req == NULL) {
1205		dout("handle_reply tid %llu dne\n", tid);
1206		mutex_unlock(&osdc->request_mutex);
1207		return;
1208	}
1209	ceph_osdc_get_request(req);
1210	flags = le32_to_cpu(rhead->flags);
1211
1212	/*
1213	 * if this connection filled our message, drop our reference now, to
1214	 * avoid a (safe but slower) revoke later.
1215	 */
1216	if (req->r_con_filling_msg == con && req->r_reply == msg) {
1217		dout(" dropping con_filling_msg ref %p\n", con);
1218		req->r_con_filling_msg = NULL;
1219		con->ops->put(con);
1220	}
1221
1222	if (!req->r_got_reply) {
1223		unsigned int bytes;
1224
1225		req->r_result = le32_to_cpu(rhead->result);
1226		bytes = le32_to_cpu(msg->hdr.data_len);
1227		dout("handle_reply result %d bytes %d\n", req->r_result,
1228		     bytes);
1229		if (req->r_result == 0)
1230			req->r_result = bytes;
1231
1232		/* in case this is a write and we need to replay, */
1233		req->r_reassert_version = rhead->reassert_version;
1234
1235		req->r_got_reply = 1;
1236	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
1237		dout("handle_reply tid %llu dup ack\n", tid);
1238		mutex_unlock(&osdc->request_mutex);
1239		goto done;
1240	}
1241
1242	dout("handle_reply tid %llu flags %d\n", tid, flags);
1243
1244	if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
1245		__register_linger_request(osdc, req);
1246
1247	/* either this is a read, or we got the safe response */
1248	if (result < 0 ||
1249	    (flags & CEPH_OSD_FLAG_ONDISK) ||
1250	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
1251		__unregister_request(osdc, req);
1252
1253	mutex_unlock(&osdc->request_mutex);
1254
1255	if (req->r_callback)
1256		req->r_callback(req, msg);
1257	else
1258		complete_all(&req->r_completion);
1259
1260	if (flags & CEPH_OSD_FLAG_ONDISK)
1261		complete_request(req);
1262
1263done:
1264	dout("req=%p req->r_linger=%d\n", req, req->r_linger);
1265	ceph_osdc_put_request(req);
1266	return;
1267
1268bad:
1269	pr_err("corrupt osd_op_reply got %d %d expected %d\n",
1270	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
1271	       (int)sizeof(*rhead));
1272	ceph_msg_dump(msg);
1273}
1274
1275static void reset_changed_osds(struct ceph_osd_client *osdc)
1276{
1277	struct rb_node *p, *n;
1278
1279	for (p = rb_first(&osdc->osds); p; p = n) {
1280		struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
1281
1282		n = rb_next(p);
1283		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
1284		    memcmp(&osd->o_con.peer_addr,
1285			   ceph_osd_addr(osdc->osdmap,
1286					 osd->o_osd),
1287			   sizeof(struct ceph_entity_addr)) != 0)
1288			__reset_osd(osdc, osd);
1289	}
1290}
1291
1292/*
1293 * Requeue requests whose mapping to an OSD has changed.  If requests map to
1294 * no osd, request a new map.
1295 *
1296 * Caller should hold map_sem for read and request_mutex.
1297 */
1298static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1299{
1300	struct ceph_osd_request *req, *nreq;
1301	struct rb_node *p;
1302	int needmap = 0;
1303	int err;
1304
1305	dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
1306	mutex_lock(&osdc->request_mutex);
1307	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
1308		req = rb_entry(p, struct ceph_osd_request, r_node);
1309		err = __map_request(osdc, req, force_resend);
1310		if (err < 0)
1311			continue;  /* error */
1312		if (req->r_osd == NULL) {
1313			dout("%p tid %llu maps to no osd\n", req, req->r_tid);
1314			needmap++;  /* request a newer map */
1315		} else if (err > 0) {
1316			dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
1317			     req->r_osd ? req->r_osd->o_osd : -1);
1318			if (!req->r_linger)
1319				req->r_flags |= CEPH_OSD_FLAG_RETRY;
1320		}
1321	}
1322
1323	list_for_each_entry_safe(req, nreq, &osdc->req_linger,
1324				 r_linger_item) {
1325		dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
1326
1327		err = __map_request(osdc, req, force_resend);
1328		if (err == 0)
1329			continue;  /* no change and no osd was specified */
1330		if (err < 0)
1331			continue;  /* hrm! */
1332		if (req->r_osd == NULL) {
1333			dout("tid %llu maps to no valid osd\n", req->r_tid);
1334			needmap++;  /* request a newer map */
1335			continue;
1336		}
1337
1338		dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
1339		     req->r_osd ? req->r_osd->o_osd : -1);
1340		__unregister_linger_request(osdc, req);
1341		__register_request(osdc, req);
1342	}
1343	mutex_unlock(&osdc->request_mutex);
1344
1345	if (needmap) {
1346		dout("%d requests for down osds, need new map\n", needmap);
1347		ceph_monc_request_next_osdmap(&osdc->client->monc);
1348	}
1349}
1350
1351
1352/*
1353 * Process updated osd map.
1354 *
1355 * The message contains any number of incremental and full maps, normally
1356 * indicating some sort of topology change in the cluster.  Kick requests
1357 * off to different OSDs as needed.
1358 */
1359void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1360{
1361	void *p, *end, *next;
1362	u32 nr_maps, maplen;
1363	u32 epoch;
1364	struct ceph_osdmap *newmap = NULL, *oldmap;
1365	int err;
1366	struct ceph_fsid fsid;
1367
1368	dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
1369	p = msg->front.iov_base;
1370	end = p + msg->front.iov_len;
1371
1372	/* verify fsid */
1373	ceph_decode_need(&p, end, sizeof(fsid), bad);
1374	ceph_decode_copy(&p, &fsid, sizeof(fsid));
1375	if (ceph_check_fsid(osdc->client, &fsid) < 0)
1376		return;
1377
1378	down_write(&osdc->map_sem);
1379
1380	/* incremental maps */
1381	ceph_decode_32_safe(&p, end, nr_maps, bad);
1382	dout(" %d inc maps\n", nr_maps);
1383	while (nr_maps > 0) {
1384		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1385		epoch = ceph_decode_32(&p);
1386		maplen = ceph_decode_32(&p);
1387		ceph_decode_need(&p, end, maplen, bad);
1388		next = p + maplen;
1389		if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
1390			dout("applying incremental map %u len %d\n",
1391			     epoch, maplen);
1392			newmap = osdmap_apply_incremental(&p, next,
1393							  osdc->osdmap,
1394							  osdc->client->msgr);
1395			if (IS_ERR(newmap)) {
1396				err = PTR_ERR(newmap);
1397				goto bad;
1398			}
1399			BUG_ON(!newmap);
1400			if (newmap != osdc->osdmap) {
1401				ceph_osdmap_destroy(osdc->osdmap);
1402				osdc->osdmap = newmap;
1403			}
1404			kick_requests(osdc, 0);
1405			reset_changed_osds(osdc);
1406		} else {
1407			dout("ignoring incremental map %u len %d\n",
1408			     epoch, maplen);
1409		}
1410		p = next;
1411		nr_maps--;
1412	}
1413	if (newmap)
1414		goto done;
1415
1416	/* full maps */
1417	ceph_decode_32_safe(&p, end, nr_maps, bad);
1418	dout(" %d full maps\n", nr_maps);
1419	while (nr_maps) {
1420		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1421		epoch = ceph_decode_32(&p);
1422		maplen = ceph_decode_32(&p);
1423		ceph_decode_need(&p, end, maplen, bad);
1424		if (nr_maps > 1) {
1425			dout("skipping non-latest full map %u len %d\n",
1426			     epoch, maplen);
1427		} else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
1428			dout("skipping full map %u len %d, "
1429			     "older than our %u\n", epoch, maplen,
1430			     osdc->osdmap->epoch);
1431		} else {
1432			int skipped_map = 0;
1433
1434			dout("taking full map %u len %d\n", epoch, maplen);
1435			newmap = osdmap_decode(&p, p+maplen);
1436			if (IS_ERR(newmap)) {
1437				err = PTR_ERR(newmap);
1438				goto bad;
1439			}
1440			BUG_ON(!newmap);
1441			oldmap = osdc->osdmap;
1442			osdc->osdmap = newmap;
1443			if (oldmap) {
1444				if (oldmap->epoch + 1 < newmap->epoch)
1445					skipped_map = 1;
1446				ceph_osdmap_destroy(oldmap);
1447			}
1448			kick_requests(osdc, skipped_map);
1449		}
1450		p += maplen;
1451		nr_maps--;
1452	}
1453
1454done:
1455	downgrade_write(&osdc->map_sem);
1456	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1457
1458	/*
1459	 * subscribe to subsequent osdmap updates if full to ensure
1460	 * we find out when we are no longer full and stop returning
1461	 * ENOSPC.
1462	 */
1463	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
1464		ceph_monc_request_next_osdmap(&osdc->client->monc);
1465
1466	send_queued(osdc);
1467	up_read(&osdc->map_sem);
1468	wake_up_all(&osdc->client->auth_wq);
1469	return;
1470
1471bad:
1472	pr_err("osdc handle_map corrupt msg\n");
1473	ceph_msg_dump(msg);
1474	up_write(&osdc->map_sem);
1475	return;
1476}
1477
1478/*
1479 * watch/notify callback event infrastructure
1480 *
1481 * These callbacks are used both for watch and notify operations.
1482 */
1483static void __release_event(struct kref *kref)
1484{
1485	struct ceph_osd_event *event =
1486		container_of(kref, struct ceph_osd_event, kref);
1487
1488	dout("__release_event %p\n", event);
1489	kfree(event);
1490}
1491
1492static void get_event(struct ceph_osd_event *event)
1493{
1494	kref_get(&event->kref);
1495}
1496
1497void ceph_osdc_put_event(struct ceph_osd_event *event)
1498{
1499	kref_put(&event->kref, __release_event);
1500}
1501EXPORT_SYMBOL(ceph_osdc_put_event);
1502
1503static void __insert_event(struct ceph_osd_client *osdc,
1504			     struct ceph_osd_event *new)
1505{
1506	struct rb_node **p = &osdc->event_tree.rb_node;
1507	struct rb_node *parent = NULL;
1508	struct ceph_osd_event *event = NULL;
1509
1510	while (*p) {
1511		parent = *p;
1512		event = rb_entry(parent, struct ceph_osd_event, node);
1513		if (new->cookie < event->cookie)
1514			p = &(*p)->rb_left;
1515		else if (new->cookie > event->cookie)
1516			p = &(*p)->rb_right;
1517		else
1518			BUG();
1519	}
1520
1521	rb_link_node(&new->node, parent, p);
1522	rb_insert_color(&new->node, &osdc->event_tree);
1523}
1524
1525static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
1526					        u64 cookie)
1527{
1528	struct rb_node **p = &osdc->event_tree.rb_node;
1529	struct rb_node *parent = NULL;
1530	struct ceph_osd_event *event = NULL;
1531
1532	while (*p) {
1533		parent = *p;
1534		event = rb_entry(parent, struct ceph_osd_event, node);
1535		if (cookie < event->cookie)
1536			p = &(*p)->rb_left;
1537		else if (cookie > event->cookie)
1538			p = &(*p)->rb_right;
1539		else
1540			return event;
1541	}
1542	return NULL;
1543}
1544
1545static void __remove_event(struct ceph_osd_event *event)
1546{
1547	struct ceph_osd_client *osdc = event->osdc;
1548
1549	if (!RB_EMPTY_NODE(&event->node)) {
1550		dout("__remove_event removed %p\n", event);
1551		rb_erase(&event->node, &osdc->event_tree);
1552		ceph_osdc_put_event(event);
1553	} else {
1554		dout("__remove_event didn't remove %p\n", event);
1555	}
1556}
1557
1558int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1559			   void (*event_cb)(u64, u64, u8, void *),
1560			   int one_shot, void *data,
1561			   struct ceph_osd_event **pevent)
1562{
1563	struct ceph_osd_event *event;
1564
1565	event = kmalloc(sizeof(*event), GFP_NOIO);
1566	if (!event)
1567		return -ENOMEM;
1568
1569	dout("create_event %p\n", event);
1570	event->cb = event_cb;
1571	event->one_shot = one_shot;
1572	event->data = data;
1573	event->osdc = osdc;
1574	INIT_LIST_HEAD(&event->osd_node);
1575	kref_init(&event->kref);   /* one ref for us */
1576	kref_get(&event->kref);    /* one ref for the caller */
1577	init_completion(&event->completion);
1578
1579	spin_lock(&osdc->event_lock);
1580	event->cookie = ++osdc->event_count;
1581	__insert_event(osdc, event);
1582	spin_unlock(&osdc->event_lock);
1583
1584	*pevent = event;
1585	return 0;
1586}
1587EXPORT_SYMBOL(ceph_osdc_create_event);
1588
1589void ceph_osdc_cancel_event(struct ceph_osd_event *event)
1590{
1591	struct ceph_osd_client *osdc = event->osdc;
1592
1593	dout("cancel_event %p\n", event);
1594	spin_lock(&osdc->event_lock);
1595	__remove_event(event);
1596	spin_unlock(&osdc->event_lock);
1597	ceph_osdc_put_event(event); /* caller's */
1598}
1599EXPORT_SYMBOL(ceph_osdc_cancel_event);
1600
1601
1602static void do_event_work(struct work_struct *work)
1603{
1604	struct ceph_osd_event_work *event_work =
1605		container_of(work, struct ceph_osd_event_work, work);
1606	struct ceph_osd_event *event = event_work->event;
1607	u64 ver = event_work->ver;
1608	u64 notify_id = event_work->notify_id;
1609	u8 opcode = event_work->opcode;
1610
1611	dout("do_event_work completing %p\n", event);
1612	event->cb(ver, notify_id, opcode, event->data);
1613	complete(&event->completion);
1614	dout("do_event_work completed %p\n", event);
1615	ceph_osdc_put_event(event);
1616	kfree(event_work);
1617}
1618
1619
1620/*
1621 * Process osd watch notifications
1622 */
1623void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1624{
1625	void *p, *end;
1626	u8 proto_ver;
1627	u64 cookie, ver, notify_id;
1628	u8 opcode;
1629	struct ceph_osd_event *event;
1630	struct ceph_osd_event_work *event_work;
1631
1632	p = msg->front.iov_base;
1633	end = p + msg->front.iov_len;
1634
1635	ceph_decode_8_safe(&p, end, proto_ver, bad);
1636	ceph_decode_8_safe(&p, end, opcode, bad);
1637	ceph_decode_64_safe(&p, end, cookie, bad);
1638	ceph_decode_64_safe(&p, end, ver, bad);
1639	ceph_decode_64_safe(&p, end, notify_id, bad);
1640
1641	spin_lock(&osdc->event_lock);
1642	event = __find_event(osdc, cookie);
1643	if (event) {
1644		get_event(event);
1645		if (event->one_shot)
1646			__remove_event(event);
1647	}
1648	spin_unlock(&osdc->event_lock);
1649	dout("handle_watch_notify cookie %lld ver %lld event %p\n",
1650	     cookie, ver, event);
1651	if (event) {
1652		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
1653		if (!event_work) {
1654			dout("ERROR: could not allocate event_work\n");
1655			goto done_err;
1656		}
1657		INIT_WORK(&event_work->work, do_event_work);
1658		event_work->event = event;
1659		event_work->ver = ver;
1660		event_work->notify_id = notify_id;
1661		event_work->opcode = opcode;
1662		if (!queue_work(osdc->notify_wq, &event_work->work)) {
1663			dout("WARNING: failed to queue notify event work\n");
1664			goto done_err;
1665		}
1666	}
1667
1668	return;
1669
1670done_err:
1671	complete(&event->completion);
1672	ceph_osdc_put_event(event);
1673	return;
1674
1675bad:
1676	pr_err("osdc handle_watch_notify corrupt msg\n");
1677	return;
1678}
1679
1680int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
1681{
1682	int err;
1683
1684	dout("wait_event %p\n", event);
1685	err = wait_for_completion_interruptible_timeout(&event->completion,
1686							timeout * HZ);
1687	ceph_osdc_put_event(event);
1688	if (err > 0)
1689		err = 0;
1690	dout("wait_event %p returns %d\n", event, err);
1691	return err;
1692}
1693EXPORT_SYMBOL(ceph_osdc_wait_event);
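/*
 * Editorial sketch: typical event usage around a notify.  The caller
 * creates the event, embeds event->cookie in a CEPH_OSD_OP_NOTIFY
 * request, then waits; ceph_osdc_wait_event() drops the caller's
 * reference.  The names below are hypothetical.
 */
#if 0	/* illustration only */
static void example_notify_cb(u64 ver, u64 notify_id, u8 opcode,
			      void *data)
{
	/* runs from osdc->notify_wq via do_event_work() */
}

static int example_notify_wait(struct ceph_osd_client *osdc)
{
	struct ceph_osd_event *event;
	int ret;

	ret = ceph_osdc_create_event(osdc, example_notify_cb,
				     1 /* one_shot */, NULL, &event);
	if (ret)
		return ret;
	/* ... send a notify op carrying event->cookie ... */
	return ceph_osdc_wait_event(event, 10 /* seconds */);
}
#endif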
1694
1695/*
1696 * Register request, send initial attempt.
1697 */
1698int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1699			    struct ceph_osd_request *req,
1700			    bool nofail)
1701{
1702	int rc = 0;
1703
1704	req->r_request->pages = req->r_pages;
1705	req->r_request->nr_pages = req->r_num_pages;
1706#ifdef CONFIG_BLOCK
1707	req->r_request->bio = req->r_bio;
1708#endif
1709	req->r_request->trail = req->r_trail;
1710
1711	register_request(osdc, req);
1712
1713	down_read(&osdc->map_sem);
1714	mutex_lock(&osdc->request_mutex);
1715	/*
1716	 * a racing kick_requests() may have sent the message for us
1717	 * while we dropped request_mutex above, so only send now if
1718	 * the request still hasn't been touched yet.
1719	 */
1720	if (req->r_sent == 0) {
1721		rc = __map_request(osdc, req, 0);
1722		if (rc < 0) {
1723			if (nofail) {
1724				dout("osdc_start_request failed map, "
1725				     " will retry %lld\n", req->r_tid);
1726				rc = 0;
1727			}
1728			goto out_unlock;
1729		}
1730		if (req->r_osd == NULL) {
1731			dout("send_request %p no up osds in pg\n", req);
1732			ceph_monc_request_next_osdmap(&osdc->client->monc);
1733		} else {
1734			__send_request(osdc, req);
1735		}
1736		rc = 0;
1737	}
1738
1739out_unlock:
1740	mutex_unlock(&osdc->request_mutex);
1741	up_read(&osdc->map_sem);
1742	return rc;
1743}
1744EXPORT_SYMBOL(ceph_osdc_start_request);
1745
1746/*
1747 * wait for a request to complete
1748 */
1749int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1750			   struct ceph_osd_request *req)
1751{
1752	int rc;
1753
1754	rc = wait_for_completion_interruptible(&req->r_completion);
1755	if (rc < 0) {
1756		mutex_lock(&osdc->request_mutex);
1757		__cancel_request(req);
1758		__unregister_request(osdc, req);
1759		mutex_unlock(&osdc->request_mutex);
1760		complete_request(req);
1761		dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
1762		return rc;
1763	}
1764
1765	dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1766	return req->r_result;
1767}
1768EXPORT_SYMBOL(ceph_osdc_wait_request);
1769
1770/*
1771 * sync - wait for all in-flight requests to flush.  avoid starvation.
1772 */
1773void ceph_osdc_sync(struct ceph_osd_client *osdc)
1774{
1775	struct ceph_osd_request *req;
1776	u64 last_tid, next_tid = 0;
1777
1778	mutex_lock(&osdc->request_mutex);
1779	last_tid = osdc->last_tid;
1780	while (1) {
1781		req = __lookup_request_ge(osdc, next_tid);
1782		if (!req)
1783			break;
1784		if (req->r_tid > last_tid)
1785			break;
1786
1787		next_tid = req->r_tid + 1;
1788		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
1789			continue;
1790
1791		ceph_osdc_get_request(req);
1792		mutex_unlock(&osdc->request_mutex);
1793		dout("sync waiting on tid %llu (last is %llu)\n",
1794		     req->r_tid, last_tid);
1795		wait_for_completion(&req->r_safe_completion);
1796		mutex_lock(&osdc->request_mutex);
1797		ceph_osdc_put_request(req);
1798	}
1799	mutex_unlock(&osdc->request_mutex);
1800	dout("sync done (thru tid %llu)\n", last_tid);
1801}
1802EXPORT_SYMBOL(ceph_osdc_sync);
1803
1804/*
1805 * init, shutdown
1806 */
1807int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1808{
1809	int err;
1810
1811	dout("init\n");
1812	osdc->client = client;
1813	osdc->osdmap = NULL;
1814	init_rwsem(&osdc->map_sem);
1815	init_completion(&osdc->map_waiters);
1816	osdc->last_requested_map = 0;
1817	mutex_init(&osdc->request_mutex);
1818	osdc->last_tid = 0;
1819	osdc->osds = RB_ROOT;
1820	INIT_LIST_HEAD(&osdc->osd_lru);
1821	osdc->requests = RB_ROOT;
1822	INIT_LIST_HEAD(&osdc->req_lru);
1823	INIT_LIST_HEAD(&osdc->req_unsent);
1824	INIT_LIST_HEAD(&osdc->req_notarget);
1825	INIT_LIST_HEAD(&osdc->req_linger);
1826	osdc->num_requests = 0;
1827	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1828	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1829	spin_lock_init(&osdc->event_lock);
1830	osdc->event_tree = RB_ROOT;
1831	osdc->event_count = 0;
1832
1833	schedule_delayed_work(&osdc->osds_timeout_work,
1834	   round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
1835
1836	err = -ENOMEM;
1837	osdc->req_mempool = mempool_create_kmalloc_pool(10,
1838					sizeof(struct ceph_osd_request));
1839	if (!osdc->req_mempool)
1840		goto out;
1841
1842	err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
1843				"osd_op");
1844	if (err < 0)
1845		goto out_mempool;
1846	err = ceph_msgpool_init(&osdc->msgpool_op_reply,
1847				OSD_OPREPLY_FRONT_LEN, 10, true,
1848				"osd_op_reply");
1849	if (err < 0)
1850		goto out_msgpool;
1851
1852	osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
1853	if (!osdc->notify_wq) {
1854		/* create_singlethread_workqueue() returns NULL, not ERR_PTR */
1855		err = -ENOMEM;
1856		goto out_msgpool;
1857	}
	return 0;

out_msgpool:
	ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool:
	mempool_destroy(osdc->req_mempool);
out:
	return err;
}
EXPORT_SYMBOL(ceph_osdc_init);

void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
	flush_workqueue(osdc->notify_wq);
	destroy_workqueue(osdc->notify_wq);
	cancel_delayed_work_sync(&osdc->timeout_work);
	cancel_delayed_work_sync(&osdc->osds_timeout_work);
	if (osdc->osdmap) {
		ceph_osdmap_destroy(osdc->osdmap);
		osdc->osdmap = NULL;
	}
	remove_all_osds(osdc);
	mempool_destroy(osdc->req_mempool);
	ceph_msgpool_destroy(&osdc->msgpool_op);
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
EXPORT_SYMBOL(ceph_osdc_stop);
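/*
 * Lifecycle sketch (assumed caller, not from this file): the osd_client
 * is embedded in a struct ceph_client, and teardown must mirror bring-up,
 * with ceph_osdc_stop() called only after the last request completes:
 *
 *	err = ceph_osdc_init(&client->osdc, client);
 *	if (err)
 *		return err;
 *	...submit requests, wait for them...
 *	ceph_osdc_stop(&client->osdc);
 */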

/*
 * Read some contiguous pages.  If we cross a stripe boundary, shorten
 * *plen.  Return number of bytes read, or error.
 */
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
			struct ceph_vino vino, struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			u32 truncate_seq, u64 truncate_size,
			struct page **pages, int num_pages, int page_align)
{
	struct ceph_osd_request *req;
	int rc = 0;

	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
	     vino.snap, off, *plen);
	req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, 0, truncate_seq, truncate_size, NULL,
				    false, 1, page_align);
	if (!req)
		return -ENOMEM;

	/* it may be a short read due to an object boundary */
	req->r_pages = pages;

	dout("readpages  final extent is %llu~%llu (%d pages align %d)\n",
	     off, *plen, req->r_num_pages, page_align);

	rc = ceph_osdc_start_request(osdc, req, false);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	dout("readpages result %d\n", rc);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_readpages);
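/*
 * Example call (hypothetical values; osdc, vino, layout and pages are
 * assumed to be set up by the caller): read num_pages starting at file
 * offset 0.  On success the return value is the number of bytes read,
 * which may be short if the extent crossed an object boundary.
 *
 *	u64 len = num_pages * PAGE_SIZE;
 *	int ret;
 *
 *	ret = ceph_osdc_readpages(osdc, vino, layout, 0, &len,
 *				  truncate_seq, truncate_size,
 *				  pages, num_pages, 0);
 */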

/*
 * do a synchronous write on N pages
 */
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
			 struct ceph_file_layout *layout,
			 struct ceph_snap_context *snapc,
			 u64 off, u64 len,
			 u32 truncate_seq, u64 truncate_size,
			 struct timespec *mtime,
			 struct page **pages, int num_pages,
			 int flags, int do_sync, bool nofail)
{
	struct ceph_osd_request *req;
	int rc = 0;
	int page_align = off & ~PAGE_MASK;

	BUG_ON(vino.snap != CEPH_NOSNAP);
	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
				    CEPH_OSD_OP_WRITE,
				    flags | CEPH_OSD_FLAG_ONDISK |
					    CEPH_OSD_FLAG_WRITE,
				    snapc, do_sync,
				    truncate_seq, truncate_size, mtime,
				    nofail, 1, page_align);
	if (!req)
		return -ENOMEM;

	/* it may be a short write due to an object boundary */
	req->r_pages = pages;
	dout("writepages %llu~%llu (%d pages)\n", off, len,
	     req->r_num_pages);

	rc = ceph_osdc_start_request(osdc, req, nofail);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	if (rc == 0)
		rc = len;
	dout("writepages result %d\n", rc);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_writepages);
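/*
 * Example call (hypothetical values): synchronously write a single page
 * at byte offset off.  CEPH_OSD_FLAG_ONDISK and CEPH_OSD_FLAG_WRITE are
 * added internally, so flags may be 0; on success the byte count (here
 * PAGE_SIZE) is returned.
 *
 *	ret = ceph_osdc_writepages(osdc, vino, layout, snapc,
 *				   off, PAGE_SIZE,
 *				   truncate_seq, truncate_size, &mtime,
 *				   &page, 1, 0, 0, true);
 */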

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;
	int type = le16_to_cpu(msg->hdr.type);

	if (!osd)
		goto out;
	osdc = osd->o_osdc;

	switch (type) {
	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(osdc, msg);
		break;
	case CEPH_MSG_OSD_OPREPLY:
		handle_reply(osdc, msg, con);
		break;
	case CEPH_MSG_WATCH_NOTIFY:
		handle_watch_notify(osdc, msg);
		break;

	default:
		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
}

/*
 * lookup and return message for incoming reply.  set up reply message
 * pages.
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_msg *m;
	struct ceph_osd_request *req;
	int front = le32_to_cpu(hdr->front_len);
	int data_len = le32_to_cpu(hdr->data_len);
	u64 tid;

	tid = le64_to_cpu(hdr->tid);
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (!req) {
		*skip = 1;
		m = NULL;
		pr_info("get_reply unknown tid %llu from osd%d\n", tid,
			osd->o_osd);
		goto out;
	}

	if (req->r_con_filling_msg) {
		dout("get_reply revoking msg %p from old con %p\n",
		     req->r_reply, req->r_con_filling_msg);
		ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
		req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
		req->r_con_filling_msg = NULL;
	}

	if (front > req->r_reply->front.iov_len) {
		pr_warning("get_reply front %d > preallocated %d\n",
			   front, (int)req->r_reply->front.iov_len);
		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
		if (!m)
			goto out;
		ceph_msg_put(req->r_reply);
		req->r_reply = m;
	}
	m = ceph_msg_get(req->r_reply);

	if (data_len > 0) {
		int want = calc_pages_for(req->r_page_alignment, data_len);

		if (unlikely(req->r_num_pages < want)) {
			pr_warning("tid %lld reply has %d bytes %d pages, we"
				   " had only %d pages ready\n", tid, data_len,
				   want, req->r_num_pages);
			*skip = 1;
			ceph_msg_put(m);
			m = NULL;
			goto out;
		}
		m->pages = req->r_pages;
		m->nr_pages = req->r_num_pages;
		m->page_alignment = req->r_page_alignment;
#ifdef CONFIG_BLOCK
		m->bio = req->r_bio;
#endif
	}
	*skip = 0;
	req->r_con_filling_msg = con->ops->get(con);
	dout("get_reply tid %lld %p\n", tid, m);

out:
	mutex_unlock(&osdc->request_mutex);
	return m;
}

static struct ceph_msg *alloc_msg(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	int type = le16_to_cpu(hdr->type);
	int front = le32_to_cpu(hdr->front_len);

	switch (type) {
	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_WATCH_NOTIFY:
		return ceph_msg_new(type, front, GFP_NOFS, false);
	case CEPH_MSG_OSD_OPREPLY:
		return get_reply(con, hdr, skip);
	default:
		pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
			osd->o_osd);
		*skip = 1;
		return NULL;
	}
}
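/*
 * Note on the receive path: the messenger first calls alloc_msg() to get
 * a buffer for an incoming message (reusing the request's preallocated
 * reply for CEPH_MSG_OSD_OPREPLY via get_reply()), then hands the filled
 * message to dispatch() above.
 */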

/*
 * Wrappers to refcount containing ceph_osd struct
 */
static struct ceph_connection *get_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	if (get_osd(osd))
		return con;
	return NULL;
}

static void put_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	put_osd(osd);
}

/*
 * authentication
 */
/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
					int *proto, int force_new)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;
	struct ceph_auth_handshake *auth = &o->o_auth;

	if (force_new && auth->authorizer) {
		if (ac->ops && ac->ops->destroy_authorizer)
			ac->ops->destroy_authorizer(ac, auth->authorizer);
		auth->authorizer = NULL;
	}
	if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
		int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
							auth);
		if (ret)
			return ERR_PTR(ret);
	}
	*proto = ac->protocol;

	return auth;
}

static int verify_authorizer_reply(struct ceph_connection *con, int len)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	/*
	 * XXX If ac->ops or ac->ops->verify_authorizer_reply is null,
	 * XXX which do we do:  succeed or fail?
	 */
	return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len);
}

static int invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	if (ac->ops && ac->ops->invalidate_authorizer)
		ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);

	return ceph_monc_validate_auth(&osdc->client->monc);
}

static const struct ceph_connection_operations osd_con_ops = {
	.get = get_osd_con,
	.put = put_osd_con,
	.dispatch = dispatch,
	.get_authorizer = get_authorizer,
	.verify_authorizer_reply = verify_authorizer_reply,
	.invalidate_authorizer = invalidate_authorizer,
	.alloc_msg = alloc_msg,
	.fault = osd_reset,
};
v3.1
   1#include <linux/ceph/ceph_debug.h>
   2
   3#include <linux/module.h>
   4#include <linux/err.h>
   5#include <linux/highmem.h>
   6#include <linux/mm.h>
   7#include <linux/pagemap.h>
   8#include <linux/slab.h>
   9#include <linux/uaccess.h>
  10#ifdef CONFIG_BLOCK
  11#include <linux/bio.h>
  12#endif
  13
  14#include <linux/ceph/libceph.h>
  15#include <linux/ceph/osd_client.h>
  16#include <linux/ceph/messenger.h>
  17#include <linux/ceph/decode.h>
  18#include <linux/ceph/auth.h>
  19#include <linux/ceph/pagelist.h>
  20
  21#define OSD_OP_FRONT_LEN	4096
  22#define OSD_OPREPLY_FRONT_LEN	512
  23
  24static const struct ceph_connection_operations osd_con_ops;
  25
  26static void send_queued(struct ceph_osd_client *osdc);
  27static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
  28static void __register_request(struct ceph_osd_client *osdc,
  29			       struct ceph_osd_request *req);
  30static void __unregister_linger_request(struct ceph_osd_client *osdc,
  31					struct ceph_osd_request *req);
  32static int __send_request(struct ceph_osd_client *osdc,
  33			  struct ceph_osd_request *req);
  34
  35static int op_needs_trail(int op)
  36{
  37	switch (op) {
  38	case CEPH_OSD_OP_GETXATTR:
  39	case CEPH_OSD_OP_SETXATTR:
  40	case CEPH_OSD_OP_CMPXATTR:
  41	case CEPH_OSD_OP_CALL:
  42	case CEPH_OSD_OP_NOTIFY:
  43		return 1;
  44	default:
  45		return 0;
  46	}
  47}
  48
  49static int op_has_extent(int op)
  50{
  51	return (op == CEPH_OSD_OP_READ ||
  52		op == CEPH_OSD_OP_WRITE);
  53}
  54
  55void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
  56			struct ceph_file_layout *layout,
  57			u64 snapid,
  58			u64 off, u64 *plen, u64 *bno,
  59			struct ceph_osd_request *req,
  60			struct ceph_osd_req_op *op)
  61{
  62	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
  63	u64 orig_len = *plen;
  64	u64 objoff, objlen;    /* extent in object */
  65
  66	reqhead->snapid = cpu_to_le64(snapid);
  67
  68	/* object extent? */
  69	ceph_calc_file_object_mapping(layout, off, plen, bno,
  70				      &objoff, &objlen);
  71	if (*plen < orig_len)
  72		dout(" skipping last %llu, final file extent %llu~%llu\n",
  73		     orig_len - *plen, off, *plen);
  74
  75	if (op_has_extent(op->op)) {
  76		op->extent.offset = objoff;
  77		op->extent.length = objlen;
  78	}
  79	req->r_num_pages = calc_pages_for(off, *plen);
  80	req->r_page_alignment = off & ~PAGE_MASK;
  81	if (op->op == CEPH_OSD_OP_WRITE)
  82		op->payload_len = *plen;
  83
  84	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
  85	     *bno, objoff, objlen, req->r_num_pages);
  86
  87}
  88EXPORT_SYMBOL(ceph_calc_raw_layout);
  89
  90/*
  91 * Implement client access to distributed object storage cluster.
  92 *
  93 * All data objects are stored within a cluster/cloud of OSDs, or
  94 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
  95 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
  96 * remote daemons serving up and coordinating consistent and safe
  97 * access to storage.
  98 *
  99 * Cluster membership and the mapping of data objects onto storage devices
 100 * are described by the osd map.
 101 *
 102 * We keep track of pending OSD requests (read, write), resubmit
 103 * requests to different OSDs when the cluster topology/data layout
 104 * change, or retry the affected requests when the communications
 105 * channel with an OSD is reset.
 106 */
 107
 108/*
 109 * calculate the mapping of a file extent onto an object, and fill out the
 110 * request accordingly.  shorten extent as necessary if it crosses an
 111 * object boundary.
 112 *
 113 * fill osd op in request message.
 114 */
 115static void calc_layout(struct ceph_osd_client *osdc,
 116			struct ceph_vino vino,
 117			struct ceph_file_layout *layout,
 118			u64 off, u64 *plen,
 119			struct ceph_osd_request *req,
 120			struct ceph_osd_req_op *op)
 121{
 122	u64 bno;
 123
 124	ceph_calc_raw_layout(osdc, layout, vino.snap, off,
 125			     plen, &bno, req, op);
 126
 127	snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
 128	req->r_oid_len = strlen(req->r_oid);
 129}
 130
 131/*
 132 * requests
 133 */
 134void ceph_osdc_release_request(struct kref *kref)
 135{
 136	struct ceph_osd_request *req = container_of(kref,
 137						    struct ceph_osd_request,
 138						    r_kref);
 139
 140	if (req->r_request)
 141		ceph_msg_put(req->r_request);
 142	if (req->r_reply)
 143		ceph_msg_put(req->r_reply);
 144	if (req->r_con_filling_msg) {
 145		dout("release_request revoking pages %p from con %p\n",
 146		     req->r_pages, req->r_con_filling_msg);
 147		ceph_con_revoke_message(req->r_con_filling_msg,
 148				      req->r_reply);
 149		ceph_con_put(req->r_con_filling_msg);
 150	}
 
 
 151	if (req->r_own_pages)
 152		ceph_release_page_vector(req->r_pages,
 153					 req->r_num_pages);
 154#ifdef CONFIG_BLOCK
 155	if (req->r_bio)
 156		bio_put(req->r_bio);
 157#endif
 158	ceph_put_snap_context(req->r_snapc);
 159	if (req->r_trail) {
 160		ceph_pagelist_release(req->r_trail);
 161		kfree(req->r_trail);
 162	}
 163	if (req->r_mempool)
 164		mempool_free(req, req->r_osdc->req_mempool);
 165	else
 166		kfree(req);
 167}
 168EXPORT_SYMBOL(ceph_osdc_release_request);
 169
 170static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
 171{
 172	int i = 0;
 173
 174	if (needs_trail)
 175		*needs_trail = 0;
 176	while (ops[i].op) {
 177		if (needs_trail && op_needs_trail(ops[i].op))
 178			*needs_trail = 1;
 179		i++;
 180	}
 181
 182	return i;
 183}
 184
 185struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 186					       int flags,
 187					       struct ceph_snap_context *snapc,
 188					       struct ceph_osd_req_op *ops,
 189					       bool use_mempool,
 190					       gfp_t gfp_flags,
 191					       struct page **pages,
 192					       struct bio *bio)
 193{
 194	struct ceph_osd_request *req;
 195	struct ceph_msg *msg;
 196	int needs_trail;
 197	int num_op = get_num_ops(ops, &needs_trail);
 198	size_t msg_size = sizeof(struct ceph_osd_request_head);
 199
 200	msg_size += num_op*sizeof(struct ceph_osd_op);
 201
 202	if (use_mempool) {
 203		req = mempool_alloc(osdc->req_mempool, gfp_flags);
 204		memset(req, 0, sizeof(*req));
 205	} else {
 206		req = kzalloc(sizeof(*req), gfp_flags);
 207	}
 208	if (req == NULL)
 209		return NULL;
 210
 211	req->r_osdc = osdc;
 212	req->r_mempool = use_mempool;
 213
 214	kref_init(&req->r_kref);
 215	init_completion(&req->r_completion);
 216	init_completion(&req->r_safe_completion);
 217	INIT_LIST_HEAD(&req->r_unsafe_item);
 218	INIT_LIST_HEAD(&req->r_linger_item);
 219	INIT_LIST_HEAD(&req->r_linger_osd);
 220	INIT_LIST_HEAD(&req->r_req_lru_item);
 221	req->r_flags = flags;
 222
 223	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
 224
 225	/* create reply message */
 226	if (use_mempool)
 227		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
 228	else
 229		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
 230				   OSD_OPREPLY_FRONT_LEN, gfp_flags);
 231	if (!msg) {
 232		ceph_osdc_put_request(req);
 233		return NULL;
 234	}
 235	req->r_reply = msg;
 236
 237	/* allocate space for the trailing data */
 238	if (needs_trail) {
 239		req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
 240		if (!req->r_trail) {
 241			ceph_osdc_put_request(req);
 242			return NULL;
 243		}
 244		ceph_pagelist_init(req->r_trail);
 245	}
 246	/* create request message; allow space for oid */
 247	msg_size += 40;
 248	if (snapc)
 249		msg_size += sizeof(u64) * snapc->num_snaps;
 250	if (use_mempool)
 251		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
 252	else
 253		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
 254	if (!msg) {
 255		ceph_osdc_put_request(req);
 256		return NULL;
 257	}
 258
 259	msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
 260	memset(msg->front.iov_base, 0, msg->front.iov_len);
 261
 262	req->r_request = msg;
 263	req->r_pages = pages;
 264#ifdef CONFIG_BLOCK
 265	if (bio) {
 266		req->r_bio = bio;
 267		bio_get(req->r_bio);
 268	}
 269#endif
 270
 271	return req;
 272}
 273EXPORT_SYMBOL(ceph_osdc_alloc_request);
 274
 275static void osd_req_encode_op(struct ceph_osd_request *req,
 276			      struct ceph_osd_op *dst,
 277			      struct ceph_osd_req_op *src)
 278{
 279	dst->op = cpu_to_le16(src->op);
 280
 281	switch (dst->op) {
 282	case CEPH_OSD_OP_READ:
 283	case CEPH_OSD_OP_WRITE:
 284		dst->extent.offset =
 285			cpu_to_le64(src->extent.offset);
 286		dst->extent.length =
 287			cpu_to_le64(src->extent.length);
 288		dst->extent.truncate_size =
 289			cpu_to_le64(src->extent.truncate_size);
 290		dst->extent.truncate_seq =
 291			cpu_to_le32(src->extent.truncate_seq);
 292		break;
 293
 294	case CEPH_OSD_OP_GETXATTR:
 295	case CEPH_OSD_OP_SETXATTR:
 296	case CEPH_OSD_OP_CMPXATTR:
 297		BUG_ON(!req->r_trail);
 298
 299		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
 300		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
 301		dst->xattr.cmp_op = src->xattr.cmp_op;
 302		dst->xattr.cmp_mode = src->xattr.cmp_mode;
 303		ceph_pagelist_append(req->r_trail, src->xattr.name,
 304				     src->xattr.name_len);
 305		ceph_pagelist_append(req->r_trail, src->xattr.val,
 306				     src->xattr.value_len);
 307		break;
 308	case CEPH_OSD_OP_CALL:
 309		BUG_ON(!req->r_trail);
 310
 311		dst->cls.class_len = src->cls.class_len;
 312		dst->cls.method_len = src->cls.method_len;
 313		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
 314
 315		ceph_pagelist_append(req->r_trail, src->cls.class_name,
 316				     src->cls.class_len);
 317		ceph_pagelist_append(req->r_trail, src->cls.method_name,
 318				     src->cls.method_len);
 319		ceph_pagelist_append(req->r_trail, src->cls.indata,
 320				     src->cls.indata_len);
 321		break;
 322	case CEPH_OSD_OP_ROLLBACK:
 323		dst->snap.snapid = cpu_to_le64(src->snap.snapid);
 324		break;
 325	case CEPH_OSD_OP_STARTSYNC:
 326		break;
 327	case CEPH_OSD_OP_NOTIFY:
 328		{
 329			__le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
 330			__le32 timeout = cpu_to_le32(src->watch.timeout);
 331
 332			BUG_ON(!req->r_trail);
 333
 334			ceph_pagelist_append(req->r_trail,
 335						&prot_ver, sizeof(prot_ver));
 336			ceph_pagelist_append(req->r_trail,
 337						&timeout, sizeof(timeout));
 338		}
 339	case CEPH_OSD_OP_NOTIFY_ACK:
 340	case CEPH_OSD_OP_WATCH:
 341		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
 342		dst->watch.ver = cpu_to_le64(src->watch.ver);
 343		dst->watch.flag = src->watch.flag;
 344		break;
 345	default:
 346		pr_err("unrecognized osd opcode %d\n", dst->op);
 347		WARN_ON(1);
 348		break;
 349	}
 350	dst->payload_len = cpu_to_le32(src->payload_len);
 351}
 352
 353/*
 354 * build new request AND message
 355 *
 356 */
 357void ceph_osdc_build_request(struct ceph_osd_request *req,
 358			     u64 off, u64 *plen,
 359			     struct ceph_osd_req_op *src_ops,
 360			     struct ceph_snap_context *snapc,
 361			     struct timespec *mtime,
 362			     const char *oid,
 363			     int oid_len)
 364{
 365	struct ceph_msg *msg = req->r_request;
 366	struct ceph_osd_request_head *head;
 367	struct ceph_osd_req_op *src_op;
 368	struct ceph_osd_op *op;
 369	void *p;
 370	int num_op = get_num_ops(src_ops, NULL);
 371	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
 372	int flags = req->r_flags;
 373	u64 data_len = 0;
 374	int i;
 375
 376	head = msg->front.iov_base;
 377	op = (void *)(head + 1);
 378	p = (void *)(op + num_op);
 379
 380	req->r_snapc = ceph_get_snap_context(snapc);
 381
 382	head->client_inc = cpu_to_le32(1); /* always, for now. */
 383	head->flags = cpu_to_le32(flags);
 384	if (flags & CEPH_OSD_FLAG_WRITE)
 385		ceph_encode_timespec(&head->mtime, mtime);
 386	head->num_ops = cpu_to_le16(num_op);
 387
 388
 389	/* fill in oid */
 390	head->object_len = cpu_to_le32(oid_len);
 391	memcpy(p, oid, oid_len);
 392	p += oid_len;
 393
 394	src_op = src_ops;
 395	while (src_op->op) {
 396		osd_req_encode_op(req, op, src_op);
 397		src_op++;
 398		op++;
 399	}
 400
 401	if (req->r_trail)
 402		data_len += req->r_trail->length;
 403
 404	if (snapc) {
 405		head->snap_seq = cpu_to_le64(snapc->seq);
 406		head->num_snaps = cpu_to_le32(snapc->num_snaps);
 407		for (i = 0; i < snapc->num_snaps; i++) {
 408			put_unaligned_le64(snapc->snaps[i], p);
 409			p += sizeof(u64);
 410		}
 411	}
 412
 413	if (flags & CEPH_OSD_FLAG_WRITE) {
 414		req->r_request->hdr.data_off = cpu_to_le16(off);
 415		req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
 416	} else if (data_len) {
 417		req->r_request->hdr.data_off = 0;
 418		req->r_request->hdr.data_len = cpu_to_le32(data_len);
 419	}
 420
 421	req->r_request->page_alignment = req->r_page_alignment;
 422
 423	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
 424	msg_size = p - msg->front.iov_base;
 425	msg->front.iov_len = msg_size;
 426	msg->hdr.front_len = cpu_to_le32(msg_size);
 427	return;
 428}
 429EXPORT_SYMBOL(ceph_osdc_build_request);
 430
 431/*
 432 * build new request AND message, calculate layout, and adjust file
 433 * extent as needed.
 434 *
 435 * if the file was recently truncated, we include information about its
 436 * old and new size so that the object can be updated appropriately.  (we
 437 * avoid synchronously deleting truncated objects because it's slow.)
 438 *
 439 * if @do_sync, include a 'startsync' command so that the osd will flush
 440 * data quickly.
 441 */
 442struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 443					       struct ceph_file_layout *layout,
 444					       struct ceph_vino vino,
 445					       u64 off, u64 *plen,
 446					       int opcode, int flags,
 447					       struct ceph_snap_context *snapc,
 448					       int do_sync,
 449					       u32 truncate_seq,
 450					       u64 truncate_size,
 451					       struct timespec *mtime,
 452					       bool use_mempool, int num_reply,
 453					       int page_align)
 454{
 455	struct ceph_osd_req_op ops[3];
 456	struct ceph_osd_request *req;
 457
 458	ops[0].op = opcode;
 459	ops[0].extent.truncate_seq = truncate_seq;
 460	ops[0].extent.truncate_size = truncate_size;
 461	ops[0].payload_len = 0;
 462
 463	if (do_sync) {
 464		ops[1].op = CEPH_OSD_OP_STARTSYNC;
 465		ops[1].payload_len = 0;
 466		ops[2].op = 0;
 467	} else
 468		ops[1].op = 0;
 469
 470	req = ceph_osdc_alloc_request(osdc, flags,
 471					 snapc, ops,
 472					 use_mempool,
 473					 GFP_NOFS, NULL, NULL);
 474	if (!req)
 475		return NULL;
 476
 477	/* calculate max write size */
 478	calc_layout(osdc, vino, layout, off, plen, req, ops);
 479	req->r_file_layout = *layout;  /* keep a copy */
 480
 481	/* in case it differs from natural (file) alignment that
 482	   calc_layout filled in for us */
 483	req->r_num_pages = calc_pages_for(page_align, *plen);
 484	req->r_page_alignment = page_align;
 485
 486	ceph_osdc_build_request(req, off, plen, ops,
 487				snapc,
 488				mtime,
 489				req->r_oid, req->r_oid_len);
 490
 491	return req;
 492}
 493EXPORT_SYMBOL(ceph_osdc_new_request);
 494
 495/*
 496 * We keep osd requests in an rbtree, sorted by ->r_tid.
 497 */
 498static void __insert_request(struct ceph_osd_client *osdc,
 499			     struct ceph_osd_request *new)
 500{
 501	struct rb_node **p = &osdc->requests.rb_node;
 502	struct rb_node *parent = NULL;
 503	struct ceph_osd_request *req = NULL;
 504
 505	while (*p) {
 506		parent = *p;
 507		req = rb_entry(parent, struct ceph_osd_request, r_node);
 508		if (new->r_tid < req->r_tid)
 509			p = &(*p)->rb_left;
 510		else if (new->r_tid > req->r_tid)
 511			p = &(*p)->rb_right;
 512		else
 513			BUG();
 514	}
 515
 516	rb_link_node(&new->r_node, parent, p);
 517	rb_insert_color(&new->r_node, &osdc->requests);
 518}
 519
 520static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
 521						 u64 tid)
 522{
 523	struct ceph_osd_request *req;
 524	struct rb_node *n = osdc->requests.rb_node;
 525
 526	while (n) {
 527		req = rb_entry(n, struct ceph_osd_request, r_node);
 528		if (tid < req->r_tid)
 529			n = n->rb_left;
 530		else if (tid > req->r_tid)
 531			n = n->rb_right;
 532		else
 533			return req;
 534	}
 535	return NULL;
 536}
 537
 538static struct ceph_osd_request *
 539__lookup_request_ge(struct ceph_osd_client *osdc,
 540		    u64 tid)
 541{
 542	struct ceph_osd_request *req;
 543	struct rb_node *n = osdc->requests.rb_node;
 544
 545	while (n) {
 546		req = rb_entry(n, struct ceph_osd_request, r_node);
 547		if (tid < req->r_tid) {
 548			if (!n->rb_left)
 549				return req;
 550			n = n->rb_left;
 551		} else if (tid > req->r_tid) {
 552			n = n->rb_right;
 553		} else {
 554			return req;
 555		}
 556	}
 557	return NULL;
 558}
 559
 560/*
 561 * Resubmit requests pending on the given osd.
 562 */
 563static void __kick_osd_requests(struct ceph_osd_client *osdc,
 564				struct ceph_osd *osd)
 565{
 566	struct ceph_osd_request *req, *nreq;
 567	int err;
 568
 569	dout("__kick_osd_requests osd%d\n", osd->o_osd);
 570	err = __reset_osd(osdc, osd);
 571	if (err == -EAGAIN)
 572		return;
 573
 574	list_for_each_entry(req, &osd->o_requests, r_osd_item) {
 575		list_move(&req->r_req_lru_item, &osdc->req_unsent);
 576		dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
 577		     osd->o_osd);
 578		if (!req->r_linger)
 579			req->r_flags |= CEPH_OSD_FLAG_RETRY;
 580	}
 581
 582	list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
 583				 r_linger_osd) {
 584		/*
 585		 * reregister request prior to unregistering linger so
 586		 * that r_osd is preserved.
 587		 */
 588		BUG_ON(!list_empty(&req->r_req_lru_item));
 589		__register_request(osdc, req);
 590		list_add(&req->r_req_lru_item, &osdc->req_unsent);
 591		list_add(&req->r_osd_item, &req->r_osd->o_requests);
 592		__unregister_linger_request(osdc, req);
 593		dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
 594		     osd->o_osd);
 595	}
 596}
 597
 598static void kick_osd_requests(struct ceph_osd_client *osdc,
 599			      struct ceph_osd *kickosd)
 600{
 601	mutex_lock(&osdc->request_mutex);
 602	__kick_osd_requests(osdc, kickosd);
 603	mutex_unlock(&osdc->request_mutex);
 604}
 605
 606/*
 607 * If the osd connection drops, we need to resubmit all requests.
 608 */
 609static void osd_reset(struct ceph_connection *con)
 610{
 611	struct ceph_osd *osd = con->private;
 612	struct ceph_osd_client *osdc;
 613
 614	if (!osd)
 615		return;
 616	dout("osd_reset osd%d\n", osd->o_osd);
 617	osdc = osd->o_osdc;
 618	down_read(&osdc->map_sem);
 619	kick_osd_requests(osdc, osd);
 620	send_queued(osdc);
 621	up_read(&osdc->map_sem);
 622}
 623
 624/*
 625 * Track open sessions with osds.
 626 */
 627static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
 628{
 629	struct ceph_osd *osd;
 630
 631	osd = kzalloc(sizeof(*osd), GFP_NOFS);
 632	if (!osd)
 633		return NULL;
 634
 635	atomic_set(&osd->o_ref, 1);
 636	osd->o_osdc = osdc;
 637	INIT_LIST_HEAD(&osd->o_requests);
 638	INIT_LIST_HEAD(&osd->o_linger_requests);
 639	INIT_LIST_HEAD(&osd->o_osd_lru);
 640	osd->o_incarnation = 1;
 641
 642	ceph_con_init(osdc->client->msgr, &osd->o_con);
 643	osd->o_con.private = osd;
 644	osd->o_con.ops = &osd_con_ops;
 645	osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
 646
 647	INIT_LIST_HEAD(&osd->o_keepalive_item);
 648	return osd;
 649}
 650
 651static struct ceph_osd *get_osd(struct ceph_osd *osd)
 652{
 653	if (atomic_inc_not_zero(&osd->o_ref)) {
 654		dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
 655		     atomic_read(&osd->o_ref));
 656		return osd;
 657	} else {
 658		dout("get_osd %p FAIL\n", osd);
 659		return NULL;
 660	}
 661}
 662
 663static void put_osd(struct ceph_osd *osd)
 664{
 665	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
 666	     atomic_read(&osd->o_ref) - 1);
 667	if (atomic_dec_and_test(&osd->o_ref)) {
 668		struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
 669
 670		if (osd->o_authorizer)
 671			ac->ops->destroy_authorizer(ac, osd->o_authorizer);
 672		kfree(osd);
 673	}
 674}
 675
 676/*
 677 * remove an osd from our map
 678 */
 679static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 680{
 681	dout("__remove_osd %p\n", osd);
 682	BUG_ON(!list_empty(&osd->o_requests));
 683	rb_erase(&osd->o_node, &osdc->osds);
 684	list_del_init(&osd->o_osd_lru);
 685	ceph_con_close(&osd->o_con);
 686	put_osd(osd);
 687}
 688
 689static void remove_all_osds(struct ceph_osd_client *osdc)
 690{
 691	dout("__remove_old_osds %p\n", osdc);
 692	mutex_lock(&osdc->request_mutex);
 693	while (!RB_EMPTY_ROOT(&osdc->osds)) {
 694		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
 695						struct ceph_osd, o_node);
 696		__remove_osd(osdc, osd);
 697	}
 698	mutex_unlock(&osdc->request_mutex);
 699}
 700
 701static void __move_osd_to_lru(struct ceph_osd_client *osdc,
 702			      struct ceph_osd *osd)
 703{
 704	dout("__move_osd_to_lru %p\n", osd);
 705	BUG_ON(!list_empty(&osd->o_osd_lru));
 706	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
 707	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
 708}
 709
 710static void __remove_osd_from_lru(struct ceph_osd *osd)
 711{
 712	dout("__remove_osd_from_lru %p\n", osd);
 713	if (!list_empty(&osd->o_osd_lru))
 714		list_del_init(&osd->o_osd_lru);
 715}
 716
 717static void remove_old_osds(struct ceph_osd_client *osdc)
 718{
 719	struct ceph_osd *osd, *nosd;
 720
 721	dout("__remove_old_osds %p\n", osdc);
 722	mutex_lock(&osdc->request_mutex);
 723	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
 724		if (time_before(jiffies, osd->lru_ttl))
 725			break;
 726		__remove_osd(osdc, osd);
 727	}
 728	mutex_unlock(&osdc->request_mutex);
 729}
 730
 731/*
 732 * reset osd connect
 733 */
 734static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 735{
 736	struct ceph_osd_request *req;
 737	int ret = 0;
 738
 739	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
 740	if (list_empty(&osd->o_requests) &&
 741	    list_empty(&osd->o_linger_requests)) {
 742		__remove_osd(osdc, osd);
 743	} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
 744			  &osd->o_con.peer_addr,
 745			  sizeof(osd->o_con.peer_addr)) == 0 &&
 746		   !ceph_con_opened(&osd->o_con)) {
 747		dout(" osd addr hasn't changed and connection never opened,"
 748		     " letting msgr retry");
 749		/* touch each r_stamp for handle_timeout()'s benfit */
 750		list_for_each_entry(req, &osd->o_requests, r_osd_item)
 751			req->r_stamp = jiffies;
 752		ret = -EAGAIN;
 753	} else {
 754		ceph_con_close(&osd->o_con);
 755		ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
 756		osd->o_incarnation++;
 757	}
 758	return ret;
 759}
 760
 761static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
 762{
 763	struct rb_node **p = &osdc->osds.rb_node;
 764	struct rb_node *parent = NULL;
 765	struct ceph_osd *osd = NULL;
 766
 767	dout("__insert_osd %p osd%d\n", new, new->o_osd);
 768	while (*p) {
 769		parent = *p;
 770		osd = rb_entry(parent, struct ceph_osd, o_node);
 771		if (new->o_osd < osd->o_osd)
 772			p = &(*p)->rb_left;
 773		else if (new->o_osd > osd->o_osd)
 774			p = &(*p)->rb_right;
 775		else
 776			BUG();
 777	}
 778
 779	rb_link_node(&new->o_node, parent, p);
 780	rb_insert_color(&new->o_node, &osdc->osds);
 781}
 782
 783static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
 784{
 785	struct ceph_osd *osd;
 786	struct rb_node *n = osdc->osds.rb_node;
 787
 788	while (n) {
 789		osd = rb_entry(n, struct ceph_osd, o_node);
 790		if (o < osd->o_osd)
 791			n = n->rb_left;
 792		else if (o > osd->o_osd)
 793			n = n->rb_right;
 794		else
 795			return osd;
 796	}
 797	return NULL;
 798}
 799
 800static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
 801{
 802	schedule_delayed_work(&osdc->timeout_work,
 803			osdc->client->options->osd_keepalive_timeout * HZ);
 804}
 805
 806static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
 807{
 808	cancel_delayed_work(&osdc->timeout_work);
 809}
 810
 811/*
 812 * Register request, assign tid.  If this is the first request, set up
 813 * the timeout event.
 814 */
 815static void __register_request(struct ceph_osd_client *osdc,
 816			       struct ceph_osd_request *req)
 817{
 818	req->r_tid = ++osdc->last_tid;
 819	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
 820	dout("__register_request %p tid %lld\n", req, req->r_tid);
 821	__insert_request(osdc, req);
 822	ceph_osdc_get_request(req);
 823	osdc->num_requests++;
 824	if (osdc->num_requests == 1) {
 825		dout(" first request, scheduling timeout\n");
 826		__schedule_osd_timeout(osdc);
 827	}
 828}
 829
 830static void register_request(struct ceph_osd_client *osdc,
 831			     struct ceph_osd_request *req)
 832{
 833	mutex_lock(&osdc->request_mutex);
 834	__register_request(osdc, req);
 835	mutex_unlock(&osdc->request_mutex);
 836}
 837
 838/*
 839 * called under osdc->request_mutex
 840 */
 841static void __unregister_request(struct ceph_osd_client *osdc,
 842				 struct ceph_osd_request *req)
 843{
 
 
 
 
 
 
 844	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
 845	rb_erase(&req->r_node, &osdc->requests);
 846	osdc->num_requests--;
 847
 848	if (req->r_osd) {
 849		/* make sure the original request isn't in flight. */
 850		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
 851
 852		list_del_init(&req->r_osd_item);
 853		if (list_empty(&req->r_osd->o_requests) &&
 854		    list_empty(&req->r_osd->o_linger_requests)) {
 855			dout("moving osd to %p lru\n", req->r_osd);
 856			__move_osd_to_lru(osdc, req->r_osd);
 857		}
 858		if (list_empty(&req->r_linger_item))
 859			req->r_osd = NULL;
 860	}
 861
 862	ceph_osdc_put_request(req);
 863
 864	list_del_init(&req->r_req_lru_item);
 865	if (osdc->num_requests == 0) {
 866		dout(" no requests, canceling timeout\n");
 867		__cancel_osd_timeout(osdc);
 868	}
 869}
 870
 871/*
 872 * Cancel a previously queued request message
 873 */
 874static void __cancel_request(struct ceph_osd_request *req)
 875{
 876	if (req->r_sent && req->r_osd) {
 877		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
 878		req->r_sent = 0;
 879	}
 880}
 881
 882static void __register_linger_request(struct ceph_osd_client *osdc,
 883				    struct ceph_osd_request *req)
 884{
 885	dout("__register_linger_request %p\n", req);
 886	list_add_tail(&req->r_linger_item, &osdc->req_linger);
 887	list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests);
 888}
 889
 890static void __unregister_linger_request(struct ceph_osd_client *osdc,
 891					struct ceph_osd_request *req)
 892{
 893	dout("__unregister_linger_request %p\n", req);
 894	if (req->r_osd) {
 895		list_del_init(&req->r_linger_item);
 896		list_del_init(&req->r_linger_osd);
 897
 898		if (list_empty(&req->r_osd->o_requests) &&
 899		    list_empty(&req->r_osd->o_linger_requests)) {
 900			dout("moving osd to %p lru\n", req->r_osd);
 901			__move_osd_to_lru(osdc, req->r_osd);
 902		}
 903		if (list_empty(&req->r_osd_item))
 904			req->r_osd = NULL;
 905	}
 906}
 907
 908void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
 909					 struct ceph_osd_request *req)
 910{
 911	mutex_lock(&osdc->request_mutex);
 912	if (req->r_linger) {
 913		__unregister_linger_request(osdc, req);
 914		ceph_osdc_put_request(req);
 915	}
 916	mutex_unlock(&osdc->request_mutex);
 917}
 918EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
 919
 920void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
 921				  struct ceph_osd_request *req)
 922{
 923	if (!req->r_linger) {
 924		dout("set_request_linger %p\n", req);
 925		req->r_linger = 1;
 926		/*
 927		 * caller is now responsible for calling
 928		 * unregister_linger_request
 929		 */
 930		ceph_osdc_get_request(req);
 931	}
 932}
 933EXPORT_SYMBOL(ceph_osdc_set_request_linger);
 934
 935/*
 936 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 937 * (as needed), and set the request r_osd appropriately.  If there is
 938 * no up osd, set r_osd to NULL.  Move the request to the appropriate list
 939 * (unsent, homeless) or leave on in-flight lru.
 940 *
 941 * Return 0 if unchanged, 1 if changed, or negative on error.
 942 *
 943 * Caller should hold map_sem for read and request_mutex.
 944 */
 945static int __map_request(struct ceph_osd_client *osdc,
 946			 struct ceph_osd_request *req)
 947{
 948	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
 949	struct ceph_pg pgid;
 950	int acting[CEPH_PG_MAX_SIZE];
 951	int o = -1, num = 0;
 952	int err;
 953
 954	dout("map_request %p tid %lld\n", req, req->r_tid);
 955	err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
 956				      &req->r_file_layout, osdc->osdmap);
 957	if (err) {
 958		list_move(&req->r_req_lru_item, &osdc->req_notarget);
 959		return err;
 960	}
 961	pgid = reqhead->layout.ol_pgid;
 962	req->r_pgid = pgid;
 963
 964	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
 965	if (err > 0) {
 966		o = acting[0];
 967		num = err;
 968	}
 969
 970	if ((req->r_osd && req->r_osd->o_osd == o &&
 
 971	     req->r_sent >= req->r_osd->o_incarnation &&
 972	     req->r_num_pg_osds == num &&
 973	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
 974	    (req->r_osd == NULL && o == -1))
 975		return 0;  /* no change */
 976
 977	dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
 978	     req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
 979	     req->r_osd ? req->r_osd->o_osd : -1);
 980
 981	/* record full pg acting set */
 982	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
 983	req->r_num_pg_osds = num;
 984
 985	if (req->r_osd) {
 986		__cancel_request(req);
 987		list_del_init(&req->r_osd_item);
 988		req->r_osd = NULL;
 989	}
 990
 991	req->r_osd = __lookup_osd(osdc, o);
 992	if (!req->r_osd && o >= 0) {
 993		err = -ENOMEM;
 994		req->r_osd = create_osd(osdc);
 995		if (!req->r_osd) {
 996			list_move(&req->r_req_lru_item, &osdc->req_notarget);
 997			goto out;
 998		}
 999
1000		dout("map_request osd %p is osd%d\n", req->r_osd, o);
1001		req->r_osd->o_osd = o;
1002		req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
1003		__insert_osd(osdc, req->r_osd);
1004
1005		ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
1006	}
1007
1008	if (req->r_osd) {
1009		__remove_osd_from_lru(req->r_osd);
1010		list_add(&req->r_osd_item, &req->r_osd->o_requests);
1011		list_move(&req->r_req_lru_item, &osdc->req_unsent);
1012	} else {
1013		list_move(&req->r_req_lru_item, &osdc->req_notarget);
1014	}
1015	err = 1;   /* osd or pg changed */
1016
1017out:
1018	return err;
1019}
1020
1021/*
1022 * caller should hold map_sem (for read) and request_mutex
1023 */
1024static int __send_request(struct ceph_osd_client *osdc,
1025			  struct ceph_osd_request *req)
1026{
1027	struct ceph_osd_request_head *reqhead;
1028
1029	dout("send_request %p tid %llu to osd%d flags %d\n",
1030	     req, req->r_tid, req->r_osd->o_osd, req->r_flags);
1031
1032	reqhead = req->r_request->front.iov_base;
1033	reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
1034	reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
1035	reqhead->reassert_version = req->r_reassert_version;
1036
1037	req->r_stamp = jiffies;
1038	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
1039
1040	ceph_msg_get(req->r_request); /* send consumes a ref */
1041	ceph_con_send(&req->r_osd->o_con, req->r_request);
1042	req->r_sent = req->r_osd->o_incarnation;
1043	return 0;
1044}
1045
1046/*
1047 * Send any requests in the queue (req_unsent).
1048 */
1049static void send_queued(struct ceph_osd_client *osdc)
1050{
1051	struct ceph_osd_request *req, *tmp;
1052
1053	dout("send_queued\n");
1054	mutex_lock(&osdc->request_mutex);
1055	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
1056		__send_request(osdc, req);
1057	}
1058	mutex_unlock(&osdc->request_mutex);
1059}
1060
1061/*
1062 * Timeout callback, called every N seconds when 1 or more osd
1063 * requests has been active for more than N seconds.  When this
1064 * happens, we ping all OSDs with requests who have timed out to
1065 * ensure any communications channel reset is detected.  Reset the
1066 * request timeouts another N seconds in the future as we go.
1067 * Reschedule the timeout event another N seconds in future (unless
1068 * there are no open requests).
1069 */
1070static void handle_timeout(struct work_struct *work)
1071{
1072	struct ceph_osd_client *osdc =
1073		container_of(work, struct ceph_osd_client, timeout_work.work);
1074	struct ceph_osd_request *req, *last_req = NULL;
1075	struct ceph_osd *osd;
1076	unsigned long timeout = osdc->client->options->osd_timeout * HZ;
1077	unsigned long keepalive =
1078		osdc->client->options->osd_keepalive_timeout * HZ;
1079	unsigned long last_stamp = 0;
1080	struct list_head slow_osds;
1081	dout("timeout\n");
1082	down_read(&osdc->map_sem);
1083
1084	ceph_monc_request_next_osdmap(&osdc->client->monc);
1085
1086	mutex_lock(&osdc->request_mutex);
1087
1088	/*
1089	 * reset osds that appear to be _really_ unresponsive.  this
1090	 * is a failsafe measure.. we really shouldn't be getting to
1091	 * this point if the system is working properly.  the monitors
1092	 * should mark the osd as failed and we should find out about
1093	 * it from an updated osd map.
1094	 */
1095	while (timeout && !list_empty(&osdc->req_lru)) {
1096		req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
1097				 r_req_lru_item);
1098
1099		/* hasn't been long enough since we sent it? */
1100		if (time_before(jiffies, req->r_stamp + timeout))
1101			break;
1102
1103		/* hasn't been long enough since it was acked? */
1104		if (req->r_request->ack_stamp == 0 ||
1105		    time_before(jiffies, req->r_request->ack_stamp + timeout))
1106			break;
1107
1108		BUG_ON(req == last_req && req->r_stamp == last_stamp);
1109		last_req = req;
1110		last_stamp = req->r_stamp;
1111
1112		osd = req->r_osd;
1113		BUG_ON(!osd);
1114		pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
1115			   req->r_tid, osd->o_osd);
1116		__kick_osd_requests(osdc, osd);
1117	}
1118
1119	/*
1120	 * ping osds that are a bit slow.  this ensures that if there
1121	 * is a break in the TCP connection we will notice, and reopen
1122	 * a connection with that osd (from the fault callback).
1123	 */
1124	INIT_LIST_HEAD(&slow_osds);
1125	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
1126		if (time_before(jiffies, req->r_stamp + keepalive))
1127			break;
1128
1129		osd = req->r_osd;
1130		BUG_ON(!osd);
1131		dout(" tid %llu is slow, will send keepalive on osd%d\n",
1132		     req->r_tid, osd->o_osd);
1133		list_move_tail(&osd->o_keepalive_item, &slow_osds);
1134	}
1135	while (!list_empty(&slow_osds)) {
1136		osd = list_entry(slow_osds.next, struct ceph_osd,
1137				 o_keepalive_item);
1138		list_del_init(&osd->o_keepalive_item);
1139		ceph_con_keepalive(&osd->o_con);
1140	}
1141
1142	__schedule_osd_timeout(osdc);
1143	mutex_unlock(&osdc->request_mutex);
1144	send_queued(osdc);
1145	up_read(&osdc->map_sem);
1146}
1147
1148static void handle_osds_timeout(struct work_struct *work)
1149{
1150	struct ceph_osd_client *osdc =
1151		container_of(work, struct ceph_osd_client,
1152			     osds_timeout_work.work);
1153	unsigned long delay =
1154		osdc->client->options->osd_idle_ttl * HZ >> 2;
1155
1156	dout("osds timeout\n");
1157	down_read(&osdc->map_sem);
1158	remove_old_osds(osdc);
1159	up_read(&osdc->map_sem);
1160
1161	schedule_delayed_work(&osdc->osds_timeout_work,
1162			      round_jiffies_relative(delay));
1163}
1164
1165static void complete_request(struct ceph_osd_request *req)
1166{
1167	if (req->r_safe_callback)
1168		req->r_safe_callback(req, NULL);
1169	complete_all(&req->r_safe_completion);  /* fsync waiter */
1170}
1171
1172/*
1173 * handle osd op reply.  either call the callback if it is specified,
1174 * or do the completion to wake up the waiting thread.
1175 */
1176static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1177			 struct ceph_connection *con)
1178{
1179	struct ceph_osd_reply_head *rhead = msg->front.iov_base;
1180	struct ceph_osd_request *req;
1181	u64 tid;
1182	int numops, object_len, flags;
1183	s32 result;
1184
1185	tid = le64_to_cpu(msg->hdr.tid);
1186	if (msg->front.iov_len < sizeof(*rhead))
1187		goto bad;
1188	numops = le32_to_cpu(rhead->num_ops);
1189	object_len = le32_to_cpu(rhead->object_len);
1190	result = le32_to_cpu(rhead->result);
1191	if (msg->front.iov_len != sizeof(*rhead) + object_len +
1192	    numops * sizeof(struct ceph_osd_op))
1193		goto bad;
1194	dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
1195	/* lookup */
1196	mutex_lock(&osdc->request_mutex);
1197	req = __lookup_request(osdc, tid);
1198	if (req == NULL) {
1199		dout("handle_reply tid %llu dne\n", tid);
1200		mutex_unlock(&osdc->request_mutex);
1201		return;
1202	}
1203	ceph_osdc_get_request(req);
1204	flags = le32_to_cpu(rhead->flags);
1205
1206	/*
1207	 * if this connection filled our message, drop our reference now, to
1208	 * avoid a (safe but slower) revoke later.
1209	 */
1210	if (req->r_con_filling_msg == con && req->r_reply == msg) {
1211		dout(" dropping con_filling_msg ref %p\n", con);
1212		req->r_con_filling_msg = NULL;
1213		ceph_con_put(con);
1214	}
1215
1216	if (!req->r_got_reply) {
1217		unsigned bytes;
1218
1219		req->r_result = le32_to_cpu(rhead->result);
1220		bytes = le32_to_cpu(msg->hdr.data_len);
1221		dout("handle_reply result %d bytes %d\n", req->r_result,
1222		     bytes);
1223		if (req->r_result == 0)
1224			req->r_result = bytes;
1225
1226		/* in case this is a write and we need to replay, */
1227		req->r_reassert_version = rhead->reassert_version;
1228
1229		req->r_got_reply = 1;
1230	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
1231		dout("handle_reply tid %llu dup ack\n", tid);
1232		mutex_unlock(&osdc->request_mutex);
1233		goto done;
1234	}
1235
1236	dout("handle_reply tid %llu flags %d\n", tid, flags);
1237
1238	if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
1239		__register_linger_request(osdc, req);
1240
1241	/* either this is a read, or we got the safe response */
1242	if (result < 0 ||
1243	    (flags & CEPH_OSD_FLAG_ONDISK) ||
1244	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
1245		__unregister_request(osdc, req);
1246
1247	mutex_unlock(&osdc->request_mutex);
1248
1249	if (req->r_callback)
1250		req->r_callback(req, msg);
1251	else
1252		complete_all(&req->r_completion);
1253
1254	if (flags & CEPH_OSD_FLAG_ONDISK)
1255		complete_request(req);
1256
1257done:
1258	dout("req=%p req->r_linger=%d\n", req, req->r_linger);
1259	ceph_osdc_put_request(req);
1260	return;
1261
1262bad:
1263	pr_err("corrupt osd_op_reply got %d %d expected %d\n",
1264	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
1265	       (int)sizeof(*rhead));
1266	ceph_msg_dump(msg);
1267}
1268
1269static void reset_changed_osds(struct ceph_osd_client *osdc)
1270{
1271	struct rb_node *p, *n;
1272
1273	for (p = rb_first(&osdc->osds); p; p = n) {
1274		struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
1275
1276		n = rb_next(p);
1277		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
1278		    memcmp(&osd->o_con.peer_addr,
1279			   ceph_osd_addr(osdc->osdmap,
1280					 osd->o_osd),
1281			   sizeof(struct ceph_entity_addr)) != 0)
1282			__reset_osd(osdc, osd);
1283	}
1284}
1285
1286/*
1287 * Requeue requests whose mapping to an OSD has changed.  If requests map to
1288 * no osd, request a new map.
1289 *
1290 * Caller should hold map_sem for read and request_mutex.
1291 */
1292static void kick_requests(struct ceph_osd_client *osdc)
1293{
1294	struct ceph_osd_request *req, *nreq;
1295	struct rb_node *p;
1296	int needmap = 0;
1297	int err;
1298
1299	dout("kick_requests\n");
1300	mutex_lock(&osdc->request_mutex);
1301	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
1302		req = rb_entry(p, struct ceph_osd_request, r_node);
1303		err = __map_request(osdc, req);
1304		if (err < 0)
1305			continue;  /* error */
1306		if (req->r_osd == NULL) {
1307			dout("%p tid %llu maps to no osd\n", req, req->r_tid);
1308			needmap++;  /* request a newer map */
1309		} else if (err > 0) {
1310			dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
1311			     req->r_osd ? req->r_osd->o_osd : -1);
1312			if (!req->r_linger)
1313				req->r_flags |= CEPH_OSD_FLAG_RETRY;
1314		}
1315	}
1316
1317	list_for_each_entry_safe(req, nreq, &osdc->req_linger,
1318				 r_linger_item) {
1319		dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
1320
1321		err = __map_request(osdc, req);
1322		if (err == 0)
1323			continue;  /* no change and no osd was specified */
1324		if (err < 0)
1325			continue;  /* hrm! */
1326		if (req->r_osd == NULL) {
1327			dout("tid %llu maps to no valid osd\n", req->r_tid);
1328			needmap++;  /* request a newer map */
1329			continue;
1330		}
1331
1332		dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
1333		     req->r_osd ? req->r_osd->o_osd : -1);
1334		__unregister_linger_request(osdc, req);
1335		__register_request(osdc, req);
1336	}
1337	mutex_unlock(&osdc->request_mutex);
1338
1339	if (needmap) {
1340		dout("%d requests for down osds, need new map\n", needmap);
1341		ceph_monc_request_next_osdmap(&osdc->client->monc);
1342	}
1343}
1344
1345
1346/*
1347 * Process updated osd map.
1348 *
1349 * The message contains any number of incremental and full maps, normally
1350 * indicating some sort of topology change in the cluster.  Kick requests
1351 * off to different OSDs as needed.
1352 */
1353void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1354{
1355	void *p, *end, *next;
1356	u32 nr_maps, maplen;
1357	u32 epoch;
1358	struct ceph_osdmap *newmap = NULL, *oldmap;
1359	int err;
1360	struct ceph_fsid fsid;
1361
1362	dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
1363	p = msg->front.iov_base;
1364	end = p + msg->front.iov_len;
1365
1366	/* verify fsid */
1367	ceph_decode_need(&p, end, sizeof(fsid), bad);
1368	ceph_decode_copy(&p, &fsid, sizeof(fsid));
1369	if (ceph_check_fsid(osdc->client, &fsid) < 0)
1370		return;
1371
1372	down_write(&osdc->map_sem);
1373
1374	/* incremental maps */
1375	ceph_decode_32_safe(&p, end, nr_maps, bad);
1376	dout(" %d inc maps\n", nr_maps);
1377	while (nr_maps > 0) {
1378		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1379		epoch = ceph_decode_32(&p);
1380		maplen = ceph_decode_32(&p);
1381		ceph_decode_need(&p, end, maplen, bad);
1382		next = p + maplen;
1383		if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
1384			dout("applying incremental map %u len %d\n",
1385			     epoch, maplen);
1386			newmap = osdmap_apply_incremental(&p, next,
1387							  osdc->osdmap,
1388							  osdc->client->msgr);
1389			if (IS_ERR(newmap)) {
1390				err = PTR_ERR(newmap);
1391				goto bad;
1392			}
1393			BUG_ON(!newmap);
1394			if (newmap != osdc->osdmap) {
1395				ceph_osdmap_destroy(osdc->osdmap);
1396				osdc->osdmap = newmap;
1397			}
1398			kick_requests(osdc);
1399			reset_changed_osds(osdc);
1400		} else {
1401			dout("ignoring incremental map %u len %d\n",
1402			     epoch, maplen);
1403		}
1404		p = next;
1405		nr_maps--;
1406	}
1407	if (newmap)
1408		goto done;
1409
1410	/* full maps */
1411	ceph_decode_32_safe(&p, end, nr_maps, bad);
1412	dout(" %d full maps\n", nr_maps);
1413	while (nr_maps) {
1414		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1415		epoch = ceph_decode_32(&p);
1416		maplen = ceph_decode_32(&p);
1417		ceph_decode_need(&p, end, maplen, bad);
1418		if (nr_maps > 1) {
1419			dout("skipping non-latest full map %u len %d\n",
1420			     epoch, maplen);
1421		} else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
1422			dout("skipping full map %u len %d, "
1423			     "older than our %u\n", epoch, maplen,
1424			     osdc->osdmap->epoch);
1425		} else {
 
 
1426			dout("taking full map %u len %d\n", epoch, maplen);
1427			newmap = osdmap_decode(&p, p+maplen);
1428			if (IS_ERR(newmap)) {
1429				err = PTR_ERR(newmap);
1430				goto bad;
1431			}
1432			BUG_ON(!newmap);
1433			oldmap = osdc->osdmap;
1434			osdc->osdmap = newmap;
1435			if (oldmap)
 
 
1436				ceph_osdmap_destroy(oldmap);
1437			kick_requests(osdc);
 
1438		}
1439		p += maplen;
1440		nr_maps--;
1441	}
1442
1443done:
1444	downgrade_write(&osdc->map_sem);
1445	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1446
1447	/*
1448	 * subscribe to subsequent osdmap updates if full to ensure
1449	 * we find out when we are no longer full and stop returning
1450	 * ENOSPC.
1451	 */
1452	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
1453		ceph_monc_request_next_osdmap(&osdc->client->monc);
1454
1455	send_queued(osdc);
1456	up_read(&osdc->map_sem);
1457	wake_up_all(&osdc->client->auth_wq);
1458	return;
1459
1460bad:
1461	pr_err("osdc handle_map corrupt msg\n");
1462	ceph_msg_dump(msg);
1463	up_write(&osdc->map_sem);
1464	return;
1465}
1466
1467/*
1468 * watch/notify callback event infrastructure
1469 *
1470 * These callbacks are used both for watch and notify operations.
1471 */
1472static void __release_event(struct kref *kref)
1473{
1474	struct ceph_osd_event *event =
1475		container_of(kref, struct ceph_osd_event, kref);
1476
1477	dout("__release_event %p\n", event);
1478	kfree(event);
1479}
1480
1481static void get_event(struct ceph_osd_event *event)
1482{
1483	kref_get(&event->kref);
1484}
1485
1486void ceph_osdc_put_event(struct ceph_osd_event *event)
1487{
1488	kref_put(&event->kref, __release_event);
1489}
1490EXPORT_SYMBOL(ceph_osdc_put_event);
1491
1492static void __insert_event(struct ceph_osd_client *osdc,
1493			     struct ceph_osd_event *new)
1494{
1495	struct rb_node **p = &osdc->event_tree.rb_node;
1496	struct rb_node *parent = NULL;
1497	struct ceph_osd_event *event = NULL;
1498
1499	while (*p) {
1500		parent = *p;
1501		event = rb_entry(parent, struct ceph_osd_event, node);
1502		if (new->cookie < event->cookie)
1503			p = &(*p)->rb_left;
1504		else if (new->cookie > event->cookie)
1505			p = &(*p)->rb_right;
1506		else
1507			BUG();
1508	}
1509
1510	rb_link_node(&new->node, parent, p);
1511	rb_insert_color(&new->node, &osdc->event_tree);
1512}
1513
1514static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
1515					        u64 cookie)
1516{
1517	struct rb_node **p = &osdc->event_tree.rb_node;
1518	struct rb_node *parent = NULL;
1519	struct ceph_osd_event *event = NULL;
1520
1521	while (*p) {
1522		parent = *p;
1523		event = rb_entry(parent, struct ceph_osd_event, node);
1524		if (cookie < event->cookie)
1525			p = &(*p)->rb_left;
1526		else if (cookie > event->cookie)
1527			p = &(*p)->rb_right;
1528		else
1529			return event;
1530	}
1531	return NULL;
1532}
1533
1534static void __remove_event(struct ceph_osd_event *event)
1535{
1536	struct ceph_osd_client *osdc = event->osdc;
1537
1538	if (!RB_EMPTY_NODE(&event->node)) {
1539		dout("__remove_event removed %p\n", event);
1540		rb_erase(&event->node, &osdc->event_tree);
1541		ceph_osdc_put_event(event);
1542	} else {
1543		dout("__remove_event didn't remove %p\n", event);
1544	}
1545}
1546
1547int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1548			   void (*event_cb)(u64, u64, u8, void *),
1549			   int one_shot, void *data,
1550			   struct ceph_osd_event **pevent)
1551{
1552	struct ceph_osd_event *event;
1553
1554	event = kmalloc(sizeof(*event), GFP_NOIO);
1555	if (!event)
1556		return -ENOMEM;
1557
1558	dout("create_event %p\n", event);
1559	event->cb = event_cb;
1560	event->one_shot = one_shot;
1561	event->data = data;
1562	event->osdc = osdc;
1563	INIT_LIST_HEAD(&event->osd_node);
1564	kref_init(&event->kref);   /* one ref for us */
1565	kref_get(&event->kref);    /* one ref for the caller */
1566	init_completion(&event->completion);
1567
1568	spin_lock(&osdc->event_lock);
1569	event->cookie = ++osdc->event_count;
1570	__insert_event(osdc, event);
1571	spin_unlock(&osdc->event_lock);
1572
1573	*pevent = event;
1574	return 0;
1575}
1576EXPORT_SYMBOL(ceph_osdc_create_event);
1577
1578void ceph_osdc_cancel_event(struct ceph_osd_event *event)
1579{
1580	struct ceph_osd_client *osdc = event->osdc;
1581
1582	dout("cancel_event %p\n", event);
1583	spin_lock(&osdc->event_lock);
1584	__remove_event(event);
1585	spin_unlock(&osdc->event_lock);
1586	ceph_osdc_put_event(event); /* caller's */
1587}
1588EXPORT_SYMBOL(ceph_osdc_cancel_event);
1589
1590
1591static void do_event_work(struct work_struct *work)
1592{
1593	struct ceph_osd_event_work *event_work =
1594		container_of(work, struct ceph_osd_event_work, work);
1595	struct ceph_osd_event *event = event_work->event;
1596	u64 ver = event_work->ver;
1597	u64 notify_id = event_work->notify_id;
1598	u8 opcode = event_work->opcode;
1599
1600	dout("do_event_work completing %p\n", event);
1601	event->cb(ver, notify_id, opcode, event->data);
1602	complete(&event->completion);
1603	dout("do_event_work completed %p\n", event);
1604	ceph_osdc_put_event(event);
1605	kfree(event_work);
1606}
1607
1608
1609/*
1610 * Process osd watch notifications
1611 */
1612void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1613{
1614	void *p, *end;
1615	u8 proto_ver;
1616	u64 cookie, ver, notify_id;
1617	u8 opcode;
1618	struct ceph_osd_event *event;
1619	struct ceph_osd_event_work *event_work;
1620
1621	p = msg->front.iov_base;
1622	end = p + msg->front.iov_len;
1623
1624	ceph_decode_8_safe(&p, end, proto_ver, bad);
1625	ceph_decode_8_safe(&p, end, opcode, bad);
1626	ceph_decode_64_safe(&p, end, cookie, bad);
1627	ceph_decode_64_safe(&p, end, ver, bad);
1628	ceph_decode_64_safe(&p, end, notify_id, bad);
1629
1630	spin_lock(&osdc->event_lock);
1631	event = __find_event(osdc, cookie);
1632	if (event) {
1633		get_event(event);
1634		if (event->one_shot)
1635			__remove_event(event);
1636	}
1637	spin_unlock(&osdc->event_lock);
1638	dout("handle_watch_notify cookie %lld ver %lld event %p\n",
1639	     cookie, ver, event);
1640	if (event) {
1641		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
1642		if (!event_work) {
1643			dout("ERROR: could not allocate event_work\n");
1644			goto done_err;
1645		}
1646		INIT_WORK(&event_work->work, do_event_work);
1647		event_work->event = event;
1648		event_work->ver = ver;
1649		event_work->notify_id = notify_id;
1650		event_work->opcode = opcode;
1651		if (!queue_work(osdc->notify_wq, &event_work->work)) {
1652			dout("WARNING: failed to queue notify event work\n");
1653			goto done_err;
1654		}
1655	}
1656
1657	return;
1658
1659done_err:
1660	complete(&event->completion);
1661	ceph_osdc_put_event(event);
1662	return;
1663
1664bad:
1665	pr_err("osdc handle_watch_notify corrupt msg\n");
1666	return;
1667}
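
/*
 * Illustrative sketch, not part of the original file: the
 * ceph_decode_*_safe() macros used above each bounds-check the cursor
 * against 'end' and jump to the supplied label on truncated input,
 * which is what makes the single 'bad:' exit work.  The pattern
 * (my_decode and the -EINVAL choice are hypothetical):
 */
#if 0	/* illustrative only */
static int my_decode(void *p, void *end)
{
	u8 opcode;
	u64 cookie;

	ceph_decode_8_safe(&p, end, opcode, bad);
	ceph_decode_64_safe(&p, end, cookie, bad);
	return 0;
bad:
	return -EINVAL;	/* message was shorter than advertised */
}
#endif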
1668
1669int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
1670{
1671	int err;
1672
1673	dout("wait_event %p\n", event);
1674	err = wait_for_completion_interruptible_timeout(&event->completion,
1675							timeout * HZ);
1676	ceph_osdc_put_event(event);
1677	if (err > 0)
1678		err = 0;
1679	dout("wait_event %p returns %d\n", event, err);
1680	return err;
1681}
1682EXPORT_SYMBOL(ceph_osdc_wait_event);
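
/*
 * Illustrative sketch, not part of the original file: putting the
 * event API together.  create_event() hands back two references --
 * one dropped by wait_event() or cancel_event(), one owned by the
 * rbtree until __remove_event().  Hedged usage sketch (my_cb and the
 * notify-op wiring are hypothetical; a real caller must carry
 * event->cookie in a watch/notify osd op):
 */
#if 0	/* illustrative only */
static void my_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	/* invoked from osdc->notify_wq via do_event_work() */
}

static int my_wait_for_notify(struct ceph_osd_client *osdc)
{
	struct ceph_osd_event *event;
	int ret;

	ret = ceph_osdc_create_event(osdc, my_cb, 1 /* one_shot */,
				     NULL, &event);
	if (ret)
		return ret;
	/* ... issue the osd op carrying event->cookie ... */
	return ceph_osdc_wait_event(event, 10);	/* seconds; drops the
						   caller's reference */
}
#endif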
1683
1684/*
1685 * Register request, send initial attempt.
1686 */
1687int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1688			    struct ceph_osd_request *req,
1689			    bool nofail)
1690{
1691	int rc = 0;
1692
1693	req->r_request->pages = req->r_pages;
1694	req->r_request->nr_pages = req->r_num_pages;
1695#ifdef CONFIG_BLOCK
1696	req->r_request->bio = req->r_bio;
1697#endif
1698	req->r_request->trail = req->r_trail;
1699
1700	register_request(osdc, req);
1701
1702	down_read(&osdc->map_sem);
1703	mutex_lock(&osdc->request_mutex);
1704	/*
1705	 * a racing kick_requests() may have sent the message for us
1706	 * while we dropped request_mutex above, so only send now if
1707	 * the request still hasn't been touched yet.
1708	 */
1709	if (req->r_sent == 0) {
1710		rc = __map_request(osdc, req);
1711		if (rc < 0) {
1712			if (nofail) {
1713				dout("osdc_start_request failed map, "
1714				     "will retry %lld\n", req->r_tid);
1715				rc = 0;
1716			}
1717			goto out_unlock;
1718		}
1719		if (req->r_osd == NULL) {
1720			dout("send_request %p no up osds in pg\n", req);
1721			ceph_monc_request_next_osdmap(&osdc->client->monc);
1722		} else {
1723			rc = __send_request(osdc, req);
1724			if (rc) {
1725				if (nofail) {
1726					dout("osdc_start_request failed send, "
1727					     "will retry %lld\n", req->r_tid);
1728					rc = 0;
1729				} else {
1730					__unregister_request(osdc, req);
1731				}
1732			}
1733		}
1734	}
1735
1736out_unlock:
1737	mutex_unlock(&osdc->request_mutex);
1738	up_read(&osdc->map_sem);
1739	return rc;
1740}
1741EXPORT_SYMBOL(ceph_osdc_start_request);
1742
1743/*
1744 * wait for a request to complete
1745 */
1746int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1747			   struct ceph_osd_request *req)
1748{
1749	int rc;
1750
1751	rc = wait_for_completion_interruptible(&req->r_completion);
1752	if (rc < 0) {
1753		mutex_lock(&osdc->request_mutex);
1754		__cancel_request(req);
1755		__unregister_request(osdc, req);
1756		mutex_unlock(&osdc->request_mutex);
1757		complete_request(req);
1758		dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
1759		return rc;
1760	}
1761
1762	dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1763	return req->r_result;
1764}
1765EXPORT_SYMBOL(ceph_osdc_wait_request);
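
/*
 * Illustrative sketch, not part of the original file: start_request()
 * only submits; completion is observed via wait_request() or the
 * request's callbacks.  The synchronous pattern, exactly as the
 * readpages/writepages helpers further down use it:
 */
#if 0	/* illustrative only */
	rc = ceph_osdc_start_request(osdc, req, false);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);	/* req->r_result */
	ceph_osdc_put_request(req);
#endif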
1766
1767/*
1768 * sync - wait for all in-flight requests to flush.  avoid starvation.
1769 */
1770void ceph_osdc_sync(struct ceph_osd_client *osdc)
1771{
1772	struct ceph_osd_request *req;
1773	u64 last_tid, next_tid = 0;
1774
1775	mutex_lock(&osdc->request_mutex);
1776	last_tid = osdc->last_tid;
1777	while (1) {
1778		req = __lookup_request_ge(osdc, next_tid);
1779		if (!req)
1780			break;
1781		if (req->r_tid > last_tid)
1782			break;
1783
1784		next_tid = req->r_tid + 1;
1785		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
1786			continue;
1787
1788		ceph_osdc_get_request(req);
1789		mutex_unlock(&osdc->request_mutex);
1790		dout("sync waiting on tid %llu (last is %llu)\n",
1791		     req->r_tid, last_tid);
1792		wait_for_completion(&req->r_safe_completion);
1793		mutex_lock(&osdc->request_mutex);
1794		ceph_osdc_put_request(req);
1795	}
1796	mutex_unlock(&osdc->request_mutex);
1797	dout("sync done (thru tid %llu)\n", last_tid);
1798}
1799EXPORT_SYMBOL(ceph_osdc_sync);
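
/*
 * Illustrative sketch, not part of the original file: ceph_osdc_sync()
 * samples last_tid once and then walks tids upward, so writes
 * submitted after the call do not prolong the wait -- that is the
 * "avoid starvation" note above.  Typical use is as a barrier:
 */
#if 0	/* illustrative only */
	ceph_osdc_sync(osdc);	/* all prior writes are now safe on disk */
	/* ... proceed with work that must see those writes ... */
#endif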
1800
1801/*
1802 * init, shutdown
1803 */
1804int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1805{
1806	int err;
1807
1808	dout("init\n");
1809	osdc->client = client;
1810	osdc->osdmap = NULL;
1811	init_rwsem(&osdc->map_sem);
1812	init_completion(&osdc->map_waiters);
1813	osdc->last_requested_map = 0;
1814	mutex_init(&osdc->request_mutex);
1815	osdc->last_tid = 0;
1816	osdc->osds = RB_ROOT;
1817	INIT_LIST_HEAD(&osdc->osd_lru);
1818	osdc->requests = RB_ROOT;
1819	INIT_LIST_HEAD(&osdc->req_lru);
1820	INIT_LIST_HEAD(&osdc->req_unsent);
1821	INIT_LIST_HEAD(&osdc->req_notarget);
1822	INIT_LIST_HEAD(&osdc->req_linger);
1823	osdc->num_requests = 0;
1824	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1825	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1826	spin_lock_init(&osdc->event_lock);
1827	osdc->event_tree = RB_ROOT;
1828	osdc->event_count = 0;
1829
1830	schedule_delayed_work(&osdc->osds_timeout_work,
1831	   round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
1832
1833	err = -ENOMEM;
1834	osdc->req_mempool = mempool_create_kmalloc_pool(10,
1835					sizeof(struct ceph_osd_request));
1836	if (!osdc->req_mempool)
1837		goto out;
1838
1839	err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
1840				"osd_op");
1841	if (err < 0)
1842		goto out_mempool;
1843	err = ceph_msgpool_init(&osdc->msgpool_op_reply,
1844				OSD_OPREPLY_FRONT_LEN, 10, true,
1845				"osd_op_reply");
1846	if (err < 0)
1847		goto out_msgpool;
1848
1849	osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
1850	if (!osdc->notify_wq) {
1851		/* workqueue creation returns NULL on failure, not ERR_PTR */
1852		err = -ENOMEM;
1853		goto out_msgpool;
1854	}
1855	return 0;
1856
1857out_msgpool:
1858	ceph_msgpool_destroy(&osdc->msgpool_op);
1859out_mempool:
1860	mempool_destroy(osdc->req_mempool);
1861out:
1862	return err;
1863}
1864EXPORT_SYMBOL(ceph_osdc_init);
1865
1866void ceph_osdc_stop(struct ceph_osd_client *osdc)
1867{
1868	flush_workqueue(osdc->notify_wq);
1869	destroy_workqueue(osdc->notify_wq);
1870	cancel_delayed_work_sync(&osdc->timeout_work);
1871	cancel_delayed_work_sync(&osdc->osds_timeout_work);
1872	if (osdc->osdmap) {
1873		ceph_osdmap_destroy(osdc->osdmap);
1874		osdc->osdmap = NULL;
1875	}
1876	remove_all_osds(osdc);
1877	mempool_destroy(osdc->req_mempool);
1878	ceph_msgpool_destroy(&osdc->msgpool_op);
1879	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
1880}
1881EXPORT_SYMBOL(ceph_osdc_stop);
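
/*
 * Illustrative sketch, not part of the original file: the init/stop
 * pairing as a caller (the ceph client core) would drive it; my_setup
 * is a hypothetical name:
 */
#if 0	/* illustrative only */
static int my_setup(struct ceph_client *client,
		    struct ceph_osd_client *osdc)
{
	int err = ceph_osdc_init(osdc, client);

	if (err)
		return err;
	/* ... submit requests ... */
	ceph_osdc_stop(osdc);	/* flushes notify_wq, frees map and pools */
	return 0;
}
#endif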
1882
1883/*
1884 * Read some contiguous pages.  If we cross a stripe boundary, shorten
1885 * *plen.  Return number of bytes read, or error.
1886 */
1887int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1888			struct ceph_vino vino, struct ceph_file_layout *layout,
1889			u64 off, u64 *plen,
1890			u32 truncate_seq, u64 truncate_size,
1891			struct page **pages, int num_pages, int page_align)
1892{
1893	struct ceph_osd_request *req;
1894	int rc = 0;
1895
1896	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1897	     vino.snap, off, *plen);
1898	req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1899				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1900				    NULL, 0, truncate_seq, truncate_size, NULL,
1901				    false, 1, page_align);
1902	if (!req)
1903		return -ENOMEM;
1904
1905	/* it may be a short read due to an object boundary */
1906	req->r_pages = pages;
1907
1908	dout("readpages  final extent is %llu~%llu (%d pages align %d)\n",
1909	     off, *plen, req->r_num_pages, page_align);
1910
1911	rc = ceph_osdc_start_request(osdc, req, false);
1912	if (!rc)
1913		rc = ceph_osdc_wait_request(osdc, req);
1914
1915	ceph_osdc_put_request(req);
1916	dout("readpages result %d\n", rc);
1917	return rc;
1918}
1919EXPORT_SYMBOL(ceph_osdc_readpages);
1920
1921/*
1922 * do a synchronous write on N pages
1923 */
1924int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1925			 struct ceph_file_layout *layout,
1926			 struct ceph_snap_context *snapc,
1927			 u64 off, u64 len,
1928			 u32 truncate_seq, u64 truncate_size,
1929			 struct timespec *mtime,
1930			 struct page **pages, int num_pages,
1931			 int flags, int do_sync, bool nofail)
1932{
1933	struct ceph_osd_request *req;
1934	int rc = 0;
1935	int page_align = off & ~PAGE_MASK;
1936
1937	BUG_ON(vino.snap != CEPH_NOSNAP);
1938	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1939				    CEPH_OSD_OP_WRITE,
1940				    flags | CEPH_OSD_FLAG_ONDISK |
1941					    CEPH_OSD_FLAG_WRITE,
1942				    snapc, do_sync,
1943				    truncate_seq, truncate_size, mtime,
1944				    nofail, 1, page_align);
1945	if (!req)
1946		return -ENOMEM;
1947
1948	/* it may be a short write due to an object boundary */
1949	req->r_pages = pages;
1950	dout("writepages %llu~%llu (%d pages)\n", off, len,
1951	     req->r_num_pages);
1952
1953	rc = ceph_osdc_start_request(osdc, req, nofail);
1954	if (!rc)
1955		rc = ceph_osdc_wait_request(osdc, req);
1956
1957	ceph_osdc_put_request(req);
1958	if (rc == 0)
1959		rc = len;
1960	dout("writepages result %d\n", rc);
1961	return rc;
1962}
1963EXPORT_SYMBOL(ceph_osdc_writepages);
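
/*
 * Illustrative sketch, not part of the original file: a minimal
 * synchronous single-page write through the helper above
 * (my_write_one_page is hypothetical; off is assumed page-aligned and
 * mtime is left NULL for brevity):
 */
#if 0	/* illustrative only */
static int my_write_one_page(struct ceph_osd_client *osdc,
			     struct ceph_vino vino,
			     struct ceph_file_layout *layout,
			     struct ceph_snap_context *snapc,
			     struct page *page, u64 off)
{
	struct page *pages[1] = { page };

	return ceph_osdc_writepages(osdc, vino, layout, snapc,
				    off, PAGE_SIZE,
				    0 /* truncate_seq */,
				    0 /* truncate_size */,
				    NULL /* mtime */,
				    pages, 1,
				    0 /* flags */, 0 /* do_sync */,
				    true /* nofail */);
}
#endif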
1964
1965/*
1966 * handle incoming message
1967 */
1968static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1969{
1970	struct ceph_osd *osd = con->private;
1971	struct ceph_osd_client *osdc;
1972	int type = le16_to_cpu(msg->hdr.type);
1973
1974	if (!osd)
1975		goto out;
1976	osdc = osd->o_osdc;
1977
1978	switch (type) {
1979	case CEPH_MSG_OSD_MAP:
1980		ceph_osdc_handle_map(osdc, msg);
1981		break;
1982	case CEPH_MSG_OSD_OPREPLY:
1983		handle_reply(osdc, msg, con);
1984		break;
1985	case CEPH_MSG_WATCH_NOTIFY:
1986		handle_watch_notify(osdc, msg);
1987		break;
1988
1989	default:
1990		pr_err("received unknown message type %d %s\n", type,
1991		       ceph_msg_type_name(type));
1992	}
1993out:
1994	ceph_msg_put(msg);
1995}
1996
1997/*
1998 * lookup and return message for incoming reply.  set up reply message
1999 * pages.
2000 */
2001static struct ceph_msg *get_reply(struct ceph_connection *con,
2002				  struct ceph_msg_header *hdr,
2003				  int *skip)
2004{
2005	struct ceph_osd *osd = con->private;
2006	struct ceph_osd_client *osdc = osd->o_osdc;
2007	struct ceph_msg *m;
2008	struct ceph_osd_request *req;
2009	int front = le32_to_cpu(hdr->front_len);
2010	int data_len = le32_to_cpu(hdr->data_len);
2011	u64 tid;
2012
2013	tid = le64_to_cpu(hdr->tid);
2014	mutex_lock(&osdc->request_mutex);
2015	req = __lookup_request(osdc, tid);
2016	if (!req) {
2017		*skip = 1;
2018		m = NULL;
2019		pr_info("get_reply unknown tid %llu from osd%d\n", tid,
2020			osd->o_osd);
2021		goto out;
2022	}
2023
2024	if (req->r_con_filling_msg) {
2025		dout("get_reply revoking msg %p from old con %p\n",
2026		     req->r_reply, req->r_con_filling_msg);
2027		ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
2028		ceph_con_put(req->r_con_filling_msg);
2029		req->r_con_filling_msg = NULL;
2030	}
2031
2032	if (front > req->r_reply->front.iov_len) {
2033		pr_warning("get_reply front %d > preallocated %d\n",
2034			   front, (int)req->r_reply->front.iov_len);
2035		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
2036		if (!m)
2037			goto out;
2038		ceph_msg_put(req->r_reply);
2039		req->r_reply = m;
2040	}
2041	m = ceph_msg_get(req->r_reply);
2042
2043	if (data_len > 0) {
2044		int want = calc_pages_for(req->r_page_alignment, data_len);
2045
2046		if (unlikely(req->r_num_pages < want)) {
2047			pr_warning("tid %lld reply has %d bytes %d pages, we"
2048				   " had only %d pages ready\n", tid, data_len,
2049				   want, req->r_num_pages);
2050			*skip = 1;
2051			ceph_msg_put(m);
2052			m = NULL;
2053			goto out;
2054		}
2055		m->pages = req->r_pages;
2056		m->nr_pages = req->r_num_pages;
2057		m->page_alignment = req->r_page_alignment;
2058#ifdef CONFIG_BLOCK
2059		m->bio = req->r_bio;
2060#endif
2061	}
2062	*skip = 0;
2063	req->r_con_filling_msg = ceph_con_get(con);
2064	dout("get_reply tid %lld %p\n", tid, m);
2065
2066out:
2067	mutex_unlock(&osdc->request_mutex);
2068	return m;
2069
2070}
2071
2072static struct ceph_msg *alloc_msg(struct ceph_connection *con,
2073				  struct ceph_msg_header *hdr,
2074				  int *skip)
2075{
2076	struct ceph_osd *osd = con->private;
2077	int type = le16_to_cpu(hdr->type);
2078	int front = le32_to_cpu(hdr->front_len);
2079
2080	switch (type) {
2081	case CEPH_MSG_OSD_MAP:
2082	case CEPH_MSG_WATCH_NOTIFY:
2083		return ceph_msg_new(type, front, GFP_NOFS);
2084	case CEPH_MSG_OSD_OPREPLY:
2085		return get_reply(con, hdr, skip);
2086	default:
2087		pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
2088			osd->o_osd);
2089		*skip = 1;
2090		return NULL;
2091	}
2092}
2093
2094/*
2095 * Wrappers to refcount containing ceph_osd struct
2096 */
2097static struct ceph_connection *get_osd_con(struct ceph_connection *con)
2098{
2099	struct ceph_osd *osd = con->private;
2100	if (get_osd(osd))
2101		return con;
2102	return NULL;
2103}
2104
2105static void put_osd_con(struct ceph_connection *con)
2106{
2107	struct ceph_osd *osd = con->private;
2108	put_osd(osd);
2109}
2110
2111/*
2112 * authentication
2113 */
2114static int get_authorizer(struct ceph_connection *con,
2115			  void **buf, int *len, int *proto,
2116			  void **reply_buf, int *reply_len, int force_new)
2117{
2118	struct ceph_osd *o = con->private;
2119	struct ceph_osd_client *osdc = o->o_osdc;
2120	struct ceph_auth_client *ac = osdc->client->monc.auth;
2121	int ret = 0;
2122
2123	if (force_new && o->o_authorizer) {
2124		ac->ops->destroy_authorizer(ac, o->o_authorizer);
2125		o->o_authorizer = NULL;
2126	}
2127	if (o->o_authorizer == NULL) {
2128		ret = ac->ops->create_authorizer(
2129			ac, CEPH_ENTITY_TYPE_OSD,
2130			&o->o_authorizer,
2131			&o->o_authorizer_buf,
2132			&o->o_authorizer_buf_len,
2133			&o->o_authorizer_reply_buf,
2134			&o->o_authorizer_reply_buf_len);
2135		if (ret)
2136			return ret;
2137	}
2138
2139	*proto = ac->protocol;
2140	*buf = o->o_authorizer_buf;
2141	*len = o->o_authorizer_buf_len;
2142	*reply_buf = o->o_authorizer_reply_buf;
2143	*reply_len = o->o_authorizer_reply_buf_len;
2144	return 0;
2145}
2146
2147
2148static int verify_authorizer_reply(struct ceph_connection *con, int len)
2149{
2150	struct ceph_osd *o = con->private;
2151	struct ceph_osd_client *osdc = o->o_osdc;
2152	struct ceph_auth_client *ac = osdc->client->monc.auth;
2153
2154	return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
2155}
2156
2157static int invalidate_authorizer(struct ceph_connection *con)
2158{
2159	struct ceph_osd *o = con->private;
2160	struct ceph_osd_client *osdc = o->o_osdc;
2161	struct ceph_auth_client *ac = osdc->client->monc.auth;
2162
2163	if (ac->ops->invalidate_authorizer)
2164		ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
2165
2166	return ceph_monc_validate_auth(&osdc->client->monc);
2167}
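
/*
 * Illustrative note, not part of the original file: the messenger
 * drives these three hooks -- get_authorizer() when (re)connecting
 * (with force_new set after an authentication failure),
 * verify_authorizer_reply() on the peer's response, and
 * invalidate_authorizer() to drop cached credentials and revalidate
 * with the monitor before retrying.
 */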
2168
2169static const struct ceph_connection_operations osd_con_ops = {
2170	.get = get_osd_con,
2171	.put = put_osd_con,
2172	.dispatch = dispatch,
2173	.get_authorizer = get_authorizer,
2174	.verify_authorizer_reply = verify_authorizer_reply,
2175	.invalidate_authorizer = invalidate_authorizer,
2176	.alloc_msg = alloc_msg,
2177	.fault = osd_reset,
2178};