Linux Audio

Check our new training course

Loading...
   1/*
   2   rbd.c -- Export ceph rados objects as a Linux block device
   3
   4
   5   based on drivers/block/osdblk.c:
   6
   7   Copyright 2009 Red Hat, Inc.
   8
   9   This program is free software; you can redistribute it and/or modify
  10   it under the terms of the GNU General Public License as published by
  11   the Free Software Foundation.
  12
  13   This program is distributed in the hope that it will be useful,
  14   but WITHOUT ANY WARRANTY; without even the implied warranty of
  15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  16   GNU General Public License for more details.
  17
  18   You should have received a copy of the GNU General Public License
  19   along with this program; see the file COPYING.  If not, write to
  20   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  21
  22
  23
  24   For usage instructions, please refer to:
  25
  26                 Documentation/ABI/testing/sysfs-bus-rbd
  27
  28 */
  29
  30#include <linux/ceph/libceph.h>
  31#include <linux/ceph/osd_client.h>
  32#include <linux/ceph/mon_client.h>
  33#include <linux/ceph/decode.h>
  34#include <linux/parser.h>
  35
  36#include <linux/kernel.h>
  37#include <linux/device.h>
  38#include <linux/module.h>
  39#include <linux/fs.h>
  40#include <linux/blkdev.h>
  41
  42#include "rbd_types.h"
  43
  44/*
  45 * The basic unit of block I/O is a sector.  It is interpreted in a
  46 * number of contexts in Linux (blk, bio, genhd), but the default is
  47 * universally 512 bytes.  These symbols are just slightly more
  48 * meaningful than the bare numbers they represent.
  49 */
  50#define	SECTOR_SHIFT	9
  51#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
  52
  53#define RBD_DRV_NAME "rbd"
  54#define RBD_DRV_NAME_LONG "rbd (rados block device)"
  55
  56#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */
  57
  58#define RBD_MAX_MD_NAME_LEN	(RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
  59#define RBD_MAX_POOL_NAME_LEN	64
  60#define RBD_MAX_SNAP_NAME_LEN	32
  61#define RBD_MAX_OPT_LEN		1024
  62
  63#define RBD_SNAP_HEAD_NAME	"-"
  64
  65/*
  66 * An RBD device name will be "rbd#", where the "rbd" comes from
  67 * RBD_DRV_NAME above, and # is a unique integer identifier.
  68 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
  69 * enough to hold all possible device names.
  70 */
  71#define DEV_NAME_LEN		32
  72#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
  73
  74#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
  75
  76/*
  77 * block device image metadata (in-memory version)
  78 */
  79struct rbd_image_header {
  80	u64 image_size;
  81	char block_name[32];
  82	__u8 obj_order;
  83	__u8 crypt_type;
  84	__u8 comp_type;
  85	struct ceph_snap_context *snapc;
  86	size_t snap_names_len;
  87	u64 snap_seq;
  88	u32 total_snaps;
  89
  90	char *snap_names;
  91	u64 *snap_sizes;
  92
  93	u64 obj_version;
  94};
  95
  96struct rbd_options {
  97	int	notify_timeout;
  98};
  99
 100/*
 101 * an instance of the client.  multiple devices may share an rbd client.
 102 */
 103struct rbd_client {
 104	struct ceph_client	*client;
 105	struct rbd_options	*rbd_opts;
 106	struct kref		kref;
 107	struct list_head	node;
 108};
 109
 110/*
 111 * a request completion status
 112 */
 113struct rbd_req_status {
 114	int done;
 115	int rc;
 116	u64 bytes;
 117};
 118
 119/*
 120 * a collection of requests
 121 */
 122struct rbd_req_coll {
 123	int			total;
 124	int			num_done;
 125	struct kref		kref;
 126	struct rbd_req_status	status[0];
 127};
 128
 129/*
 130 * a single io request
 131 */
 132struct rbd_request {
 133	struct request		*rq;		/* blk layer request */
 134	struct bio		*bio;		/* cloned bio */
 135	struct page		**pages;	/* list of used pages */
 136	u64			len;
 137	int			coll_index;
 138	struct rbd_req_coll	*coll;
 139};
 140
 141struct rbd_snap {
 142	struct	device		dev;
 143	const char		*name;
 144	u64			size;
 145	struct list_head	node;
 146	u64			id;
 147};
 148
 149/*
 150 * a single device
 151 */
 152struct rbd_device {
 153	int			id;		/* blkdev unique id */
 154
 155	int			major;		/* blkdev assigned major */
 156	struct gendisk		*disk;		/* blkdev's gendisk and rq */
 157	struct request_queue	*q;
 158
 159	struct rbd_client	*rbd_client;
 160
 161	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 162
 163	spinlock_t		lock;		/* queue lock */
 164
 165	struct rbd_image_header	header;
 166	char			obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
 167	int			obj_len;
 168	char			obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
 169	char			pool_name[RBD_MAX_POOL_NAME_LEN];
 170	int			poolid;
 171
 172	struct ceph_osd_event   *watch_event;
 173	struct ceph_osd_request *watch_request;
 174
 175	/* protects updating the header */
 176	struct rw_semaphore     header_rwsem;
 177	char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
 178	u64                     snap_id;	/* current snapshot id */
 179	int read_only;
 180
 181	struct list_head	node;
 182
 183	/* list of snapshots */
 184	struct list_head	snaps;
 185
 186	/* sysfs related */
 187	struct device		dev;
 188};
 189
 190static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
 191
 192static LIST_HEAD(rbd_dev_list);    /* devices */
 193static DEFINE_SPINLOCK(rbd_dev_list_lock);
 194
 195static LIST_HEAD(rbd_client_list);		/* clients */
 196static DEFINE_SPINLOCK(rbd_client_list_lock);
 197
 198static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
 199static void rbd_dev_release(struct device *dev);
 200static ssize_t rbd_snap_add(struct device *dev,
 201			    struct device_attribute *attr,
 202			    const char *buf,
 203			    size_t count);
 204static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
 205				  struct rbd_snap *snap);
 206
 207static ssize_t rbd_add(struct bus_type *bus, const char *buf,
 208		       size_t count);
 209static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
 210			  size_t count);
 211
 212static struct bus_attribute rbd_bus_attrs[] = {
 213	__ATTR(add, S_IWUSR, NULL, rbd_add),
 214	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
 215	__ATTR_NULL
 216};
 217
 218static struct bus_type rbd_bus_type = {
 219	.name		= "rbd",
 220	.bus_attrs	= rbd_bus_attrs,
 221};
 222
 223static void rbd_root_dev_release(struct device *dev)
 224{
 225}
 226
 227static struct device rbd_root_dev = {
 228	.init_name =    "rbd",
 229	.release =      rbd_root_dev_release,
 230};
 231
 232
 233static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
 234{
 235	return get_device(&rbd_dev->dev);
 236}
 237
 238static void rbd_put_dev(struct rbd_device *rbd_dev)
 239{
 240	put_device(&rbd_dev->dev);
 241}
 242
 243static int __rbd_refresh_header(struct rbd_device *rbd_dev);
 244
 245static int rbd_open(struct block_device *bdev, fmode_t mode)
 246{
 247	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
 248
 249	rbd_get_dev(rbd_dev);
 250
 251	set_device_ro(bdev, rbd_dev->read_only);
 252
 253	if ((mode & FMODE_WRITE) && rbd_dev->read_only)
 254		return -EROFS;
 255
 256	return 0;
 257}
 258
 259static int rbd_release(struct gendisk *disk, fmode_t mode)
 260{
 261	struct rbd_device *rbd_dev = disk->private_data;
 262
 263	rbd_put_dev(rbd_dev);
 264
 265	return 0;
 266}
 267
 268static const struct block_device_operations rbd_bd_ops = {
 269	.owner			= THIS_MODULE,
 270	.open			= rbd_open,
 271	.release		= rbd_release,
 272};
 273
 274/*
 275 * Initialize an rbd client instance.
 276 * We own *opt.
 277 */
 278static struct rbd_client *rbd_client_create(struct ceph_options *opt,
 279					    struct rbd_options *rbd_opts)
 280{
 281	struct rbd_client *rbdc;
 282	int ret = -ENOMEM;
 283
 284	dout("rbd_client_create\n");
 285	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
 286	if (!rbdc)
 287		goto out_opt;
 288
 289	kref_init(&rbdc->kref);
 290	INIT_LIST_HEAD(&rbdc->node);
 291
 292	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
 293
 294	rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
 295	if (IS_ERR(rbdc->client))
 296		goto out_mutex;
 297	opt = NULL; /* Now rbdc->client is responsible for opt */
 298
 299	ret = ceph_open_session(rbdc->client);
 300	if (ret < 0)
 301		goto out_err;
 302
 303	rbdc->rbd_opts = rbd_opts;
 304
 305	spin_lock(&rbd_client_list_lock);
 306	list_add_tail(&rbdc->node, &rbd_client_list);
 307	spin_unlock(&rbd_client_list_lock);
 308
 309	mutex_unlock(&ctl_mutex);
 310
 311	dout("rbd_client_create created %p\n", rbdc);
 312	return rbdc;
 313
 314out_err:
 315	ceph_destroy_client(rbdc->client);
 316out_mutex:
 317	mutex_unlock(&ctl_mutex);
 318	kfree(rbdc);
 319out_opt:
 320	if (opt)
 321		ceph_destroy_options(opt);
 322	return ERR_PTR(ret);
 323}
 324
 325/*
 326 * Find a ceph client with specific addr and configuration.
 327 */
 328static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
 329{
 330	struct rbd_client *client_node;
 331
 332	if (opt->flags & CEPH_OPT_NOSHARE)
 333		return NULL;
 334
 335	list_for_each_entry(client_node, &rbd_client_list, node)
 336		if (ceph_compare_options(opt, client_node->client) == 0)
 337			return client_node;
 338	return NULL;
 339}
 340
 341/*
 342 * mount options
 343 */
 344enum {
 345	Opt_notify_timeout,
 346	Opt_last_int,
 347	/* int args above */
 348	Opt_last_string,
 349	/* string args above */
 350};
 351
 352static match_table_t rbdopt_tokens = {
 353	{Opt_notify_timeout, "notify_timeout=%d"},
 354	/* int args above */
 355	/* string args above */
 356	{-1, NULL}
 357};
 358
 359static int parse_rbd_opts_token(char *c, void *private)
 360{
 361	struct rbd_options *rbdopt = private;
 362	substring_t argstr[MAX_OPT_ARGS];
 363	int token, intval, ret;
 364
 365	token = match_token(c, rbdopt_tokens, argstr);
 366	if (token < 0)
 367		return -EINVAL;
 368
 369	if (token < Opt_last_int) {
 370		ret = match_int(&argstr[0], &intval);
 371		if (ret < 0) {
 372			pr_err("bad mount option arg (not int) "
 373			       "at '%s'\n", c);
 374			return ret;
 375		}
 376		dout("got int token %d val %d\n", token, intval);
 377	} else if (token > Opt_last_int && token < Opt_last_string) {
 378		dout("got string token %d val %s\n", token,
 379		     argstr[0].from);
 380	} else {
 381		dout("got token %d\n", token);
 382	}
 383
 384	switch (token) {
 385	case Opt_notify_timeout:
 386		rbdopt->notify_timeout = intval;
 387		break;
 388	default:
 389		BUG_ON(token);
 390	}
 391	return 0;
 392}
 393
 394/*
 395 * Get a ceph client with specific addr and configuration, if one does
 396 * not exist create it.
 397 */
 398static struct rbd_client *rbd_get_client(const char *mon_addr,
 399					 size_t mon_addr_len,
 400					 char *options)
 401{
 402	struct rbd_client *rbdc;
 403	struct ceph_options *opt;
 404	struct rbd_options *rbd_opts;
 405
 406	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
 407	if (!rbd_opts)
 408		return ERR_PTR(-ENOMEM);
 409
 410	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
 411
 412	opt = ceph_parse_options(options, mon_addr,
 413				mon_addr + mon_addr_len,
 414				parse_rbd_opts_token, rbd_opts);
 415	if (IS_ERR(opt)) {
 416		kfree(rbd_opts);
 417		return ERR_CAST(opt);
 418	}
 419
 420	spin_lock(&rbd_client_list_lock);
 421	rbdc = __rbd_client_find(opt);
 422	if (rbdc) {
 423		/* using an existing client */
 424		kref_get(&rbdc->kref);
 425		spin_unlock(&rbd_client_list_lock);
 426
 427		ceph_destroy_options(opt);
 428		kfree(rbd_opts);
 429
 430		return rbdc;
 431	}
 432	spin_unlock(&rbd_client_list_lock);
 433
 434	rbdc = rbd_client_create(opt, rbd_opts);
 435
 436	if (IS_ERR(rbdc))
 437		kfree(rbd_opts);
 438
 439	return rbdc;
 440}
 441
 442/*
 443 * Destroy ceph client
 444 *
 445 * Caller must hold rbd_client_list_lock.
 446 */
 447static void rbd_client_release(struct kref *kref)
 448{
 449	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
 450
 451	dout("rbd_release_client %p\n", rbdc);
 452	spin_lock(&rbd_client_list_lock);
 453	list_del(&rbdc->node);
 454	spin_unlock(&rbd_client_list_lock);
 455
 456	ceph_destroy_client(rbdc->client);
 457	kfree(rbdc->rbd_opts);
 458	kfree(rbdc);
 459}
 460
 461/*
 462 * Drop reference to ceph client node. If it's not referenced anymore, release
 463 * it.
 464 */
 465static void rbd_put_client(struct rbd_device *rbd_dev)
 466{
 467	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
 468	rbd_dev->rbd_client = NULL;
 469}
 470
 471/*
 472 * Destroy requests collection
 473 */
 474static void rbd_coll_release(struct kref *kref)
 475{
 476	struct rbd_req_coll *coll =
 477		container_of(kref, struct rbd_req_coll, kref);
 478
 479	dout("rbd_coll_release %p\n", coll);
 480	kfree(coll);
 481}
 482
 483/*
 484 * Create a new header structure, translate header format from the on-disk
 485 * header.
 486 */
 487static int rbd_header_from_disk(struct rbd_image_header *header,
 488				 struct rbd_image_header_ondisk *ondisk,
 489				 u32 allocated_snaps,
 490				 gfp_t gfp_flags)
 491{
 492	u32 i, snap_count;
 493
 494	if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
 495		return -ENXIO;
 496
 497	snap_count = le32_to_cpu(ondisk->snap_count);
 498	if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
 499			 / sizeof (*ondisk))
 500		return -EINVAL;
 501	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
 502				snap_count * sizeof(u64),
 503				gfp_flags);
 504	if (!header->snapc)
 505		return -ENOMEM;
 506
 507	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
 508	if (snap_count) {
 509		header->snap_names = kmalloc(header->snap_names_len,
 510					     gfp_flags);
 511		if (!header->snap_names)
 512			goto err_snapc;
 513		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
 514					     gfp_flags);
 515		if (!header->snap_sizes)
 516			goto err_names;
 517	} else {
 518		header->snap_names = NULL;
 519		header->snap_sizes = NULL;
 520	}
 521	memcpy(header->block_name, ondisk->block_name,
 522	       sizeof(ondisk->block_name));
 523
 524	header->image_size = le64_to_cpu(ondisk->image_size);
 525	header->obj_order = ondisk->options.order;
 526	header->crypt_type = ondisk->options.crypt_type;
 527	header->comp_type = ondisk->options.comp_type;
 528
 529	atomic_set(&header->snapc->nref, 1);
 530	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
 531	header->snapc->num_snaps = snap_count;
 532	header->total_snaps = snap_count;
 533
 534	if (snap_count && allocated_snaps == snap_count) {
 535		for (i = 0; i < snap_count; i++) {
 536			header->snapc->snaps[i] =
 537				le64_to_cpu(ondisk->snaps[i].id);
 538			header->snap_sizes[i] =
 539				le64_to_cpu(ondisk->snaps[i].image_size);
 540		}
 541
 542		/* copy snapshot names */
 543		memcpy(header->snap_names, &ondisk->snaps[i],
 544			header->snap_names_len);
 545	}
 546
 547	return 0;
 548
 549err_names:
 550	kfree(header->snap_names);
 551err_snapc:
 552	kfree(header->snapc);
 553	return -ENOMEM;
 554}
 555
 556static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
 557			u64 *seq, u64 *size)
 558{
 559	int i;
 560	char *p = header->snap_names;
 561
 562	for (i = 0; i < header->total_snaps; i++) {
 563		if (!strcmp(snap_name, p)) {
 564
 565			/* Found it.  Pass back its id and/or size */
 566
 567			if (seq)
 568				*seq = header->snapc->snaps[i];
 569			if (size)
 570				*size = header->snap_sizes[i];
 571			return i;
 572		}
 573		p += strlen(p) + 1;	/* Skip ahead to the next name */
 574	}
 575	return -ENOENT;
 576}
 577
 578static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
 579{
 580	struct rbd_image_header *header = &dev->header;
 581	struct ceph_snap_context *snapc = header->snapc;
 582	int ret = -ENOENT;
 583
 584	BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
 585
 586	down_write(&dev->header_rwsem);
 587
 588	if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
 589		    sizeof (RBD_SNAP_HEAD_NAME))) {
 590		if (header->total_snaps)
 591			snapc->seq = header->snap_seq;
 592		else
 593			snapc->seq = 0;
 594		dev->snap_id = CEPH_NOSNAP;
 595		dev->read_only = 0;
 596		if (size)
 597			*size = header->image_size;
 598	} else {
 599		ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
 600		if (ret < 0)
 601			goto done;
 602		dev->snap_id = snapc->seq;
 603		dev->read_only = 1;
 604	}
 605
 606	ret = 0;
 607done:
 608	up_write(&dev->header_rwsem);
 609	return ret;
 610}
 611
 612static void rbd_header_free(struct rbd_image_header *header)
 613{
 614	kfree(header->snapc);
 615	kfree(header->snap_names);
 616	kfree(header->snap_sizes);
 617}
 618
 619/*
 620 * get the actual striped segment name, offset and length
 621 */
 622static u64 rbd_get_segment(struct rbd_image_header *header,
 623			   const char *block_name,
 624			   u64 ofs, u64 len,
 625			   char *seg_name, u64 *segofs)
 626{
 627	u64 seg = ofs >> header->obj_order;
 628
 629	if (seg_name)
 630		snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
 631			 "%s.%012llx", block_name, seg);
 632
 633	ofs = ofs & ((1 << header->obj_order) - 1);
 634	len = min_t(u64, len, (1 << header->obj_order) - ofs);
 635
 636	if (segofs)
 637		*segofs = ofs;
 638
 639	return len;
 640}
 641
 642static int rbd_get_num_segments(struct rbd_image_header *header,
 643				u64 ofs, u64 len)
 644{
 645	u64 start_seg = ofs >> header->obj_order;
 646	u64 end_seg = (ofs + len - 1) >> header->obj_order;
 647	return end_seg - start_seg + 1;
 648}
 649
 650/*
 651 * returns the size of an object in the image
 652 */
 653static u64 rbd_obj_bytes(struct rbd_image_header *header)
 654{
 655	return 1 << header->obj_order;
 656}
 657
 658/*
 659 * bio helpers
 660 */
 661
 662static void bio_chain_put(struct bio *chain)
 663{
 664	struct bio *tmp;
 665
 666	while (chain) {
 667		tmp = chain;
 668		chain = chain->bi_next;
 669		bio_put(tmp);
 670	}
 671}
 672
 673/*
 674 * zeros a bio chain, starting at specific offset
 675 */
 676static void zero_bio_chain(struct bio *chain, int start_ofs)
 677{
 678	struct bio_vec *bv;
 679	unsigned long flags;
 680	void *buf;
 681	int i;
 682	int pos = 0;
 683
 684	while (chain) {
 685		bio_for_each_segment(bv, chain, i) {
 686			if (pos + bv->bv_len > start_ofs) {
 687				int remainder = max(start_ofs - pos, 0);
 688				buf = bvec_kmap_irq(bv, &flags);
 689				memset(buf + remainder, 0,
 690				       bv->bv_len - remainder);
 691				bvec_kunmap_irq(buf, &flags);
 692			}
 693			pos += bv->bv_len;
 694		}
 695
 696		chain = chain->bi_next;
 697	}
 698}
 699
 700/*
 701 * bio_chain_clone - clone a chain of bios up to a certain length.
 702 * might return a bio_pair that will need to be released.
 703 */
 704static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
 705				   struct bio_pair **bp,
 706				   int len, gfp_t gfpmask)
 707{
 708	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
 709	int total = 0;
 710
 711	if (*bp) {
 712		bio_pair_release(*bp);
 713		*bp = NULL;
 714	}
 715
 716	while (old_chain && (total < len)) {
 717		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
 718		if (!tmp)
 719			goto err_out;
 720
 721		if (total + old_chain->bi_size > len) {
 722			struct bio_pair *bp;
 723
 724			/*
 725			 * this split can only happen with a single paged bio,
 726			 * split_bio will BUG_ON if this is not the case
 727			 */
 728			dout("bio_chain_clone split! total=%d remaining=%d"
 729			     "bi_size=%d\n",
 730			     (int)total, (int)len-total,
 731			     (int)old_chain->bi_size);
 732
 733			/* split the bio. We'll release it either in the next
 734			   call, or it will have to be released outside */
 735			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
 736			if (!bp)
 737				goto err_out;
 738
 739			__bio_clone(tmp, &bp->bio1);
 740
 741			*next = &bp->bio2;
 742		} else {
 743			__bio_clone(tmp, old_chain);
 744			*next = old_chain->bi_next;
 745		}
 746
 747		tmp->bi_bdev = NULL;
 748		gfpmask &= ~__GFP_WAIT;
 749		tmp->bi_next = NULL;
 750
 751		if (!new_chain) {
 752			new_chain = tail = tmp;
 753		} else {
 754			tail->bi_next = tmp;
 755			tail = tmp;
 756		}
 757		old_chain = old_chain->bi_next;
 758
 759		total += tmp->bi_size;
 760	}
 761
 762	BUG_ON(total < len);
 763
 764	if (tail)
 765		tail->bi_next = NULL;
 766
 767	*old = old_chain;
 768
 769	return new_chain;
 770
 771err_out:
 772	dout("bio_chain_clone with err\n");
 773	bio_chain_put(new_chain);
 774	return NULL;
 775}
 776
 777/*
 778 * helpers for osd request op vectors.
 779 */
 780static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
 781			    int num_ops,
 782			    int opcode,
 783			    u32 payload_len)
 784{
 785	*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
 786		       GFP_NOIO);
 787	if (!*ops)
 788		return -ENOMEM;
 789	(*ops)[0].op = opcode;
 790	/*
 791	 * op extent offset and length will be set later on
 792	 * in calc_raw_layout()
 793	 */
 794	(*ops)[0].payload_len = payload_len;
 795	return 0;
 796}
 797
 798static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
 799{
 800	kfree(ops);
 801}
 802
 803static void rbd_coll_end_req_index(struct request *rq,
 804				   struct rbd_req_coll *coll,
 805				   int index,
 806				   int ret, u64 len)
 807{
 808	struct request_queue *q;
 809	int min, max, i;
 810
 811	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
 812	     coll, index, ret, len);
 813
 814	if (!rq)
 815		return;
 816
 817	if (!coll) {
 818		blk_end_request(rq, ret, len);
 819		return;
 820	}
 821
 822	q = rq->q;
 823
 824	spin_lock_irq(q->queue_lock);
 825	coll->status[index].done = 1;
 826	coll->status[index].rc = ret;
 827	coll->status[index].bytes = len;
 828	max = min = coll->num_done;
 829	while (max < coll->total && coll->status[max].done)
 830		max++;
 831
 832	for (i = min; i<max; i++) {
 833		__blk_end_request(rq, coll->status[i].rc,
 834				  coll->status[i].bytes);
 835		coll->num_done++;
 836		kref_put(&coll->kref, rbd_coll_release);
 837	}
 838	spin_unlock_irq(q->queue_lock);
 839}
 840
 841static void rbd_coll_end_req(struct rbd_request *req,
 842			     int ret, u64 len)
 843{
 844	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
 845}
 846
 847/*
 848 * Send ceph osd request
 849 */
 850static int rbd_do_request(struct request *rq,
 851			  struct rbd_device *dev,
 852			  struct ceph_snap_context *snapc,
 853			  u64 snapid,
 854			  const char *obj, u64 ofs, u64 len,
 855			  struct bio *bio,
 856			  struct page **pages,
 857			  int num_pages,
 858			  int flags,
 859			  struct ceph_osd_req_op *ops,
 860			  int num_reply,
 861			  struct rbd_req_coll *coll,
 862			  int coll_index,
 863			  void (*rbd_cb)(struct ceph_osd_request *req,
 864					 struct ceph_msg *msg),
 865			  struct ceph_osd_request **linger_req,
 866			  u64 *ver)
 867{
 868	struct ceph_osd_request *req;
 869	struct ceph_file_layout *layout;
 870	int ret;
 871	u64 bno;
 872	struct timespec mtime = CURRENT_TIME;
 873	struct rbd_request *req_data;
 874	struct ceph_osd_request_head *reqhead;
 875	struct ceph_osd_client *osdc;
 876
 877	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
 878	if (!req_data) {
 879		if (coll)
 880			rbd_coll_end_req_index(rq, coll, coll_index,
 881					       -ENOMEM, len);
 882		return -ENOMEM;
 883	}
 884
 885	if (coll) {
 886		req_data->coll = coll;
 887		req_data->coll_index = coll_index;
 888	}
 889
 890	dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
 891
 892	down_read(&dev->header_rwsem);
 893
 894	osdc = &dev->rbd_client->client->osdc;
 895	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
 896					false, GFP_NOIO, pages, bio);
 897	if (!req) {
 898		up_read(&dev->header_rwsem);
 899		ret = -ENOMEM;
 900		goto done_pages;
 901	}
 902
 903	req->r_callback = rbd_cb;
 904
 905	req_data->rq = rq;
 906	req_data->bio = bio;
 907	req_data->pages = pages;
 908	req_data->len = len;
 909
 910	req->r_priv = req_data;
 911
 912	reqhead = req->r_request->front.iov_base;
 913	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
 914
 915	strncpy(req->r_oid, obj, sizeof(req->r_oid));
 916	req->r_oid_len = strlen(req->r_oid);
 917
 918	layout = &req->r_file_layout;
 919	memset(layout, 0, sizeof(*layout));
 920	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
 921	layout->fl_stripe_count = cpu_to_le32(1);
 922	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
 923	layout->fl_pg_pool = cpu_to_le32(dev->poolid);
 924	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
 925				req, ops);
 926
 927	ceph_osdc_build_request(req, ofs, &len,
 928				ops,
 929				snapc,
 930				&mtime,
 931				req->r_oid, req->r_oid_len);
 932	up_read(&dev->header_rwsem);
 933
 934	if (linger_req) {
 935		ceph_osdc_set_request_linger(osdc, req);
 936		*linger_req = req;
 937	}
 938
 939	ret = ceph_osdc_start_request(osdc, req, false);
 940	if (ret < 0)
 941		goto done_err;
 942
 943	if (!rbd_cb) {
 944		ret = ceph_osdc_wait_request(osdc, req);
 945		if (ver)
 946			*ver = le64_to_cpu(req->r_reassert_version.version);
 947		dout("reassert_ver=%lld\n",
 948		     le64_to_cpu(req->r_reassert_version.version));
 949		ceph_osdc_put_request(req);
 950	}
 951	return ret;
 952
 953done_err:
 954	bio_chain_put(req_data->bio);
 955	ceph_osdc_put_request(req);
 956done_pages:
 957	rbd_coll_end_req(req_data, ret, len);
 958	kfree(req_data);
 959	return ret;
 960}
 961
 962/*
 963 * Ceph osd op callback
 964 */
 965static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
 966{
 967	struct rbd_request *req_data = req->r_priv;
 968	struct ceph_osd_reply_head *replyhead;
 969	struct ceph_osd_op *op;
 970	__s32 rc;
 971	u64 bytes;
 972	int read_op;
 973
 974	/* parse reply */
 975	replyhead = msg->front.iov_base;
 976	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
 977	op = (void *)(replyhead + 1);
 978	rc = le32_to_cpu(replyhead->result);
 979	bytes = le64_to_cpu(op->extent.length);
 980	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
 981
 982	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
 983
 984	if (rc == -ENOENT && read_op) {
 985		zero_bio_chain(req_data->bio, 0);
 986		rc = 0;
 987	} else if (rc == 0 && read_op && bytes < req_data->len) {
 988		zero_bio_chain(req_data->bio, bytes);
 989		bytes = req_data->len;
 990	}
 991
 992	rbd_coll_end_req(req_data, rc, bytes);
 993
 994	if (req_data->bio)
 995		bio_chain_put(req_data->bio);
 996
 997	ceph_osdc_put_request(req);
 998	kfree(req_data);
 999}
1000
/*
 * Minimal osd callback: only drops the reference on the request.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req,
			      struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1005
1006/*
1007 * Do a synchronous ceph osd operation
1008 */
1009static int rbd_req_sync_op(struct rbd_device *dev,
1010			   struct ceph_snap_context *snapc,
1011			   u64 snapid,
1012			   int opcode,
1013			   int flags,
1014			   struct ceph_osd_req_op *orig_ops,
1015			   int num_reply,
1016			   const char *obj,
1017			   u64 ofs, u64 len,
1018			   char *buf,
1019			   struct ceph_osd_request **linger_req,
1020			   u64 *ver)
1021{
1022	int ret;
1023	struct page **pages;
1024	int num_pages;
1025	struct ceph_osd_req_op *ops = orig_ops;
1026	u32 payload_len;
1027
1028	num_pages = calc_pages_for(ofs , len);
1029	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1030	if (IS_ERR(pages))
1031		return PTR_ERR(pages);
1032
1033	if (!orig_ops) {
1034		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1035		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1036		if (ret < 0)
1037			goto done;
1038
1039		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1040			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1041			if (ret < 0)
1042				goto done_ops;
1043		}
1044	}
1045
1046	ret = rbd_do_request(NULL, dev, snapc, snapid,
1047			  obj, ofs, len, NULL,
1048			  pages, num_pages,
1049			  flags,
1050			  ops,
1051			  2,
1052			  NULL, 0,
1053			  NULL,
1054			  linger_req, ver);
1055	if (ret < 0)
1056		goto done_ops;
1057
1058	if ((flags & CEPH_OSD_FLAG_READ) && buf)
1059		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1060
1061done_ops:
1062	if (!orig_ops)
1063		rbd_destroy_ops(ops);
1064done:
1065	ceph_release_page_vector(pages, num_pages);
1066	return ret;
1067}
1068
1069/*
1070 * Do an asynchronous ceph osd operation
1071 */
1072static int rbd_do_op(struct request *rq,
1073		     struct rbd_device *rbd_dev ,
1074		     struct ceph_snap_context *snapc,
1075		     u64 snapid,
1076		     int opcode, int flags, int num_reply,
1077		     u64 ofs, u64 len,
1078		     struct bio *bio,
1079		     struct rbd_req_coll *coll,
1080		     int coll_index)
1081{
1082	char *seg_name;
1083	u64 seg_ofs;
1084	u64 seg_len;
1085	int ret;
1086	struct ceph_osd_req_op *ops;
1087	u32 payload_len;
1088
1089	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1090	if (!seg_name)
1091		return -ENOMEM;
1092
1093	seg_len = rbd_get_segment(&rbd_dev->header,
1094				  rbd_dev->header.block_name,
1095				  ofs, len,
1096				  seg_name, &seg_ofs);
1097
1098	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1099
1100	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1101	if (ret < 0)
1102		goto done;
1103
1104	/* we've taken care of segment sizes earlier when we
1105	   cloned the bios. We should never have a segment
1106	   truncated at this point */
1107	BUG_ON(seg_len < len);
1108
1109	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1110			     seg_name, seg_ofs, seg_len,
1111			     bio,
1112			     NULL, 0,
1113			     flags,
1114			     ops,
1115			     num_reply,
1116			     coll, coll_index,
1117			     rbd_req_cb, 0, NULL);
1118
1119	rbd_destroy_ops(ops);
1120done:
1121	kfree(seg_name);
1122	return ret;
1123}
1124
1125/*
1126 * Request async osd write
1127 */
1128static int rbd_req_write(struct request *rq,
1129			 struct rbd_device *rbd_dev,
1130			 struct ceph_snap_context *snapc,
1131			 u64 ofs, u64 len,
1132			 struct bio *bio,
1133			 struct rbd_req_coll *coll,
1134			 int coll_index)
1135{
1136	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1137			 CEPH_OSD_OP_WRITE,
1138			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1139			 2,
1140			 ofs, len, bio, coll, coll_index);
1141}
1142
1143/*
1144 * Request async osd read
1145 */
1146static int rbd_req_read(struct request *rq,
1147			 struct rbd_device *rbd_dev,
1148			 u64 snapid,
1149			 u64 ofs, u64 len,
1150			 struct bio *bio,
1151			 struct rbd_req_coll *coll,
1152			 int coll_index)
1153{
1154	return rbd_do_op(rq, rbd_dev, NULL,
1155			 snapid,
1156			 CEPH_OSD_OP_READ,
1157			 CEPH_OSD_FLAG_READ,
1158			 2,
1159			 ofs, len, bio, coll, coll_index);
1160}
1161
1162/*
1163 * Request sync osd read
1164 */
1165static int rbd_req_sync_read(struct rbd_device *dev,
1166			  struct ceph_snap_context *snapc,
1167			  u64 snapid,
1168			  const char *obj,
1169			  u64 ofs, u64 len,
1170			  char *buf,
1171			  u64 *ver)
1172{
1173	return rbd_req_sync_op(dev, NULL,
1174			       snapid,
1175			       CEPH_OSD_OP_READ,
1176			       CEPH_OSD_FLAG_READ,
1177			       NULL,
1178			       1, obj, ofs, len, buf, NULL, ver);
1179}
1180
1181/*
1182 * Request sync osd watch
1183 */
1184static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1185				   u64 ver,
1186				   u64 notify_id,
1187				   const char *obj)
1188{
1189	struct ceph_osd_req_op *ops;
1190	struct page **pages = NULL;
1191	int ret;
1192
1193	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1194	if (ret < 0)
1195		return ret;
1196
1197	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1198	ops[0].watch.cookie = notify_id;
1199	ops[0].watch.flag = 0;
1200
1201	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1202			  obj, 0, 0, NULL,
1203			  pages, 0,
1204			  CEPH_OSD_FLAG_READ,
1205			  ops,
1206			  1,
1207			  NULL, 0,
1208			  rbd_simple_req_cb, 0, NULL);
1209
1210	rbd_destroy_ops(ops);
1211	return ret;
1212}
1213
1214static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1215{
1216	struct rbd_device *dev = (struct rbd_device *)data;
1217	int rc;
1218
1219	if (!dev)
1220		return;
1221
1222	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1223		notify_id, (int)opcode);
1224	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1225	rc = __rbd_refresh_header(dev);
1226	mutex_unlock(&ctl_mutex);
1227	if (rc)
1228		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1229			   " update snaps: %d\n", dev->major, rc);
1230
1231	rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1232}
1233
1234/*
1235 * Request sync osd watch
1236 */
1237static int rbd_req_sync_watch(struct rbd_device *dev,
1238			      const char *obj,
1239			      u64 ver)
1240{
1241	struct ceph_osd_req_op *ops;
1242	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1243
1244	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1245	if (ret < 0)
1246		return ret;
1247
1248	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1249				     (void *)dev, &dev->watch_event);
1250	if (ret < 0)
1251		goto fail;
1252
1253	ops[0].watch.ver = cpu_to_le64(ver);
1254	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1255	ops[0].watch.flag = 1;
1256
1257	ret = rbd_req_sync_op(dev, NULL,
1258			      CEPH_NOSNAP,
1259			      0,
1260			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1261			      ops,
1262			      1, obj, 0, 0, NULL,
1263			      &dev->watch_request, NULL);
1264
1265	if (ret < 0)
1266		goto fail_event;
1267
1268	rbd_destroy_ops(ops);
1269	return 0;
1270
1271fail_event:
1272	ceph_osdc_cancel_event(dev->watch_event);
1273	dev->watch_event = NULL;
1274fail:
1275	rbd_destroy_ops(ops);
1276	return ret;
1277}
1278
1279/*
1280 * Request sync osd unwatch
1281 */
1282static int rbd_req_sync_unwatch(struct rbd_device *dev,
1283				const char *obj)
1284{
1285	struct ceph_osd_req_op *ops;
1286
1287	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1288	if (ret < 0)
1289		return ret;
1290
1291	ops[0].watch.ver = 0;
1292	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1293	ops[0].watch.flag = 0;
1294
1295	ret = rbd_req_sync_op(dev, NULL,
1296			      CEPH_NOSNAP,
1297			      0,
1298			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1299			      ops,
1300			      1, obj, 0, 0, NULL, NULL, NULL);
1301
1302	rbd_destroy_ops(ops);
1303	ceph_osdc_cancel_event(dev->watch_event);
1304	dev->watch_event = NULL;
1305	return ret;
1306}
1307
1308struct rbd_notify_info {
1309	struct rbd_device *dev;
1310};
1311
1312static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1313{
1314	struct rbd_device *dev = (struct rbd_device *)data;
1315	if (!dev)
1316		return;
1317
1318	dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1319		notify_id, (int)opcode);
1320}
1321
1322/*
1323 * Request sync osd notify
1324 */
1325static int rbd_req_sync_notify(struct rbd_device *dev,
1326		          const char *obj)
1327{
1328	struct ceph_osd_req_op *ops;
1329	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1330	struct ceph_osd_event *event;
1331	struct rbd_notify_info info;
1332	int payload_len = sizeof(u32) + sizeof(u32);
1333	int ret;
1334
1335	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1336	if (ret < 0)
1337		return ret;
1338
1339	info.dev = dev;
1340
1341	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1342				     (void *)&info, &event);
1343	if (ret < 0)
1344		goto fail;
1345
1346	ops[0].watch.ver = 1;
1347	ops[0].watch.flag = 1;
1348	ops[0].watch.cookie = event->cookie;
1349	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1350	ops[0].watch.timeout = 12;
1351
1352	ret = rbd_req_sync_op(dev, NULL,
1353			       CEPH_NOSNAP,
1354			       0,
1355			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1356			       ops,
1357			       1, obj, 0, 0, NULL, NULL, NULL);
1358	if (ret < 0)
1359		goto fail_event;
1360
1361	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1362	dout("ceph_osdc_wait_event returned %d\n", ret);
1363	rbd_destroy_ops(ops);
1364	return 0;
1365
1366fail_event:
1367	ceph_osdc_cancel_event(event);
1368fail:
1369	rbd_destroy_ops(ops);
1370	return ret;
1371}
1372
1373/*
1374 * Request sync osd read
1375 */
1376static int rbd_req_sync_exec(struct rbd_device *dev,
1377			     const char *obj,
1378			     const char *cls,
1379			     const char *method,
1380			     const char *data,
1381			     int len,
1382			     u64 *ver)
1383{
1384	struct ceph_osd_req_op *ops;
1385	int cls_len = strlen(cls);
1386	int method_len = strlen(method);
1387	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1388				    cls_len + method_len + len);
1389	if (ret < 0)
1390		return ret;
1391
1392	ops[0].cls.class_name = cls;
1393	ops[0].cls.class_len = (__u8)cls_len;
1394	ops[0].cls.method_name = method;
1395	ops[0].cls.method_len = (__u8)method_len;
1396	ops[0].cls.argc = 0;
1397	ops[0].cls.indata = data;
1398	ops[0].cls.indata_len = len;
1399
1400	ret = rbd_req_sync_op(dev, NULL,
1401			       CEPH_NOSNAP,
1402			       0,
1403			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1404			       ops,
1405			       1, obj, 0, 0, NULL, NULL, ver);
1406
1407	rbd_destroy_ops(ops);
1408
1409	dout("cls_exec returned %d\n", ret);
1410	return ret;
1411}
1412
1413static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1414{
1415	struct rbd_req_coll *coll =
1416			kzalloc(sizeof(struct rbd_req_coll) +
1417			        sizeof(struct rbd_req_status) * num_reqs,
1418				GFP_ATOMIC);
1419
1420	if (!coll)
1421		return NULL;
1422	coll->total = num_reqs;
1423	kref_init(&coll->kref);
1424	return coll;
1425}
1426
1427/*
1428 * block device queue callback
1429 */
1430static void rbd_rq_fn(struct request_queue *q)
1431{
1432	struct rbd_device *rbd_dev = q->queuedata;
1433	struct request *rq;
1434	struct bio_pair *bp = NULL;
1435
1436	while ((rq = blk_fetch_request(q))) {
1437		struct bio *bio;
1438		struct bio *rq_bio, *next_bio = NULL;
1439		bool do_write;
1440		int size, op_size = 0;
1441		u64 ofs;
1442		int num_segs, cur_seg = 0;
1443		struct rbd_req_coll *coll;
1444
1445		/* peek at request from block layer */
1446		if (!rq)
1447			break;
1448
1449		dout("fetched request\n");
1450
1451		/* filter out block requests we don't understand */
1452		if ((rq->cmd_type != REQ_TYPE_FS)) {
1453			__blk_end_request_all(rq, 0);
1454			continue;
1455		}
1456
1457		/* deduce our operation (read, write) */
1458		do_write = (rq_data_dir(rq) == WRITE);
1459
1460		size = blk_rq_bytes(rq);
1461		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1462		rq_bio = rq->bio;
1463		if (do_write && rbd_dev->read_only) {
1464			__blk_end_request_all(rq, -EROFS);
1465			continue;
1466		}
1467
1468		spin_unlock_irq(q->queue_lock);
1469
1470		dout("%s 0x%x bytes at 0x%llx\n",
1471		     do_write ? "write" : "read",
1472		     size, blk_rq_pos(rq) * SECTOR_SIZE);
1473
1474		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1475		coll = rbd_alloc_coll(num_segs);
1476		if (!coll) {
1477			spin_lock_irq(q->queue_lock);
1478			__blk_end_request_all(rq, -ENOMEM);
1479			continue;
1480		}
1481
1482		do {
1483			/* a bio clone to be passed down to OSD req */
1484			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1485			op_size = rbd_get_segment(&rbd_dev->header,
1486						  rbd_dev->header.block_name,
1487						  ofs, size,
1488						  NULL, NULL);
1489			kref_get(&coll->kref);
1490			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1491					      op_size, GFP_ATOMIC);
1492			if (!bio) {
1493				rbd_coll_end_req_index(rq, coll, cur_seg,
1494						       -ENOMEM, op_size);
1495				goto next_seg;
1496			}
1497
1498
1499			/* init OSD command: write or read */
1500			if (do_write)
1501				rbd_req_write(rq, rbd_dev,
1502					      rbd_dev->header.snapc,
1503					      ofs,
1504					      op_size, bio,
1505					      coll, cur_seg);
1506			else
1507				rbd_req_read(rq, rbd_dev,
1508					     rbd_dev->snap_id,
1509					     ofs,
1510					     op_size, bio,
1511					     coll, cur_seg);
1512
1513next_seg:
1514			size -= op_size;
1515			ofs += op_size;
1516
1517			cur_seg++;
1518			rq_bio = next_bio;
1519		} while (size > 0);
1520		kref_put(&coll->kref, rbd_coll_release);
1521
1522		if (bp)
1523			bio_pair_release(bp);
1524		spin_lock_irq(q->queue_lock);
1525	}
1526}
1527
1528/*
1529 * a queue callback. Makes sure that we don't create a bio that spans across
1530 * multiple osd objects. One exception would be with a single page bios,
1531 * which we handle later at bio_chain_clone
1532 */
1533static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1534			  struct bio_vec *bvec)
1535{
1536	struct rbd_device *rbd_dev = q->queuedata;
1537	unsigned int chunk_sectors;
1538	sector_t sector;
1539	unsigned int bio_sectors;
1540	int max;
1541
1542	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1543	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1544	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1545
1546	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1547				 + bio_sectors)) << SECTOR_SHIFT;
1548	if (max < 0)
1549		max = 0; /* bio_add cannot handle a negative return */
1550	if (max <= bvec->bv_len && bio_sectors == 0)
1551		return bvec->bv_len;
1552	return max;
1553}
1554
1555static void rbd_free_disk(struct rbd_device *rbd_dev)
1556{
1557	struct gendisk *disk = rbd_dev->disk;
1558
1559	if (!disk)
1560		return;
1561
1562	rbd_header_free(&rbd_dev->header);
1563
1564	if (disk->flags & GENHD_FL_UP)
1565		del_gendisk(disk);
1566	if (disk->queue)
1567		blk_cleanup_queue(disk->queue);
1568	put_disk(disk);
1569}
1570
1571/*
1572 * reload the ondisk the header 
1573 */
1574static int rbd_read_header(struct rbd_device *rbd_dev,
1575			   struct rbd_image_header *header)
1576{
1577	ssize_t rc;
1578	struct rbd_image_header_ondisk *dh;
1579	u32 snap_count = 0;
1580	u64 ver;
1581	size_t len;
1582
1583	/*
1584	 * First reads the fixed-size header to determine the number
1585	 * of snapshots, then re-reads it, along with all snapshot
1586	 * records as well as their stored names.
1587	 */
1588	len = sizeof (*dh);
1589	while (1) {
1590		dh = kmalloc(len, GFP_KERNEL);
1591		if (!dh)
1592			return -ENOMEM;
1593
1594		rc = rbd_req_sync_read(rbd_dev,
1595				       NULL, CEPH_NOSNAP,
1596				       rbd_dev->obj_md_name,
1597				       0, len,
1598				       (char *)dh, &ver);
1599		if (rc < 0)
1600			goto out_dh;
1601
1602		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1603		if (rc < 0) {
1604			if (rc == -ENXIO)
1605				pr_warning("unrecognized header format"
1606					   " for image %s", rbd_dev->obj);
1607			goto out_dh;
1608		}
1609
1610		if (snap_count == header->total_snaps)
1611			break;
1612
1613		snap_count = header->total_snaps;
1614		len = sizeof (*dh) +
1615			snap_count * sizeof(struct rbd_image_snap_ondisk) +
1616			header->snap_names_len;
1617
1618		rbd_header_free(header);
1619		kfree(dh);
1620	}
1621	header->obj_version = ver;
1622
1623out_dh:
1624	kfree(dh);
1625	return rc;
1626}
1627
1628/*
1629 * create a snapshot
1630 */
1631static int rbd_header_add_snap(struct rbd_device *dev,
1632			       const char *snap_name,
1633			       gfp_t gfp_flags)
1634{
1635	int name_len = strlen(snap_name);
1636	u64 new_snapid;
1637	int ret;
1638	void *data, *p, *e;
1639	u64 ver;
1640	struct ceph_mon_client *monc;
1641
1642	/* we should create a snapshot only if we're pointing at the head */
1643	if (dev->snap_id != CEPH_NOSNAP)
1644		return -EINVAL;
1645
1646	monc = &dev->rbd_client->client->monc;
1647	ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
1648	dout("created snapid=%lld\n", new_snapid);
1649	if (ret < 0)
1650		return ret;
1651
1652	data = kmalloc(name_len + 16, gfp_flags);
1653	if (!data)
1654		return -ENOMEM;
1655
1656	p = data;
1657	e = data + name_len + 16;
1658
1659	ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1660	ceph_encode_64_safe(&p, e, new_snapid, bad);
1661
1662	ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1663				data, p - data, &ver);
1664
1665	kfree(data);
1666
1667	if (ret < 0)
1668		return ret;
1669
1670	down_write(&dev->header_rwsem);
1671	dev->header.snapc->seq = new_snapid;
1672	up_write(&dev->header_rwsem);
1673
1674	return 0;
1675bad:
1676	return -ERANGE;
1677}
1678
1679static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1680{
1681	struct rbd_snap *snap;
1682
1683	while (!list_empty(&rbd_dev->snaps)) {
1684		snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1685		__rbd_remove_snap_dev(rbd_dev, snap);
1686	}
1687}
1688
1689/*
1690 * only read the first part of the ondisk header, without the snaps info
1691 */
1692static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1693{
1694	int ret;
1695	struct rbd_image_header h;
1696	u64 snap_seq;
1697	int follow_seq = 0;
1698
1699	ret = rbd_read_header(rbd_dev, &h);
1700	if (ret < 0)
1701		return ret;
1702
1703	/* resized? */
1704	set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
1705
1706	down_write(&rbd_dev->header_rwsem);
1707
1708	snap_seq = rbd_dev->header.snapc->seq;
1709	if (rbd_dev->header.total_snaps &&
1710	    rbd_dev->header.snapc->snaps[0] == snap_seq)
1711		/* pointing at the head, will need to follow that
1712		   if head moves */
1713		follow_seq = 1;
1714
1715	kfree(rbd_dev->header.snapc);
1716	kfree(rbd_dev->header.snap_names);
1717	kfree(rbd_dev->header.snap_sizes);
1718
1719	rbd_dev->header.total_snaps = h.total_snaps;
1720	rbd_dev->header.snapc = h.snapc;
1721	rbd_dev->header.snap_names = h.snap_names;
1722	rbd_dev->header.snap_names_len = h.snap_names_len;
1723	rbd_dev->header.snap_sizes = h.snap_sizes;
1724	if (follow_seq)
1725		rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1726	else
1727		rbd_dev->header.snapc->seq = snap_seq;
1728
1729	ret = __rbd_init_snaps_header(rbd_dev);
1730
1731	up_write(&rbd_dev->header_rwsem);
1732
1733	return ret;
1734}
1735
1736static int rbd_init_disk(struct rbd_device *rbd_dev)
1737{
1738	struct gendisk *disk;
1739	struct request_queue *q;
1740	int rc;
1741	u64 segment_size;
1742	u64 total_size = 0;
1743
1744	/* contact OSD, request size info about the object being mapped */
1745	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1746	if (rc)
1747		return rc;
1748
1749	/* no need to lock here, as rbd_dev is not registered yet */
1750	rc = __rbd_init_snaps_header(rbd_dev);
1751	if (rc)
1752		return rc;
1753
1754	rc = rbd_header_set_snap(rbd_dev, &total_size);
1755	if (rc)
1756		return rc;
1757
1758	/* create gendisk info */
1759	rc = -ENOMEM;
1760	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1761	if (!disk)
1762		goto out;
1763
1764	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1765		 rbd_dev->id);
1766	disk->major = rbd_dev->major;
1767	disk->first_minor = 0;
1768	disk->fops = &rbd_bd_ops;
1769	disk->private_data = rbd_dev;
1770
1771	/* init rq */
1772	rc = -ENOMEM;
1773	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1774	if (!q)
1775		goto out_disk;
1776
1777	/* We use the default size, but let's be explicit about it. */
1778	blk_queue_physical_block_size(q, SECTOR_SIZE);
1779
1780	/* set io sizes to object size */
1781	segment_size = rbd_obj_bytes(&rbd_dev->header);
1782	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1783	blk_queue_max_segment_size(q, segment_size);
1784	blk_queue_io_min(q, segment_size);
1785	blk_queue_io_opt(q, segment_size);
1786
1787	blk_queue_merge_bvec(q, rbd_merge_bvec);
1788	disk->queue = q;
1789
1790	q->queuedata = rbd_dev;
1791
1792	rbd_dev->disk = disk;
1793	rbd_dev->q = q;
1794
1795	/* finally, announce the disk to the world */
1796	set_capacity(disk, total_size / SECTOR_SIZE);
1797	add_disk(disk);
1798
1799	pr_info("%s: added with size 0x%llx\n",
1800		disk->disk_name, (unsigned long long)total_size);
1801	return 0;
1802
1803out_disk:
1804	put_disk(disk);
1805out:
1806	return rc;
1807}
1808
1809/*
1810  sysfs
1811*/
1812
1813static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1814{
1815	return container_of(dev, struct rbd_device, dev);
1816}
1817
1818static ssize_t rbd_size_show(struct device *dev,
1819			     struct device_attribute *attr, char *buf)
1820{
1821	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1822
1823	return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1824}
1825
1826static ssize_t rbd_major_show(struct device *dev,
1827			      struct device_attribute *attr, char *buf)
1828{
1829	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1830
1831	return sprintf(buf, "%d\n", rbd_dev->major);
1832}
1833
1834static ssize_t rbd_client_id_show(struct device *dev,
1835				  struct device_attribute *attr, char *buf)
1836{
1837	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1838
1839	return sprintf(buf, "client%lld\n",
1840			ceph_client_id(rbd_dev->rbd_client->client));
1841}
1842
1843static ssize_t rbd_pool_show(struct device *dev,
1844			     struct device_attribute *attr, char *buf)
1845{
1846	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1847
1848	return sprintf(buf, "%s\n", rbd_dev->pool_name);
1849}
1850
1851static ssize_t rbd_name_show(struct device *dev,
1852			     struct device_attribute *attr, char *buf)
1853{
1854	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1855
1856	return sprintf(buf, "%s\n", rbd_dev->obj);
1857}
1858
1859static ssize_t rbd_snap_show(struct device *dev,
1860			     struct device_attribute *attr,
1861			     char *buf)
1862{
1863	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1864
1865	return sprintf(buf, "%s\n", rbd_dev->snap_name);
1866}
1867
1868static ssize_t rbd_image_refresh(struct device *dev,
1869				 struct device_attribute *attr,
1870				 const char *buf,
1871				 size_t size)
1872{
1873	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1874	int rc;
1875	int ret = size;
1876
1877	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1878
1879	rc = __rbd_refresh_header(rbd_dev);
1880	if (rc < 0)
1881		ret = rc;
1882
1883	mutex_unlock(&ctl_mutex);
1884	return ret;
1885}
1886
1887static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1888static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1889static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1890static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1891static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1892static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1893static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1894static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1895
1896static struct attribute *rbd_attrs[] = {
1897	&dev_attr_size.attr,
1898	&dev_attr_major.attr,
1899	&dev_attr_client_id.attr,
1900	&dev_attr_pool.attr,
1901	&dev_attr_name.attr,
1902	&dev_attr_current_snap.attr,
1903	&dev_attr_refresh.attr,
1904	&dev_attr_create_snap.attr,
1905	NULL
1906};
1907
1908static struct attribute_group rbd_attr_group = {
1909	.attrs = rbd_attrs,
1910};
1911
1912static const struct attribute_group *rbd_attr_groups[] = {
1913	&rbd_attr_group,
1914	NULL
1915};
1916
1917static void rbd_sysfs_dev_release(struct device *dev)
1918{
1919}
1920
1921static struct device_type rbd_device_type = {
1922	.name		= "rbd",
1923	.groups		= rbd_attr_groups,
1924	.release	= rbd_sysfs_dev_release,
1925};
1926
1927
1928/*
1929  sysfs - snapshots
1930*/
1931
1932static ssize_t rbd_snap_size_show(struct device *dev,
1933				  struct device_attribute *attr,
1934				  char *buf)
1935{
1936	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1937
1938	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1939}
1940
1941static ssize_t rbd_snap_id_show(struct device *dev,
1942				struct device_attribute *attr,
1943				char *buf)
1944{
1945	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1946
1947	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1948}
1949
1950static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1951static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1952
1953static struct attribute *rbd_snap_attrs[] = {
1954	&dev_attr_snap_size.attr,
1955	&dev_attr_snap_id.attr,
1956	NULL,
1957};
1958
1959static struct attribute_group rbd_snap_attr_group = {
1960	.attrs = rbd_snap_attrs,
1961};
1962
1963static void rbd_snap_dev_release(struct device *dev)
1964{
1965	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1966	kfree(snap->name);
1967	kfree(snap);
1968}
1969
1970static const struct attribute_group *rbd_snap_attr_groups[] = {
1971	&rbd_snap_attr_group,
1972	NULL
1973};
1974
1975static struct device_type rbd_snap_device_type = {
1976	.groups		= rbd_snap_attr_groups,
1977	.release	= rbd_snap_dev_release,
1978};
1979
1980static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1981				  struct rbd_snap *snap)
1982{
1983	list_del(&snap->node);
1984	device_unregister(&snap->dev);
1985}
1986
1987static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1988				  struct rbd_snap *snap,
1989				  struct device *parent)
1990{
1991	struct device *dev = &snap->dev;
1992	int ret;
1993
1994	dev->type = &rbd_snap_device_type;
1995	dev->parent = parent;
1996	dev->release = rbd_snap_dev_release;
1997	dev_set_name(dev, "snap_%s", snap->name);
1998	ret = device_register(dev);
1999
2000	return ret;
2001}
2002
2003static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2004			      int i, const char *name,
2005			      struct rbd_snap **snapp)
2006{
2007	int ret;
2008	struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2009	if (!snap)
2010		return -ENOMEM;
2011	snap->name = kstrdup(name, GFP_KERNEL);
2012	snap->size = rbd_dev->header.snap_sizes[i];
2013	snap->id = rbd_dev->header.snapc->snaps[i];
2014	if (device_is_registered(&rbd_dev->dev)) {
2015		ret = rbd_register_snap_dev(rbd_dev, snap,
2016					     &rbd_dev->dev);
2017		if (ret < 0)
2018			goto err;
2019	}
2020	*snapp = snap;
2021	return 0;
2022err:
2023	kfree(snap->name);
2024	kfree(snap);
2025	return ret;
2026}
2027
/*
 * search for the previous snap in a null delimited string list
 *
 * @name points just past the terminating '\0' of the wanted name
 * (i.e. at the start of the following name, or at the end of the
 * list); @start is the beginning of the list.
 *
 * Returns a pointer to the start of the preceding name, or NULL if
 * @name is less than two bytes past @start.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *cur;

	if (name < start + 2)
		return NULL;

	/* step over the terminator, then walk back to the previous one */
	for (cur = name - 2; *cur != '\0'; cur--) {
		if (cur == start)
			return start;
	}

	return cur + 1;
}
2044
2045/*
2046 * compare the old list of snapshots that we have to what's in the header
2047 * and update it accordingly. Note that the header holds the snapshots
2048 * in a reverse order (from newest to oldest) and we need to go from
2049 * older to new so that we don't get a duplicate snap name when
2050 * doing the process (e.g., removed snapshot and recreated a new
2051 * one with the same name.
2052 */
2053static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2054{
2055	const char *name, *first_name;
2056	int i = rbd_dev->header.total_snaps;
2057	struct rbd_snap *snap, *old_snap = NULL;
2058	int ret;
2059	struct list_head *p, *n;
2060
2061	first_name = rbd_dev->header.snap_names;
2062	name = first_name + rbd_dev->header.snap_names_len;
2063
2064	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2065		u64 cur_id;
2066
2067		old_snap = list_entry(p, struct rbd_snap, node);
2068
2069		if (i)
2070			cur_id = rbd_dev->header.snapc->snaps[i - 1];
2071
2072		if (!i || old_snap->id < cur_id) {
2073			/* old_snap->id was skipped, thus was removed */
2074			__rbd_remove_snap_dev(rbd_dev, old_snap);
2075			continue;
2076		}
2077		if (old_snap->id == cur_id) {
2078			/* we have this snapshot already */
2079			i--;
2080			name = rbd_prev_snap_name(name, first_name);
2081			continue;
2082		}
2083		for (; i > 0;
2084		     i--, name = rbd_prev_snap_name(name, first_name)) {
2085			if (!name) {
2086				WARN_ON(1);
2087				return -EINVAL;
2088			}
2089			cur_id = rbd_dev->header.snapc->snaps[i];
2090			/* snapshot removal? handle it above */
2091			if (cur_id >= old_snap->id)
2092				break;
2093			/* a new snapshot */
2094			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2095			if (ret < 0)
2096				return ret;
2097
2098			/* note that we add it backward so using n and not p */
2099			list_add(&snap->node, n);
2100			p = &snap->node;
2101		}
2102	}
2103	/* we're done going over the old snap list, just add what's left */
2104	for (; i > 0; i--) {
2105		name = rbd_prev_snap_name(name, first_name);
2106		if (!name) {
2107			WARN_ON(1);
2108			return -EINVAL;
2109		}
2110		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2111		if (ret < 0)
2112			return ret;
2113		list_add(&snap->node, &rbd_dev->snaps);
2114	}
2115
2116	return 0;
2117}
2118
2119static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2120{
2121	int ret;
2122	struct device *dev;
2123	struct rbd_snap *snap;
2124
2125	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2126	dev = &rbd_dev->dev;
2127
2128	dev->bus = &rbd_bus_type;
2129	dev->type = &rbd_device_type;
2130	dev->parent = &rbd_root_dev;
2131	dev->release = rbd_dev_release;
2132	dev_set_name(dev, "%d", rbd_dev->id);
2133	ret = device_register(dev);
2134	if (ret < 0)
2135		goto out;
2136
2137	list_for_each_entry(snap, &rbd_dev->snaps, node) {
2138		ret = rbd_register_snap_dev(rbd_dev, snap,
2139					     &rbd_dev->dev);
2140		if (ret < 0)
2141			break;
2142	}
2143out:
2144	mutex_unlock(&ctl_mutex);
2145	return ret;
2146}
2147
2148static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2149{
2150	device_unregister(&rbd_dev->dev);
2151}
2152
2153static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2154{
2155	int ret, rc;
2156
2157	do {
2158		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2159					 rbd_dev->header.obj_version);
2160		if (ret == -ERANGE) {
2161			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2162			rc = __rbd_refresh_header(rbd_dev);
2163			mutex_unlock(&ctl_mutex);
2164			if (rc < 0)
2165				return rc;
2166		}
2167	} while (ret == -ERANGE);
2168
2169	return ret;
2170}
2171
2172static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2173
2174/*
2175 * Get a unique rbd identifier for the given new rbd_dev, and add
2176 * the rbd_dev to the global list.  The minimum rbd id is 1.
2177 */
2178static void rbd_id_get(struct rbd_device *rbd_dev)
2179{
2180	rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2181
2182	spin_lock(&rbd_dev_list_lock);
2183	list_add_tail(&rbd_dev->node, &rbd_dev_list);
2184	spin_unlock(&rbd_dev_list_lock);
2185}
2186
2187/*
2188 * Remove an rbd_dev from the global list, and record that its
2189 * identifier is no longer in use.
2190 */
2191static void rbd_id_put(struct rbd_device *rbd_dev)
2192{
2193	struct list_head *tmp;
2194	int rbd_id = rbd_dev->id;
2195	int max_id;
2196
2197	BUG_ON(rbd_id < 1);
2198
2199	spin_lock(&rbd_dev_list_lock);
2200	list_del_init(&rbd_dev->node);
2201
2202	/*
2203	 * If the id being "put" is not the current maximum, there
2204	 * is nothing special we need to do.
2205	 */
2206	if (rbd_id != atomic64_read(&rbd_id_max)) {
2207		spin_unlock(&rbd_dev_list_lock);
2208		return;
2209	}
2210
2211	/*
2212	 * We need to update the current maximum id.  Search the
2213	 * list to find out what it is.  We're more likely to find
2214	 * the maximum at the end, so search the list backward.
2215	 */
2216	max_id = 0;
2217	list_for_each_prev(tmp, &rbd_dev_list) {
2218		struct rbd_device *rbd_dev;
2219
2220		rbd_dev = list_entry(tmp, struct rbd_device, node);
2221		if (rbd_id > max_id)
2222			max_id = rbd_id;
2223	}
2224	spin_unlock(&rbd_dev_list_lock);
2225
2226	/*
2227	 * The max id could have been updated by rbd_id_get(), in
2228	 * which case it now accurately reflects the new maximum.
2229	 * Be careful not to overwrite the maximum value in that
2230	 * case.
2231	 */
2232	atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2233}
2234
/*
 * Advance *buf past any leading white space so that it points at the
 * first non-space character (or at the terminating '\0').  Returns
 * the length of the token found there, i.e. the number of characters
 * up to the next white space or the end of the string.  *buf must be
 * terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	static const char white_space[] = " \f\n\r\t\v";
	const char *token = *buf;

	token += strspn(token, white_space);	/* skip to token start */
	*buf = token;

	return strcspn(token, white_space);	/* token length */
}
2253
/*
 * Locate the next token in *buf and, provided it fits, copy it into
 * @token with a terminating '\0'.  *buf must be terminated with '\0'
 * on entry.
 *
 * Returns the length of the token (excluding the '\0').  A return of
 * 0 means no token was found; a return >= @token_size means the token
 * did not fit and @token was left untouched.
 *
 * In every case *buf is advanced past the token that was found.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}

	*buf += len;

	return len;
}
2283
2284/*
2285 * This fills in the pool_name, obj, obj_len, snap_name, obj_len,
2286 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2287 * on the list of monitor addresses and other options provided via
2288 * /sys/bus/rbd/add.
2289 */
2290static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2291			      const char *buf,
2292			      const char **mon_addrs,
2293			      size_t *mon_addrs_size,
2294			      char *options,
2295			      size_t options_size)
2296{
2297	size_t	len;
2298
2299	/* The first four tokens are required */
2300
2301	len = next_token(&buf);
2302	if (!len)
2303		return -EINVAL;
2304	*mon_addrs_size = len + 1;
2305	*mon_addrs = buf;
2306
2307	buf += len;
2308
2309	len = copy_token(&buf, options, options_size);
2310	if (!len || len >= options_size)
2311		return -EINVAL;
2312
2313	len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name));
2314	if (!len || len >= sizeof (rbd_dev->pool_name))
2315		return -EINVAL;
2316
2317	len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
2318	if (!len || len >= sizeof (rbd_dev->obj))
2319		return -EINVAL;
2320
2321	/* We have the object length in hand, save it. */
2322
2323	rbd_dev->obj_len = len;
2324
2325	BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
2326				< RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
2327	sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
2328
2329	/*
2330	 * The snapshot name is optional, but it's an error if it's
2331	 * too long.  If no snapshot is supplied, fill in the default.
2332	 */
2333	len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
2334	if (!len)
2335		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2336			sizeof (RBD_SNAP_HEAD_NAME));
2337	else if (len >= sizeof (rbd_dev->snap_name))
2338		return -EINVAL;
2339
2340	return 0;
2341}
2342
2343static ssize_t rbd_add(struct bus_type *bus,
2344		       const char *buf,
2345		       size_t count)
2346{
2347	struct rbd_device *rbd_dev;
2348	const char *mon_addrs = NULL;
2349	size_t mon_addrs_size = 0;
2350	char *options = NULL;
2351	struct ceph_osd_client *osdc;
2352	int rc = -ENOMEM;
2353
2354	if (!try_module_get(THIS_MODULE))
2355		return -ENODEV;
2356
2357	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2358	if (!rbd_dev)
2359		goto err_nomem;
2360	options = kmalloc(count, GFP_KERNEL);
2361	if (!options)
2362		goto err_nomem;
2363
2364	/* static rbd_device initialization */
2365	spin_lock_init(&rbd_dev->lock);
2366	INIT_LIST_HEAD(&rbd_dev->node);
2367	INIT_LIST_HEAD(&rbd_dev->snaps);
2368	init_rwsem(&rbd_dev->header_rwsem);
2369
2370	init_rwsem(&rbd_dev->header_rwsem);
2371
2372	/* generate unique id: find highest unique id, add one */
2373	rbd_id_get(rbd_dev);
2374
2375	/* Fill in the device name, now that we have its id. */
2376	BUILD_BUG_ON(DEV_NAME_LEN
2377			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2378	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2379
2380	/* parse add command */
2381	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2382				options, count);
2383	if (rc)
2384		goto err_put_id;
2385
2386	rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2387						options);
2388	if (IS_ERR(rbd_dev->rbd_client)) {
2389		rc = PTR_ERR(rbd_dev->rbd_client);
2390		goto err_put_id;
2391	}
2392
2393	/* pick the pool */
2394	osdc = &rbd_dev->rbd_client->client->osdc;
2395	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2396	if (rc < 0)
2397		goto err_out_client;
2398	rbd_dev->poolid = rc;
2399
2400	/* register our block device */
2401	rc = register_blkdev(0, rbd_dev->name);
2402	if (rc < 0)
2403		goto err_out_client;
2404	rbd_dev->major = rc;
2405
2406	rc = rbd_bus_add_dev(rbd_dev);
2407	if (rc)
2408		goto err_out_blkdev;
2409
2410	/*
2411	 * At this point cleanup in the event of an error is the job
2412	 * of the sysfs code (initiated by rbd_bus_del_dev()).
2413	 *
2414	 * Set up and announce blkdev mapping.
2415	 */
2416	rc = rbd_init_disk(rbd_dev);
2417	if (rc)
2418		goto err_out_bus;
2419
2420	rc = rbd_init_watch_dev(rbd_dev);
2421	if (rc)
2422		goto err_out_bus;
2423
2424	return count;
2425
2426err_out_bus:
2427	/* this will also clean up rest of rbd_dev stuff */
2428
2429	rbd_bus_del_dev(rbd_dev);
2430	kfree(options);
2431	return rc;
2432
2433err_out_blkdev:
2434	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2435err_out_client:
2436	rbd_put_client(rbd_dev);
2437err_put_id:
2438	rbd_id_put(rbd_dev);
2439err_nomem:
2440	kfree(options);
2441	kfree(rbd_dev);
2442
2443	dout("Error adding device %s\n", buf);
2444	module_put(THIS_MODULE);
2445
2446	return (ssize_t) rc;
2447}
2448
2449static struct rbd_device *__rbd_get_dev(unsigned long id)
2450{
2451	struct list_head *tmp;
2452	struct rbd_device *rbd_dev;
2453
2454	spin_lock(&rbd_dev_list_lock);
2455	list_for_each(tmp, &rbd_dev_list) {
2456		rbd_dev = list_entry(tmp, struct rbd_device, node);
2457		if (rbd_dev->id == id) {
2458			spin_unlock(&rbd_dev_list_lock);
2459			return rbd_dev;
2460		}
2461	}
2462	spin_unlock(&rbd_dev_list_lock);
2463	return NULL;
2464}
2465
2466static void rbd_dev_release(struct device *dev)
2467{
2468	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2469
2470	if (rbd_dev->watch_request) {
2471		struct ceph_client *client = rbd_dev->rbd_client->client;
2472
2473		ceph_osdc_unregister_linger_request(&client->osdc,
2474						    rbd_dev->watch_request);
2475	}
2476	if (rbd_dev->watch_event)
2477		rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2478
2479	rbd_put_client(rbd_dev);
2480
2481	/* clean up and free blkdev */
2482	rbd_free_disk(rbd_dev);
2483	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2484
2485	/* done with the id, and with the rbd_dev */
2486	rbd_id_put(rbd_dev);
2487	kfree(rbd_dev);
2488
2489	/* release module ref */
2490	module_put(THIS_MODULE);
2491}
2492
2493static ssize_t rbd_remove(struct bus_type *bus,
2494			  const char *buf,
2495			  size_t count)
2496{
2497	struct rbd_device *rbd_dev = NULL;
2498	int target_id, rc;
2499	unsigned long ul;
2500	int ret = count;
2501
2502	rc = strict_strtoul(buf, 10, &ul);
2503	if (rc)
2504		return rc;
2505
2506	/* convert to int; abort if we lost anything in the conversion */
2507	target_id = (int) ul;
2508	if (target_id != ul)
2509		return -EINVAL;
2510
2511	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2512
2513	rbd_dev = __rbd_get_dev(target_id);
2514	if (!rbd_dev) {
2515		ret = -ENOENT;
2516		goto done;
2517	}
2518
2519	__rbd_remove_all_snaps(rbd_dev);
2520	rbd_bus_del_dev(rbd_dev);
2521
2522done:
2523	mutex_unlock(&ctl_mutex);
2524	return ret;
2525}
2526
2527static ssize_t rbd_snap_add(struct device *dev,
2528			    struct device_attribute *attr,
2529			    const char *buf,
2530			    size_t count)
2531{
2532	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2533	int ret;
2534	char *name = kmalloc(count + 1, GFP_KERNEL);
2535	if (!name)
2536		return -ENOMEM;
2537
2538	snprintf(name, count, "%s", buf);
2539
2540	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2541
2542	ret = rbd_header_add_snap(rbd_dev,
2543				  name, GFP_KERNEL);
2544	if (ret < 0)
2545		goto err_unlock;
2546
2547	ret = __rbd_refresh_header(rbd_dev);
2548	if (ret < 0)
2549		goto err_unlock;
2550
2551	/* shouldn't hold ctl_mutex when notifying.. notify might
2552	   trigger a watch callback that would need to get that mutex */
2553	mutex_unlock(&ctl_mutex);
2554
2555	/* make a best effort, don't error if failed */
2556	rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2557
2558	ret = count;
2559	kfree(name);
2560	return ret;
2561
2562err_unlock:
2563	mutex_unlock(&ctl_mutex);
2564	kfree(name);
2565	return ret;
2566}
2567
2568/*
2569 * create control files in sysfs
2570 * /sys/bus/rbd/...
2571 */
2572static int rbd_sysfs_init(void)
2573{
2574	int ret;
2575
2576	ret = device_register(&rbd_root_dev);
2577	if (ret < 0)
2578		return ret;
2579
2580	ret = bus_register(&rbd_bus_type);
2581	if (ret < 0)
2582		device_unregister(&rbd_root_dev);
2583
2584	return ret;
2585}
2586
2587static void rbd_sysfs_cleanup(void)
2588{
2589	bus_unregister(&rbd_bus_type);
2590	device_unregister(&rbd_root_dev);
2591}
2592
2593int __init rbd_init(void)
2594{
2595	int rc;
2596
2597	rc = rbd_sysfs_init();
2598	if (rc)
2599		return rc;
2600	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2601	return 0;
2602}
2603
2604void __exit rbd_exit(void)
2605{
2606	rbd_sysfs_cleanup();
2607}
2608
2609module_init(rbd_init);
2610module_exit(rbd_exit);
2611
2612MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2613MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2614MODULE_DESCRIPTION("rados block device");
2615
2616/* following authorship retained from original osdblk.c */
2617MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2618
2619MODULE_LICENSE("GPL");