/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */
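
/*
 * Illustrative usage sketch (not from this file; the sysfs ABI document
 * above is authoritative). Mapping and unmapping an image through the
 * control files this driver registers, with a made-up monitor address
 * and image name:
 *
 *   # echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add
 *   # echo 0 > /sys/bus/rbd/remove
 *
 * The fields are: monitor address(es), options, pool name, image name
 * and, optionally, a snapshot name (see the sscanf() in rbd_add()).
 */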

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define DRV_NAME "rbd"
#define DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_MAX_MD_NAME_LEN	(96 + sizeof(RBD_SUFFIX))
#define RBD_MAX_POOL_NAME_LEN	64
#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

#define RBD_SNAP_HEAD_NAME	"-"

#define DEV_NAME_LEN		32

#define RBD_NOTIFY_TIMEOUT_DEFAULT 10

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	u64 image_size;
	char block_name[32];
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	struct rw_semaphore snap_rwsem;
	struct ceph_snap_context *snapc;
	size_t snap_names_len;
	u64 snap_seq;
	u32 total_snaps;

	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};

struct rbd_options {
	int	notify_timeout;
};

/*
 * an instance of the client.  multiple devices may share a client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct rbd_options	*rbd_opts;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_req_coll;

/*
 * a single io request
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;
	int			coll_index;
	struct rbd_req_coll	*coll;
};

struct rbd_req_status {
	int done;
	int rc;
	u64 bytes;
};

/*
 * a collection of requests
 */
struct rbd_req_coll {
	int			total;
	int			num_done;
	struct kref		kref;
	struct rbd_req_status	status[0];
};

struct rbd_snap {
	struct	device		dev;
	const char		*name;
	size_t			size;
	struct list_head	node;
	u64			id;
};

/*
 * a single device
 */
struct rbd_device {
	int			id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */
	struct request_queue	*q;

	struct ceph_client	*client;
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;
	char			obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
	int			obj_len;
	char			obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
	char			pool_name[RBD_MAX_POOL_NAME_LEN];
	int			poolid;

	struct ceph_osd_event   *watch_event;
	struct ceph_osd_request *watch_request;

	char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
	u32 cur_snap;	/* index+1 of current snapshot within snap context
			   0 - for the head */
	int read_only;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
};

static spinlock_t node_lock;      /* protects client get/put */

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
static LIST_HEAD(rbd_dev_list);    /* devices */
static LIST_HEAD(rbd_client_list);      /* clients */

static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static ssize_t rbd_snap_rollback(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size);
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count);
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap);


static struct rbd_device *dev_to_rbd(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}

static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}

static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}

static int __rbd_update_snaps(struct rbd_device *rbd_dev);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct gendisk *disk = bdev->bd_disk;
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_get_dev(rbd_dev);

	set_device_ro(bdev, rbd_dev->read_only);

	if ((mode & FMODE_WRITE) && rbd_dev->read_only)
		return -EROFS;

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *opt.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *opt,
					    struct rbd_options *rbd_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("rbd_client_create\n");
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(opt, rbdc);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	opt = NULL; /* Now rbdc->client is responsible for opt */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	rbdc->rbd_opts = rbd_opts;

	spin_lock(&node_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&node_lock);

	dout("rbd_client_create created %p\n", rbdc);
	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (opt)
		ceph_destroy_options(opt);
	return ERR_PTR(ret);
}

/*
 * Find a ceph client with specific addr and configuration.
 */
static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
{
	struct rbd_client *client_node;

	if (opt->flags & CEPH_OPT_NOSHARE)
		return NULL;

	list_for_each_entry(client_node, &rbd_client_list, node)
		if (ceph_compare_options(opt, client_node->client) == 0)
			return client_node;
	return NULL;
}

/*
 * mount options
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

static match_table_t rbdopt_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
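
/*
 * For example (hypothetical input): an options string of
 * "notify_timeout=30", passed as the second field of the sysfs "add"
 * command, matches the table above and is handled by
 * parse_rbd_opts_token() below.
 */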

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbdopt = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token((char *)c, rbdopt_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_notify_timeout:
		rbdopt->notify_timeout = intval;
		break;
	default:
		BUG_ON(token);
	}
	return 0;
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
			  char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *opt;
	int ret;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return -ENOMEM;

	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	ret = ceph_parse_options(&opt, options, mon_addr,
				 mon_addr + strlen(mon_addr),
				 parse_rbd_opts_token, rbd_opts);
	if (ret < 0)
		goto done_err;

	spin_lock(&node_lock);
	rbdc = __rbd_client_find(opt);
	if (rbdc) {
		ceph_destroy_options(opt);

		/* using an existing client */
		kref_get(&rbdc->kref);
		rbd_dev->rbd_client = rbdc;
		rbd_dev->client = rbdc->client;
		spin_unlock(&node_lock);
		return 0;
	}
	spin_unlock(&node_lock);

	rbdc = rbd_client_create(opt, rbd_opts);
	if (IS_ERR(rbdc)) {
		ret = PTR_ERR(rbdc);
		goto done_err;
	}

	rbd_dev->rbd_client = rbdc;
	rbd_dev->client = rbdc->client;
	return 0;
done_err:
	kfree(rbd_opts);
	return ret;
}

/*
 * Destroy ceph client
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&node_lock);
	list_del(&rbdc->node);
	spin_unlock(&node_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc->rbd_opts);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;
	rbd_dev->client = NULL;
}

/*
 * Destroy requests collection
 */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk,
				 int allocated_snaps,
				 gfp_t gfp_flags)
{
	int i;
	u32 snap_count = le32_to_cpu(ondisk->snap_count);
	int ret = -ENOMEM;

	init_rwsem(&header->snap_rwsem);
	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
				snap_count *
				 sizeof(struct rbd_image_snap_ondisk),
				gfp_flags);
	if (!header->snapc)
		return -ENOMEM;
	if (snap_count) {
		header->snap_names = kmalloc(header->snap_names_len,
					     GFP_KERNEL);
		if (!header->snap_names)
			goto err_snapc;
		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
					     GFP_KERNEL);
		if (!header->snap_sizes)
			goto err_names;
	} else {
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}
	memcpy(header->block_name, ondisk->block_name,
	       sizeof(ondisk->block_name));

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	atomic_set(&header->snapc->nref, 1);
	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	header->total_snaps = snap_count;

	if (snap_count &&
	    allocated_snaps == snap_count) {
		for (i = 0; i < snap_count; i++) {
			header->snapc->snaps[i] =
				le64_to_cpu(ondisk->snaps[i].id);
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
		}

		/* copy snapshot names: on disk they are packed right
		   after the snaps[] array, so &ondisk->snaps[i]
		   (i == snap_count here) is where the name data starts */
		memcpy(header->snap_names, &ondisk->snaps[i],
			header->snap_names_len);
	}

	return 0;

err_names:
	kfree(header->snap_names);
err_snapc:
	kfree(header->snapc);
	return ret;
}

static int snap_index(struct rbd_image_header *header, int snap_num)
{
	return header->total_snaps - snap_num;
}

static u64 cur_snap_id(struct rbd_device *rbd_dev)
{
	struct rbd_image_header *header = &rbd_dev->header;

	if (!rbd_dev->cur_snap)
		return 0;

	return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
}
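
/*
 * Worked example (illustrative): with total_snaps == 3, cur_snap == 3
 * maps through snap_index() to snapc->snaps[0], the newest snapshot in
 * the context; cur_snap == 0 always means the writable head.
 */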

static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
			u64 *seq, u64 *size)
{
	int i;
	char *p = header->snap_names;

	for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
		if (strcmp(snap_name, p) == 0)
			break;
	}
	if (i == header->total_snaps)
		return -ENOENT;
	if (seq)
		*seq = header->snapc->snaps[i];

	if (size)
		*size = header->snap_sizes[i];

	return i;
}

static int rbd_header_set_snap(struct rbd_device *dev,
			       const char *snap_name,
			       u64 *size)
{
	struct rbd_image_header *header = &dev->header;
	struct ceph_snap_context *snapc = header->snapc;
	int ret = -ENOENT;

	down_write(&header->snap_rwsem);

	if (!snap_name ||
	    !*snap_name ||
	    strcmp(snap_name, "-") == 0 ||
	    strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
		if (header->total_snaps)
			snapc->seq = header->snap_seq;
		else
			snapc->seq = 0;
		dev->cur_snap = 0;
		dev->read_only = 0;
		if (size)
			*size = header->image_size;
	} else {
		ret = snap_by_name(header, snap_name, &snapc->seq, size);
		if (ret < 0)
			goto done;

		dev->cur_snap = header->total_snaps - ret;
		dev->read_only = 1;
	}

	ret = 0;
done:
	up_write(&header->snap_rwsem);
	return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->snapc);
	kfree(header->snap_names);
	kfree(header->snap_sizes);
}

/*
 * get the actual striped segment name, offset and length
 */
static u64 rbd_get_segment(struct rbd_image_header *header,
			   const char *block_name,
			   u64 ofs, u64 len,
			   char *seg_name, u64 *segofs)
{
	u64 seg = ofs >> header->obj_order;

	if (seg_name)
		snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
			 "%s.%012llx", block_name, seg);

	ofs = ofs & ((1 << header->obj_order) - 1);
	len = min_t(u64, len, (1 << header->obj_order) - ofs);

	if (segofs)
		*segofs = ofs;

	return len;
}
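
/*
 * Worked example (illustrative numbers): with obj_order == 22 (4 MB
 * objects) and a block_name of "rb.0.0", ofs == 0x1400000 lands in
 * segment 5, so seg_name becomes "rb.0.0.000000000005" and the
 * returned in-segment offset is 0.
 */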

static int rbd_get_num_segments(struct rbd_image_header *header,
				u64 ofs, u64 len)
{
	u64 start_seg = ofs >> header->obj_order;
	u64 end_seg = (ofs + len - 1) >> header->obj_order;
	return end_seg - start_seg + 1;
}
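
/*
 * E.g. with 4 MB objects, ofs == 3 MB and len == 2 MB touch segments
 * 0 and 1, so this returns 2.
 */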

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
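
/*
 * E.g. a chain holding two 4096-byte vecs with start_ofs == 6144 keeps
 * the first vec intact and zeros the second one from offset 2048 on;
 * rbd_req_cb() below uses this to pad short reads with zeros.
 */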

/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d "
			     "bi_size=%d\n",
			     (int)total, (int)len-total,
			     (int)old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / 512ULL);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
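
/*
 * E.g. cloning a chain of two 4096-byte bios with len == 6144 clones
 * the first bio whole, splits the second at 2048 bytes (4 sectors),
 * and leaves *next pointing at the second half of the split pair.
 */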

/*
 * helpers for osd request op vectors.
 */
static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
			    int num_ops,
			    int opcode,
			    u32 payload_len)
{
	*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
		       GFP_NOIO);
	if (!*ops)
		return -ENOMEM;
	(*ops)[0].op = opcode;
	/*
	 * op extent offset and length will be set later on
	 * in calc_raw_layout()
	 */
	(*ops)[0].payload_len = payload_len;
	return 0;
}

static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}

static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
	     coll, index, ret, len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i < max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
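
/*
 * E.g. with total == 3 and num_done == 0: if segment 1 completes
 * first, nothing is ended yet; once segment 0 completes, segments 0
 * and 1 are ended back to back, so completions reach the block layer
 * in order.
 */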

static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}

/*
 * Send ceph osd request
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *obj, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  int num_reply,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct rbd_image_header *header = &dev->header;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, ofs, len);

	down_read(&header->snap_rwsem);

	req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
				      snapc,
				      ops,
				      false,
				      GFP_NOIO, pages, bio);
	if (!req) {
		up_read(&header->snap_rwsem);
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, obj, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_preferred = cpu_to_le32(-1);
	layout->fl_pg_pool = cpu_to_le32(dev->poolid);
	ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
			     ofs, &len, &bno, req, ops);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);
	up_read(&header->snap_rwsem);

	if (linger_req) {
		ceph_osdc_set_request_linger(&dev->client->osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		ret = ceph_osdc_wait_request(&dev->client->osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%lld\n",
		     le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}

/*
 * Ceph osd op callback
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);

	if (rc == -ENOENT && read_op) {
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}

static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}

/*
 * Do a synchronous ceph osd operation
 */
static int rbd_req_sync_op(struct rbd_device *dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int opcode,
			   int flags,
			   struct ceph_osd_req_op *orig_ops,
			   int num_reply,
			   const char *obj,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;
	struct ceph_osd_req_op *ops = orig_ops;
	u32 payload_len;

	num_pages = calc_pages_for(ofs, len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	if (!orig_ops) {
		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
		if (ret < 0)
			goto done;

		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
			if (ret < 0)
				goto done_ops;
		}
	}

	ret = rbd_do_request(NULL, dev, snapc, snapid,
			  obj, ofs, len, NULL,
			  pages, num_pages,
			  flags,
			  ops,
			  2,
			  NULL, 0,
			  NULL,
			  linger_req, ver);
	if (ret < 0)
		goto done_ops;

	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done_ops:
	if (!orig_ops)
		rbd_destroy_ops(ops);
done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}

/*
 * Do an asynchronous ceph osd operation
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags, int num_reply,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.block_name,
				  ofs, len,
				  seg_name, &seg_ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
	if (ret < 0)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     num_reply,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}

/*
 * Request async osd write
 */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 2,
			 ofs, len, bio, coll, coll_index);
}

/*
 * Request async osd read
 */
static int rbd_req_read(struct request *rq,
			 struct rbd_device *rbd_dev,
			 u64 snapid,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, NULL,
			 (snapid ? snapid : CEPH_NOSNAP),
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 2,
			 ofs, len, bio, coll, coll_index);
}

/*
 * Request sync osd read
 */
static int rbd_req_sync_read(struct rbd_device *dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *obj,
			  u64 ofs, u64 len,
			  char *buf,
			  u64 *ver)
{
	return rbd_req_sync_op(dev, NULL,
			       (snapid ? snapid : CEPH_NOSNAP),
			       CEPH_OSD_OP_READ,
			       CEPH_OSD_FLAG_READ,
			       NULL,
			       1, obj, ofs, len, buf, NULL, ver);
}

/*
 * Request sync osd notify-ack
 */
static int rbd_req_sync_notify_ack(struct rbd_device *dev,
				   u64 ver,
				   u64 notify_id,
				   const char *obj)
{
	struct ceph_osd_req_op *ops;
	struct page **pages = NULL;
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
			  obj, 0, 0, NULL,
			  pages, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  1,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}

static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *dev = (struct rbd_device *)data;
	int rc;

	if (!dev)
		return;

	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
		notify_id, (int)opcode);
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rc = __rbd_update_snaps(dev);
	mutex_unlock(&ctl_mutex);
	if (rc)
		pr_warning(DRV_NAME "%d got notification but failed to update"
			   " snaps: %d\n", dev->major, rc);

	rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
}

/*
 * Request sync osd watch
 */
static int rbd_req_sync_watch(struct rbd_device *dev,
			      const char *obj,
			      u64 ver)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &dev->client->osdc;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)dev, &dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
	ops[0].watch.flag = 1;

	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      1, obj, 0, 0, NULL,
			      &dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(dev->watch_event);
	dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}

/*
 * Request sync osd unwatch
 */
static int rbd_req_sync_unwatch(struct rbd_device *dev,
				const char *obj)
{
	struct ceph_osd_req_op *ops;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
	ops[0].watch.flag = 0;

	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      1, obj, 0, 0, NULL, NULL, NULL);

	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(dev->watch_event);
	dev->watch_event = NULL;
	return ret;
}

struct rbd_notify_info {
	struct rbd_device *dev;
};

static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *dev = (struct rbd_device *)data;
	if (!dev)
		return;

	dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
		notify_id, (int)opcode);
}

/*
 * Request sync osd notify
 */
static int rbd_req_sync_notify(struct rbd_device *dev,
			       const char *obj)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &dev->client->osdc;
	struct ceph_osd_event *event;
	struct rbd_notify_info info;
	int payload_len = sizeof(u32) + sizeof(u32);
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (ret < 0)
		return ret;

	info.dev = dev;

	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;

	ret = rbd_req_sync_op(dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       1, obj, 0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}

/*
 * Request sync osd rollback
 */
static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
				     u64 snapid,
				     const char *obj)
{
	struct ceph_osd_req_op *ops;
	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
	if (ret < 0)
		return ret;

	ops[0].snap.snapid = snapid;

	ret = rbd_req_sync_op(dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       1, obj, 0, 0, NULL, NULL, NULL);

	rbd_destroy_ops(ops);

	return ret;
}

/*
 * Request sync osd class method call (exec)
 */
static int rbd_req_sync_exec(struct rbd_device *dev,
			     const char *obj,
			     const char *cls,
			     const char *method,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int cls_len = strlen(cls);
	int method_len = strlen(method);
	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
				    cls_len + method_len + len);
	if (ret < 0)
		return ret;

	ops[0].cls.class_name = cls;
	ops[0].cls.class_len = (__u8)cls_len;
	ops[0].cls.method_name = method;
	ops[0].cls.method_len = (__u8)method_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       1, obj, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}

static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
{
	struct rbd_req_coll *coll =
			kzalloc(sizeof(struct rbd_req_coll) +
			        sizeof(struct rbd_req_status) * num_reqs,
				GFP_ATOMIC);

	if (!coll)
		return NULL;
	coll->total = num_reqs;
	kref_init(&coll->kref);
	return coll;
}

/*
 * block device queue callback
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	rq = blk_fetch_request(q);

	while (1) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		int size, op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;

		/* peek at request from block layer */
		if (!rq)
			break;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			goto next;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * 512ULL;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			goto next;
		}

		spin_unlock_irq(q->queue_lock);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, blk_rq_pos(rq) * 512ULL);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			goto next;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.block_name,
						  ofs, size,
						  NULL, NULL);
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      rbd_dev->header.snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     cur_snap_id(rbd_dev),
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);
next:
		rq = blk_fetch_request(q);
	}
}

/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
	sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	unsigned int bio_sectors = bmd->bi_size >> 9;
	int max;

	max = (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << 9;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}
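
/*
 * E.g. with obj_order == 22, chunk_sectors == 8192; a bio already
 * holding sectors 8000..8189 of an object may grow by at most
 * 2 sectors (1024 bytes) here before it would cross into the next
 * object.
 */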

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_header_free(&rbd_dev->header);

	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}

/*
 * reload the on-disk header
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	ssize_t rc;
	struct rbd_image_header_ondisk *dh;
	int snap_count = 0;
	u64 snap_names_len = 0;
	u64 ver;

	while (1) {
		int len = sizeof(*dh) +
			  snap_count * sizeof(struct rbd_image_snap_ondisk) +
			  snap_names_len;

		rc = -ENOMEM;
		dh = kmalloc(len, GFP_KERNEL);
		if (!dh)
			return -ENOMEM;

		rc = rbd_req_sync_read(rbd_dev,
				       NULL, CEPH_NOSNAP,
				       rbd_dev->obj_md_name,
				       0, len,
				       (char *)dh, &ver);
		if (rc < 0)
			goto out_dh;

		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
		if (rc < 0)
			goto out_dh;

		if (snap_count != header->total_snaps) {
			snap_count = header->total_snaps;
			snap_names_len = header->snap_names_len;
			rbd_header_free(header);
			kfree(dh);
			continue;
		}
		break;
	}
	header->obj_version = ver;

out_dh:
	kfree(dh);
	return rc;
}

/*
 * create a snapshot
 */
static int rbd_header_add_snap(struct rbd_device *dev,
			       const char *snap_name,
			       gfp_t gfp_flags)
{
	int name_len = strlen(snap_name);
	u64 new_snapid;
	int ret;
	void *data, *p, *e;
	u64 ver;

	/* we should create a snapshot only if we're pointing at the head */
	if (dev->cur_snap)
		return -EINVAL;

	ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
				      &new_snapid);
	dout("created snapid=%lld\n", new_snapid);
	if (ret < 0)
		return ret;

	data = kmalloc(name_len + 16, gfp_flags);
	if (!data)
		return -ENOMEM;

	p = data;
	e = data + name_len + 16;

	ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
	ceph_encode_64_safe(&p, e, new_snapid, bad);

	ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
				data, p - data, &ver);

	kfree(data);

	if (ret < 0)
		return ret;

	dev->header.snapc->seq = new_snapid;

	return 0;
bad:
	return -ERANGE;
}

static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;

	while (!list_empty(&rbd_dev->snaps)) {
		snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
		__rbd_remove_snap_dev(rbd_dev, snap);
	}
}

/*
 * only read the first part of the on-disk header, without the snaps info
 */
static int __rbd_update_snaps(struct rbd_device *rbd_dev)
{
	int ret;
	struct rbd_image_header h;
	u64 snap_seq;
	int follow_seq = 0;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	/* resized? */
	set_capacity(rbd_dev->disk, h.image_size / 512ULL);

	down_write(&rbd_dev->header.snap_rwsem);

	snap_seq = rbd_dev->header.snapc->seq;
	if (rbd_dev->header.total_snaps &&
	    rbd_dev->header.snapc->snaps[0] == snap_seq)
		/* pointing at the head, will need to follow that
		   if head moves */
		follow_seq = 1;

	kfree(rbd_dev->header.snapc);
	kfree(rbd_dev->header.snap_names);
	kfree(rbd_dev->header.snap_sizes);

	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_names_len = h.snap_names_len;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	if (follow_seq)
		rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
	else
		rbd_dev->header.snapc->seq = snap_seq;

	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header.snap_rwsem);

	return ret;
}

static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
		 rbd_dev->id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* set io sizes to object size */
	blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
	blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
	blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
	blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / 512ULL);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}

/*
  sysfs
*/

static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%llu\n",
		       (unsigned long long)rbd_dev->header.image_size);
}

static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%d\n", rbd_dev->major);
}

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
}

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%s\n", rbd_dev->pool_name);
}

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%s\n", rbd_dev->obj);
}

static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%s\n", rbd_dev->snap_name);
}

static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);
	int rc;
	int ret = size;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rc = __rbd_update_snaps(rbd_dev);
	if (rc < 0)
		ret = rc;

	mutex_unlock(&ctl_mutex);
	return ret;
}

static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	&dev_attr_rollback_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};


/*
  sysfs - snapshots
*/

static ssize_t rbd_snap_size_show(struct device *dev,
				  struct device_attribute *attr,
				  char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%lld\n", (long long)snap->size);
}

static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%lld\n", (long long)snap->id);
}

static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};

static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}

static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap,
				  struct device *parent)
{
	struct device *dev = &snap->dev;
	int ret;

	dev->type = &rbd_snap_device_type;
	dev->parent = parent;
	dev->release = rbd_snap_dev_release;
	dev_set_name(dev, "snap_%s", snap->name);
	ret = device_register(dev);

	return ret;
}

static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
			      int i, const char *name,
			      struct rbd_snap **snapp)
{
	int ret;
	struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
	if (!snap)
		return -ENOMEM;
	snap->name = kstrdup(name, GFP_KERNEL);
	snap->size = rbd_dev->header.snap_sizes[i];
	snap->id = rbd_dev->header.snapc->snaps[i];
	if (device_is_registered(&rbd_dev->dev)) {
		ret = rbd_register_snap_dev(rbd_dev, snap,
					     &rbd_dev->dev);
		if (ret < 0)
			goto err;
	}
	*snapp = snap;
	return 0;
err:
	kfree(snap->name);
	kfree(snap);
	return ret;
}

/*
 * search for the previous snap in a null delimited string list
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	if (name < start + 2)
		return NULL;

	name -= 2;
	while (*name) {
		if (name == start)
			return start;
		name--;
	}
	return name + 1;
}
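
/*
 * E.g. with snap_names == "new\0old\0" (snap_names_len == 8) and name
 * initially pointing one past the end, successive calls return "old"
 * and then "new", i.e. the names are walked from oldest to newest.
 */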

/*
 * compare the old list of snapshots that we have to what's in the header
 * and update it accordingly. Note that the header holds the snapshots
 * in a reverse order (from newest to oldest) and we need to go from
 * oldest to newest so that we don't get a duplicate snap name when
 * doing the process (e.g., removed snapshot and recreated a new
 * one with the same name).
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	const char *name, *first_name;
	int i = rbd_dev->header.total_snaps;
	struct rbd_snap *snap, *old_snap = NULL;
	int ret;
	struct list_head *p, *n;

	first_name = rbd_dev->header.snap_names;
	name = first_name + rbd_dev->header.snap_names_len;

	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
		u64 cur_id;

		old_snap = list_entry(p, struct rbd_snap, node);

		if (i)
			cur_id = rbd_dev->header.snapc->snaps[i - 1];

		if (!i || old_snap->id < cur_id) {
			/* old_snap->id was skipped, thus was removed */
			__rbd_remove_snap_dev(rbd_dev, old_snap);
			continue;
		}
		if (old_snap->id == cur_id) {
			/* we have this snapshot already */
			i--;
			name = rbd_prev_snap_name(name, first_name);
			continue;
		}
		for (; i > 0;
		     i--, name = rbd_prev_snap_name(name, first_name)) {
			if (!name) {
				WARN_ON(1);
				return -EINVAL;
			}
			cur_id = rbd_dev->header.snapc->snaps[i];
			/* snapshot removal? handle it above */
			if (cur_id >= old_snap->id)
				break;
			/* a new snapshot */
			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
			if (ret < 0)
				return ret;

			/* note that we add it backward so using n and not p */
			list_add(&snap->node, n);
			p = &snap->node;
		}
	}
	/* we're done going over the old snap list, just add what's left */
	for (; i > 0; i--) {
		name = rbd_prev_snap_name(name, first_name);
		if (!name) {
			WARN_ON(1);
			return -EINVAL;
		}
		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
		if (ret < 0)
			return ret;
		list_add(&snap->node, &rbd_dev->snaps);
	}

	return 0;
}


static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret = -ENOMEM;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->id);
	ret = device_register(dev);
	if (ret < 0)
		goto done_free;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(rbd_dev, snap,
					     &rbd_dev->dev);
		if (ret < 0)
			break;
	}

	mutex_unlock(&ctl_mutex);
	return 0;
done_free:
	mutex_unlock(&ctl_mutex);
	return ret;
}

static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}

static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
	int ret, rc;

	do {
		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
					 rbd_dev->header.obj_version);
		if (ret == -ERANGE) {
			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
			rc = __rbd_update_snaps(rbd_dev);
			mutex_unlock(&ctl_mutex);
			if (rc < 0)
				return rc;
		}
	} while (ret == -ERANGE);

	return ret;
}

2177static ssize_t rbd_add(struct bus_type *bus,
2178		       const char *buf,
2179		       size_t count)
2180{
2181	struct ceph_osd_client *osdc;
2182	struct rbd_device *rbd_dev;
2183	ssize_t rc = -ENOMEM;
2184	int irc, new_id = 0;
2185	struct list_head *tmp;
2186	char *mon_dev_name;
2187	char *options;
2188
2189	if (!try_module_get(THIS_MODULE))
2190		return -ENODEV;
2191
2192	mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2193	if (!mon_dev_name)
2194		goto err_out_mod;
2195
2196	options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2197	if (!options)
2198		goto err_mon_dev;
2199
2200	/* new rbd_device object */
2201	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2202	if (!rbd_dev)
2203		goto err_out_opt;
2204
2205	/* static rbd_device initialization */
2206	spin_lock_init(&rbd_dev->lock);
2207	INIT_LIST_HEAD(&rbd_dev->node);
2208	INIT_LIST_HEAD(&rbd_dev->snaps);
2209
2210	/* generate unique id: find highest unique id, add one */
2211	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2212
2213	list_for_each(tmp, &rbd_dev_list) {
2214		struct rbd_device *rbd_dev;
2215
2216		rbd_dev = list_entry(tmp, struct rbd_device, node);
2217		if (rbd_dev->id >= new_id)
2218			new_id = rbd_dev->id + 1;
2219	}
2220
2221	rbd_dev->id = new_id;
2222
2223	/* add to global list */
2224	list_add_tail(&rbd_dev->node, &rbd_dev_list);
2225
2226	/* parse add command */
2227	if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2228		   "%" __stringify(RBD_MAX_OPT_LEN) "s "
2229		   "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2230		   "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2231		   "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2232		   mon_dev_name, options, rbd_dev->pool_name,
2233		   rbd_dev->obj, rbd_dev->snap_name) < 4) {
2234		rc = -EINVAL;
2235		goto err_out_slot;
2236	}
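
	/*
	 * Illustrative usage of the format parsed above (see also
	 * Documentation/ABI/testing/sysfs-bus-rbd); the monitor
	 * address, key and names here are made up:
	 *
	 *   # echo "192.168.0.1:6789 name=admin,secret=<key> rbd foo" \
	 *		> /sys/bus/rbd/add
	 *
	 * The optional fifth token names a snapshot to map; when it is
	 * omitted, the image head is mapped (snap_name defaults to "-"
	 * just below).
	 */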
2237
2238	if (rbd_dev->snap_name[0] == 0)
2239		rbd_dev->snap_name[0] = '-';
2240
2241	rbd_dev->obj_len = strlen(rbd_dev->obj);
2242	snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2243		 rbd_dev->obj, RBD_SUFFIX);
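
	/*
	 * For example (illustrative): mapping image "foo" gives an
	 * obj_md_name of "foo" plus RBD_SUFFIX, the rados object that
	 * holds the image header (RBD_SUFFIX comes from rbd_types.h,
	 * which is not part of this listing).
	 */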
2244
2245	/* initialize rest of new object */
2246	snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2247	rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2248	if (rc < 0)
2249		goto err_out_slot;
2250
2251	mutex_unlock(&ctl_mutex);
2252
2253	/* pick the pool */
2254	osdc = &rbd_dev->client->osdc;
2255	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2256	if (rc < 0)
2257		goto err_out_client;
2258	rbd_dev->poolid = rc;
2259
2260	/* register our block device */
2261	irc = register_blkdev(0, rbd_dev->name);
2262	if (irc < 0) {
2263		rc = irc;
2264		goto err_out_client;
2265	}
2266	rbd_dev->major = irc;
2267
2268	rc = rbd_bus_add_dev(rbd_dev);
2269	if (rc)
2270		goto err_out_blkdev;
2271
2272	/* set up and announce blkdev mapping */
2273	rc = rbd_init_disk(rbd_dev);
2274	if (rc)
2275		goto err_out_bus;
2276
2277	rc = rbd_init_watch_dev(rbd_dev);
2278	if (rc)
2279		goto err_out_bus;
2280
2281	return count;
2282
2283err_out_bus:
2284	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2285	list_del_init(&rbd_dev->node);
2286	mutex_unlock(&ctl_mutex);
2287
2288	/* this will also clean up rest of rbd_dev stuff */
2289
2290	rbd_bus_del_dev(rbd_dev);
2291	kfree(options);
2292	kfree(mon_dev_name);
2293	return rc;
2294
2295err_out_blkdev:
2296	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2297err_out_client:
2298	rbd_put_client(rbd_dev);
2299	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2300err_out_slot:
2301	list_del_init(&rbd_dev->node);
2302	mutex_unlock(&ctl_mutex);
2303
2304	kfree(rbd_dev);
2305err_out_opt:
2306	kfree(options);
2307err_mon_dev:
2308	kfree(mon_dev_name);
2309err_out_mod:
2310	dout("Error adding device %s\n", buf);
2311	module_put(THIS_MODULE);
2312	return rc;
2313}
2314
2315static struct rbd_device *__rbd_get_dev(unsigned long id)
2316{
2317	struct list_head *tmp;
2318	struct rbd_device *rbd_dev;
2319
2320	list_for_each(tmp, &rbd_dev_list) {
2321		rbd_dev = list_entry(tmp, struct rbd_device, node);
2322		if (rbd_dev->id == id)
2323			return rbd_dev;
2324	}
2325	return NULL;
2326}
2327
2328static void rbd_dev_release(struct device *dev)
2329{
2330	struct rbd_device *rbd_dev =
2331			container_of(dev, struct rbd_device, dev);
2332
2333	if (rbd_dev->watch_request)
2334		ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
2335						    rbd_dev->watch_request);
2336	if (rbd_dev->watch_event)
2337		rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2338
2339	rbd_put_client(rbd_dev);
2340
2341	/* clean up and free blkdev */
2342	rbd_free_disk(rbd_dev);
2343	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2344	kfree(rbd_dev);
2345
2346	/* release module ref */
2347	module_put(THIS_MODULE);
2348}
2349
2350static ssize_t rbd_remove(struct bus_type *bus,
2351			  const char *buf,
2352			  size_t count)
2353{
2354	struct rbd_device *rbd_dev = NULL;
2355	int target_id, rc;
2356	unsigned long ul;
2357	int ret = count;
2358
2359	rc = strict_strtoul(buf, 10, &ul);
2360	if (rc)
2361		return rc;
2362
2363	/* convert to int; abort if we lost anything in the conversion */
2364	target_id = (int) ul;
2365	if (target_id != ul)
2366		return -EINVAL;
2367
2368	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2369
2370	rbd_dev = __rbd_get_dev(target_id);
2371	if (!rbd_dev) {
2372		ret = -ENOENT;
2373		goto done;
2374	}
2375
2376	list_del_init(&rbd_dev->node);
2377
2378	__rbd_remove_all_snaps(rbd_dev);
2379	rbd_bus_del_dev(rbd_dev);
2380
2381done:
2382	mutex_unlock(&ctl_mutex);
2383	return ret;
2384}
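
/*
 * Illustrative example, the inverse of the add command: unmapping the
 * device with id 0 (rbd0):
 *
 *   # echo 0 > /sys/bus/rbd/remove
 */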
2385
2386static ssize_t rbd_snap_add(struct device *dev,
2387			    struct device_attribute *attr,
2388			    const char *buf,
2389			    size_t count)
2390{
2391	struct rbd_device *rbd_dev = dev_to_rbd(dev);
2392	int ret;
2393	char *name = kmalloc(count + 1, GFP_KERNEL);
2394	if (!name)
2395		return -ENOMEM;
2396
2397	snprintf(name, count, "%s", buf);
2398
2399	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2400
2401	ret = rbd_header_add_snap(rbd_dev,
2402				  name, GFP_KERNEL);
2403	if (ret < 0)
2404		goto err_unlock;
2405
2406	ret = __rbd_update_snaps(rbd_dev);
2407	if (ret < 0)
2408		goto err_unlock;
2409
2410	/* we shouldn't hold ctl_mutex when notifying; the notify might
2411	   trigger a watch callback that would need to take that mutex */
2412	mutex_unlock(&ctl_mutex);
2413
2414	/* make a best effort, don't error if failed */
2415	rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2416
2417	ret = count;
2418	kfree(name);
2419	return ret;
2420
2421err_unlock:
2422	mutex_unlock(&ctl_mutex);
2423	kfree(name);
2424	return ret;
2425}
2426
2427static ssize_t rbd_snap_rollback(struct device *dev,
2428				 struct device_attribute *attr,
2429				 const char *buf,
2430				 size_t count)
2431{
2432	struct rbd_device *rbd_dev = dev_to_rbd(dev);
2433	int ret;
2434	u64 snapid;
2435	u64 cur_ofs;
2436	char *seg_name = NULL;
2437	char *snap_name = kmalloc(count + 1, GFP_KERNEL);
2438	ret = -ENOMEM;
2439	if (!snap_name)
2440		return ret;
2441
2442	/* parse snaps add command */
2443	snprintf(snap_name, count, "%s", buf);
2444	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
2445	if (!seg_name)
2446		goto done;
2447
2448	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2449
2450	ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
2451	if (ret < 0)
2452		goto done_unlock;
2453
2454	dout("snapid=%lld\n", snapid);
2455
2456	cur_ofs = 0;
2457	while (cur_ofs < rbd_dev->header.image_size) {
2458		cur_ofs += rbd_get_segment(&rbd_dev->header,
2459					   rbd_dev->obj,
2460					   cur_ofs, (u64)-1,
2461					   seg_name, NULL);
2462		dout("seg_name=%s\n", seg_name);
2463
2464		ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
2465		if (ret < 0)
2466			pr_warning("could not roll back obj %s err=%d\n",
2467				   seg_name, ret);
2468	}
2469
2470	ret = __rbd_update_snaps(rbd_dev);
2471	if (ret < 0)
2472		goto done_unlock;
2473
2474	ret = count;
2475
2476done_unlock:
2477	mutex_unlock(&ctl_mutex);
2478done:
2479	kfree(seg_name);
2480	kfree(snap_name);
2481
2482	return ret;
2483}
2484
2485static struct bus_attribute rbd_bus_attrs[] = {
2486	__ATTR(add, S_IWUSR, NULL, rbd_add),
2487	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
2488	__ATTR_NULL
2489};
2490
2491/*
2492 * create control files in sysfs
2493 * /sys/bus/rbd/...
2494 */
2495static int rbd_sysfs_init(void)
2496{
2497	int ret;
2498
2499	rbd_bus_type.bus_attrs = rbd_bus_attrs;
2500
2501	ret = bus_register(&rbd_bus_type);
2502	if (ret < 0)
2503		return ret;
2504
2505	ret = device_register(&rbd_root_dev);
2506
2507	return ret;
2508}
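
/*
 * Sketch of the resulting sysfs layout (illustrative):
 *
 *   /sys/bus/rbd/add		write-only, handled by rbd_add()
 *   /sys/bus/rbd/remove	write-only, handled by rbd_remove()
 *   /sys/devices/rbd/		rbd_root_dev; each mapped device is
 *				registered beneath it, named by its id
 */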
2509
2510static void rbd_sysfs_cleanup(void)
2511{
2512	device_unregister(&rbd_root_dev);
2513	bus_unregister(&rbd_bus_type);
2514}
2515
2516int __init rbd_init(void)
2517{
2518	int rc;
2519
2520	rc = rbd_sysfs_init();
2521	if (rc)
2522		return rc;
2523	spin_lock_init(&node_lock);
2524	pr_info("loaded " DRV_NAME_LONG "\n");
2525	return 0;
2526}
2527
2528void __exit rbd_exit(void)
2529{
2530	rbd_sysfs_cleanup();
2531}
2532
2533module_init(rbd_init);
2534module_exit(rbd_exit);
2535
2536MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2537MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2538MODULE_DESCRIPTION("rados block device");
2539
2540/* following authorship retained from original osdblk.c */
2541MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2542
2543MODULE_LICENSE("GPL");
v3.5.6
  44/*
  45 * The basic unit of block I/O is a sector.  It is interpreted in a
  46 * number of contexts in Linux (blk, bio, genhd), but the default is
  47 * universally 512 bytes.  These symbols are just slightly more
  48 * meaningful than the bare numbers they represent.
  49 */
  50#define	SECTOR_SHIFT	9
  51#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
  52
  53#define RBD_DRV_NAME "rbd"
  54#define RBD_DRV_NAME_LONG "rbd (rados block device)"
  55
  56#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */
  57
  58#define RBD_MAX_MD_NAME_LEN	(RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
  59#define RBD_MAX_POOL_NAME_LEN	64
  60#define RBD_MAX_SNAP_NAME_LEN	32
  61#define RBD_MAX_OPT_LEN		1024
  62
  63#define RBD_SNAP_HEAD_NAME	"-"
  64
  65/*
  66 * An RBD device name will be "rbd#", where the "rbd" comes from
  67 * RBD_DRV_NAME above, and # is a unique integer identifier.
  68 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
  69 * enough to hold all possible device names.
  70 */
  71#define DEV_NAME_LEN		32
  72#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
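/*
 * Worked example (not in the original source): for a 32-bit int,
 * (5 * sizeof (int)) / 2 + 1 = (5 * 4) / 2 + 1 = 11, exactly the
 * width of "-2147483648", so DEV_NAME_LEN (32) comfortably holds
 * "rbd" plus any formatted id.
 */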
  73
  74#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
  75
  76/*
  77 * block device image metadata (in-memory version)
  78 */
  79struct rbd_image_header {
  80	u64 image_size;
  81	char block_name[32];
  82	__u8 obj_order;
  83	__u8 crypt_type;
  84	__u8 comp_type;
  85	struct ceph_snap_context *snapc;
  86	size_t snap_names_len;
  87	u64 snap_seq;
  88	u32 total_snaps;
  89
  90	char *snap_names;
  91	u64 *snap_sizes;
  92
  93	u64 obj_version;
  94};
  95
  96struct rbd_options {
  97	int	notify_timeout;
  98};
  99
 100/*
 101 * an instance of the client.  multiple devices may share an rbd client.
 102 */
 103struct rbd_client {
 104	struct ceph_client	*client;
 105	struct rbd_options	*rbd_opts;
 106	struct kref		kref;
 107	struct list_head	node;
 108};
 109
 110/*
 111 * a request completion status
 112 */
 113struct rbd_req_status {
 114	int done;
 115	int rc;
 116	u64 bytes;
 117};
 118
 119/*
 120 * a collection of requests
 121 */
 122struct rbd_req_coll {
 123	int			total;
 124	int			num_done;
 125	struct kref		kref;
 126	struct rbd_req_status	status[0];
 127};
 128
 129/*
 130 * a single io request
 131 */
 132struct rbd_request {
 133	struct request		*rq;		/* blk layer request */
 134	struct bio		*bio;		/* cloned bio */
 135	struct page		**pages;	/* list of used pages */
 136	u64			len;
 137	int			coll_index;
 138	struct rbd_req_coll	*coll;
 139};
 140
 141struct rbd_snap {
 142	struct	device		dev;
 143	const char		*name;
 144	u64			size;
 145	struct list_head	node;
 146	u64			id;
 147};
 148
 149/*
 150 * a single device
 151 */
 152struct rbd_device {
 153	int			id;		/* blkdev unique id */
 154
 155	int			major;		/* blkdev assigned major */
 156	struct gendisk		*disk;		/* blkdev's gendisk and rq */
 157	struct request_queue	*q;
 158
 159	struct rbd_client	*rbd_client;
 160
 161	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 162
 163	spinlock_t		lock;		/* queue lock */
 164
 165	struct rbd_image_header	header;
 166	char			obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
 167	int			obj_len;
 168	char			obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
 169	char			pool_name[RBD_MAX_POOL_NAME_LEN];
 170	int			poolid;
 171
 172	struct ceph_osd_event   *watch_event;
 173	struct ceph_osd_request *watch_request;
 174
 175	/* protects updating the header */
 176	struct rw_semaphore     header_rwsem;
 177	char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
 178	u64                     snap_id;	/* current snapshot id */
 179	int read_only;
 180
 181	struct list_head	node;
 182
 183	/* list of snapshots */
 184	struct list_head	snaps;
 185
 186	/* sysfs related */
 187	struct device		dev;
 188};
 189
 190static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
 191
 192static LIST_HEAD(rbd_dev_list);    /* devices */
 193static DEFINE_SPINLOCK(rbd_dev_list_lock);
 194
 195static LIST_HEAD(rbd_client_list);		/* clients */
 196static DEFINE_SPINLOCK(rbd_client_list_lock);
 197
 198static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
 199static void rbd_dev_release(struct device *dev);
 200static ssize_t rbd_snap_add(struct device *dev,
 201			    struct device_attribute *attr,
 202			    const char *buf,
 203			    size_t count);
 204static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
 205				  struct rbd_snap *snap);
 206
 207static ssize_t rbd_add(struct bus_type *bus, const char *buf,
 208		       size_t count);
 209static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
 210			  size_t count);
 211
 212static struct bus_attribute rbd_bus_attrs[] = {
 213	__ATTR(add, S_IWUSR, NULL, rbd_add),
 214	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
 215	__ATTR_NULL
 216};
 217
 218static struct bus_type rbd_bus_type = {
 219	.name		= "rbd",
 220	.bus_attrs	= rbd_bus_attrs,
 221};
 222
 223static void rbd_root_dev_release(struct device *dev)
 224{
 225}
 226
 227static struct device rbd_root_dev = {
 228	.init_name =    "rbd",
 229	.release =      rbd_root_dev_release,
 230};
 231
 232
 233static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
 234{
 235	return get_device(&rbd_dev->dev);
 236}
 237
 238static void rbd_put_dev(struct rbd_device *rbd_dev)
 239{
 240	put_device(&rbd_dev->dev);
 241}
 242
 243static int __rbd_refresh_header(struct rbd_device *rbd_dev);
 244
 245static int rbd_open(struct block_device *bdev, fmode_t mode)
 246{
 247	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
 248
 249	rbd_get_dev(rbd_dev);
 250
 251	set_device_ro(bdev, rbd_dev->read_only);
 252
 253	if ((mode & FMODE_WRITE) && rbd_dev->read_only)
 254		return -EROFS;
 255
 256	return 0;
 257}
 258
 259static int rbd_release(struct gendisk *disk, fmode_t mode)
 260{
 261	struct rbd_device *rbd_dev = disk->private_data;
 262
 263	rbd_put_dev(rbd_dev);
 264
 265	return 0;
 266}
 267
 268static const struct block_device_operations rbd_bd_ops = {
 269	.owner			= THIS_MODULE,
 270	.open			= rbd_open,
 271	.release		= rbd_release,
 272};
 273
 274/*
 275 * Initialize an rbd client instance.
 276 * We own *opt.
 277 */
 278static struct rbd_client *rbd_client_create(struct ceph_options *opt,
 279					    struct rbd_options *rbd_opts)
 280{
 281	struct rbd_client *rbdc;
 282	int ret = -ENOMEM;
 283
 284	dout("rbd_client_create\n");
 285	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
 286	if (!rbdc)
 287		goto out_opt;
 288
 289	kref_init(&rbdc->kref);
 290	INIT_LIST_HEAD(&rbdc->node);
 291
 292	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
 293
 294	rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
 295	if (IS_ERR(rbdc->client))
 296		goto out_mutex;
 297	opt = NULL; /* Now rbdc->client is responsible for opt */
 298
 299	ret = ceph_open_session(rbdc->client);
 300	if (ret < 0)
 301		goto out_err;
 302
 303	rbdc->rbd_opts = rbd_opts;
 304
 305	spin_lock(&rbd_client_list_lock);
 306	list_add_tail(&rbdc->node, &rbd_client_list);
 307	spin_unlock(&rbd_client_list_lock);
 308
 309	mutex_unlock(&ctl_mutex);
 310
 311	dout("rbd_client_create created %p\n", rbdc);
 312	return rbdc;
 313
 314out_err:
 315	ceph_destroy_client(rbdc->client);
 316out_mutex:
 317	mutex_unlock(&ctl_mutex);
 318	kfree(rbdc);
 319out_opt:
 320	if (opt)
 321		ceph_destroy_options(opt);
 322	return ERR_PTR(ret);
 323}
 324
 325/*
 326 * Find a ceph client with specific addr and configuration.
 327 */
 328static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
 329{
 330	struct rbd_client *client_node;
 331
 332	if (opt->flags & CEPH_OPT_NOSHARE)
 333		return NULL;
 334
 335	list_for_each_entry(client_node, &rbd_client_list, node)
 336		if (ceph_compare_options(opt, client_node->client) == 0)
 337			return client_node;
 338	return NULL;
 339}
 340
 341/*
 342 * mount options
 343 */
 344enum {
 345	Opt_notify_timeout,
 346	Opt_last_int,
 347	/* int args above */
 348	Opt_last_string,
 349	/* string args above */
 350};
 351
 352static match_table_t rbdopt_tokens = {
 353	{Opt_notify_timeout, "notify_timeout=%d"},
 354	/* int args above */
 355	/* string args above */
 356	{-1, NULL}
 357};
 358
 359static int parse_rbd_opts_token(char *c, void *private)
 360{
 361	struct rbd_options *rbdopt = private;
 362	substring_t argstr[MAX_OPT_ARGS];
 363	int token, intval, ret;
 364
 365	token = match_token(c, rbdopt_tokens, argstr);
 366	if (token < 0)
 367		return -EINVAL;
 368
 369	if (token < Opt_last_int) {
 370		ret = match_int(&argstr[0], &intval);
 371		if (ret < 0) {
 372			pr_err("bad mount option arg (not int) "
 373			       "at '%s'\n", c);
 374			return ret;
 375		}
 376		dout("got int token %d val %d\n", token, intval);
 377	} else if (token > Opt_last_int && token < Opt_last_string) {
 378		dout("got string token %d val %s\n", token,
 379		     argstr[0].from);
 380	} else {
 381		dout("got token %d\n", token);
 382	}
 383
 384	switch (token) {
 385	case Opt_notify_timeout:
 386		rbdopt->notify_timeout = intval;
 387		break;
 388	default:
 389		BUG_ON(token);
 390	}
 391	return 0;
 392}
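
/*
 * For example (illustrative): mapping with an options string such as
 * "notify_timeout=30" reaches this callback via ceph_parse_options()
 * and sets rbd_opts->notify_timeout to 30; a token not listed in
 * rbdopt_tokens is rejected with -EINVAL.
 */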
 393
 394/*
 395 * Get a ceph client with specific addr and configuration; if one does
 396 * not exist, create it.
 397 */
 398static struct rbd_client *rbd_get_client(const char *mon_addr,
 399					 size_t mon_addr_len,
 400					 char *options)
 401{
 402	struct rbd_client *rbdc;
 403	struct ceph_options *opt;
 404	struct rbd_options *rbd_opts;
 405
 406	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
 407	if (!rbd_opts)
 408		return ERR_PTR(-ENOMEM);
 409
 410	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
 411
 412	opt = ceph_parse_options(options, mon_addr,
 413				mon_addr + mon_addr_len,
 414				parse_rbd_opts_token, rbd_opts);
 415	if (IS_ERR(opt)) {
 416		kfree(rbd_opts);
 417		return ERR_CAST(opt);
 418	}
 419
 420	spin_lock(&rbd_client_list_lock);
 421	rbdc = __rbd_client_find(opt);
 422	if (rbdc) {
 423		/* using an existing client */
 424		kref_get(&rbdc->kref);
 425		spin_unlock(&rbd_client_list_lock);
 426
 427		ceph_destroy_options(opt);
 428		kfree(rbd_opts);
 429
 430		return rbdc;
 431	}
 432	spin_unlock(&rbd_client_list_lock);
 433
 434	rbdc = rbd_client_create(opt, rbd_opts);
 435
 436	if (IS_ERR(rbdc))
 437		kfree(rbd_opts);
 438
 439	return rbdc;
 440}
 441
 442/*
 443 * Destroy ceph client
 444 *
 445 * Takes rbd_client_list_lock itself, so the caller must not hold it.
 446 */
 447static void rbd_client_release(struct kref *kref)
 448{
 449	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
 450
 451	dout("rbd_release_client %p\n", rbdc);
 452	spin_lock(&rbd_client_list_lock);
 453	list_del(&rbdc->node);
 454	spin_unlock(&rbd_client_list_lock);
 455
 456	ceph_destroy_client(rbdc->client);
 457	kfree(rbdc->rbd_opts);
 458	kfree(rbdc);
 459}
 460
 461/*
 462 * Drop reference to ceph client node. If it's not referenced anymore, release
 463 * it.
 464 */
 465static void rbd_put_client(struct rbd_device *rbd_dev)
 466{
 467	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
 468	rbd_dev->rbd_client = NULL;
 469}
 470
 471/*
 472 * Destroy requests collection
 473 */
 474static void rbd_coll_release(struct kref *kref)
 475{
 476	struct rbd_req_coll *coll =
 477		container_of(kref, struct rbd_req_coll, kref);
 478
 479	dout("rbd_coll_release %p\n", coll);
 480	kfree(coll);
 481}
 482
 483/*
 484 * Create a new header structure, translating the header format from
 485 * the on-disk header.
 486 */
 487static int rbd_header_from_disk(struct rbd_image_header *header,
 488				 struct rbd_image_header_ondisk *ondisk,
 489				 u32 allocated_snaps,
 490				 gfp_t gfp_flags)
 491{
 492	u32 i, snap_count;
 493
 494	if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
 495		return -ENXIO;
 496
 497	snap_count = le32_to_cpu(ondisk->snap_count);
 498	if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
 499			 / sizeof (*ondisk))
 500		return -EINVAL;
 501	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
 502				snap_count * sizeof(u64),
 503				gfp_flags);
 504	if (!header->snapc)
 505		return -ENOMEM;
 506
 507	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
 508	if (snap_count) {
 509		header->snap_names = kmalloc(header->snap_names_len,
 510					     gfp_flags);
 511		if (!header->snap_names)
 512			goto err_snapc;
 513		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
 514					     gfp_flags);
 515		if (!header->snap_sizes)
 516			goto err_names;
 517	} else {
 518		header->snap_names = NULL;
 519		header->snap_sizes = NULL;
 520	}
 521	memcpy(header->block_name, ondisk->block_name,
 522	       sizeof(ondisk->block_name));
 523
 524	header->image_size = le64_to_cpu(ondisk->image_size);
 525	header->obj_order = ondisk->options.order;
 526	header->crypt_type = ondisk->options.crypt_type;
 527	header->comp_type = ondisk->options.comp_type;
 528
 529	atomic_set(&header->snapc->nref, 1);
 530	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
 531	header->snapc->num_snaps = snap_count;
 532	header->total_snaps = snap_count;
 533
 534	if (snap_count && allocated_snaps == snap_count) {
 535		for (i = 0; i < snap_count; i++) {
 536			header->snapc->snaps[i] =
 537				le64_to_cpu(ondisk->snaps[i].id);
 538			header->snap_sizes[i] =
 539				le64_to_cpu(ondisk->snaps[i].image_size);
 540		}
 541
 542		/* copy snapshot names */
 543		memcpy(header->snap_names, &ondisk->snaps[i],
 544			header->snap_names_len);
 545	}
 546
 547	return 0;
 548
 549err_names:
 550	kfree(header->snap_names);
 551err_snapc:
 552	kfree(header->snapc);
 553	return -ENOMEM;
 554}
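
/*
 * Sketch of the on-disk layout consumed above (illustrative, inferred
 * from this code rather than from rbd_types.h, which is not shown):
 *
 *	struct rbd_image_header_ondisk		fixed-size header
 *	struct rbd_image_snap_ondisk[snap_count] an id and image_size each
 *	char snap_names[snap_names_len]		packed NUL-terminated names
 */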
 555
 556static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
 557			u64 *seq, u64 *size)
 558{
 559	int i;
 560	char *p = header->snap_names;
 561
 562	for (i = 0; i < header->total_snaps; i++) {
 563		if (!strcmp(snap_name, p)) {
 564
 565			/* Found it.  Pass back its id and/or size */
 566
 567			if (seq)
 568				*seq = header->snapc->snaps[i];
 569			if (size)
 570				*size = header->snap_sizes[i];
 571			return i;
 572		}
 573		p += strlen(p) + 1;	/* Skip ahead to the next name */
 574	}
 575	return -ENOENT;
 576}
 577
 578static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
 579{
 580	struct rbd_image_header *header = &dev->header;
 581	struct ceph_snap_context *snapc = header->snapc;
 582	int ret = -ENOENT;
 583
 584	BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
 585
 586	down_write(&dev->header_rwsem);
 587
 588	if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
 589		    sizeof (RBD_SNAP_HEAD_NAME))) {
 590		if (header->total_snaps)
 591			snapc->seq = header->snap_seq;
 592		else
 593			snapc->seq = 0;
 594		dev->snap_id = CEPH_NOSNAP;
 595		dev->read_only = 0;
 596		if (size)
 597			*size = header->image_size;
 598	} else {
 599		ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
 600		if (ret < 0)
 601			goto done;
 602		dev->snap_id = snapc->seq;
 603		dev->read_only = 1;
 604	}
 605
 606	ret = 0;
 607done:
 608	up_write(&dev->header_rwsem);
 609	return ret;
 610}
 611
 612static void rbd_header_free(struct rbd_image_header *header)
 613{
 614	kfree(header->snapc);
 615	kfree(header->snap_names);
 616	kfree(header->snap_sizes);
 617}
 618
 619/*
 620 * get the actual striped segment name, offset and length
 621 */
 622static u64 rbd_get_segment(struct rbd_image_header *header,
 623			   const char *block_name,
 624			   u64 ofs, u64 len,
 625			   char *seg_name, u64 *segofs)
 626{
 627	u64 seg = ofs >> header->obj_order;
 628
 629	if (seg_name)
 630		snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
 631			 "%s.%012llx", block_name, seg);
 632
 633	ofs = ofs & ((1 << header->obj_order) - 1);
 634	len = min_t(u64, len, (1 << header->obj_order) - ofs);
 635
 636	if (segofs)
 637		*segofs = ofs;
 638
 639	return len;
 640}
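
/*
 * Worked example (illustrative numbers): with obj_order 22 (4 MB
 * objects), block_name "rb.0.0", ofs 0x500000 and len 0x400000:
 *
 *	seg     = 0x500000 >> 22 = 1, so seg_name "rb.0.0.000000000001"
 *	*segofs = 0x500000 & 0x3fffff = 0x100000
 *	return value = min(0x400000, 0x400000 - 0x100000) = 0x300000
 *
 * i.e. only the first 3 MB of the request fits in this segment.
 */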
 641
 642static int rbd_get_num_segments(struct rbd_image_header *header,
 643				u64 ofs, u64 len)
 644{
 645	u64 start_seg = ofs >> header->obj_order;
 646	u64 end_seg = (ofs + len - 1) >> header->obj_order;
 647	return end_seg - start_seg + 1;
 648}
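
/*
 * Continuing the example above: ofs 0x500000 and len 0x400000 with
 * obj_order 22 give start_seg 1 and end_seg 0x8fffff >> 22 = 2, so
 * the request covers 2 segments.
 */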
 649
 650/*
 651 * returns the size of an object in the image
 652 */
 653static u64 rbd_obj_bytes(struct rbd_image_header *header)
 654{
 655	return 1 << header->obj_order;
 656}
 657
 658/*
 659 * bio helpers
 660 */
 661
 662static void bio_chain_put(struct bio *chain)
 663{
 664	struct bio *tmp;
 665
 666	while (chain) {
 667		tmp = chain;
 668		chain = chain->bi_next;
 669		bio_put(tmp);
 670	}
 671}
 672
 673/*
 674 * zeros a bio chain, starting at a specific offset
 675 */
 676static void zero_bio_chain(struct bio *chain, int start_ofs)
 677{
 678	struct bio_vec *bv;
 679	unsigned long flags;
 680	void *buf;
 681	int i;
 682	int pos = 0;
 683
 684	while (chain) {
 685		bio_for_each_segment(bv, chain, i) {
 686			if (pos + bv->bv_len > start_ofs) {
 687				int remainder = max(start_ofs - pos, 0);
 688				buf = bvec_kmap_irq(bv, &flags);
 689				memset(buf + remainder, 0,
 690				       bv->bv_len - remainder);
 691				bvec_kunmap_irq(buf, &flags);
 692			}
 693			pos += bv->bv_len;
 694		}
 695
 696		chain = chain->bi_next;
 697	}
 698}
 699
 700/*
 701 * bio_chain_clone - clone a chain of bios up to a certain length.
 702 * might return a bio_pair that will need to be released.
 703 */
 704static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
 705				   struct bio_pair **bp,
 706				   int len, gfp_t gfpmask)
 707{
 708	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
 709	int total = 0;
 710
 711	if (*bp) {
 712		bio_pair_release(*bp);
 713		*bp = NULL;
 714	}
 715
 716	while (old_chain && (total < len)) {
 717		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
 718		if (!tmp)
 719			goto err_out;
 720
 721		if (total + old_chain->bi_size > len) {
 722			struct bio_pair *bp;
 723
 724			/*
 725			 * this split can only happen with a single paged bio,
 726			 * split_bio will BUG_ON if this is not the case
 727			 */
 728			dout("bio_chain_clone split! total=%d remaining=%d"
 729			     " bi_size=%d\n",
 730			     (int)total, (int)len-total,
 731			     (int)old_chain->bi_size);
 732
 733			/* split the bio. We'll release it either in the next
 734			   call, or it will have to be released outside */
 735			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
 736			if (!bp)
 737				goto err_out;
 738
 739			__bio_clone(tmp, &bp->bio1);
 740
 741			*next = &bp->bio2;
 742		} else {
 743			__bio_clone(tmp, old_chain);
 744			*next = old_chain->bi_next;
 745		}
 746
 747		tmp->bi_bdev = NULL;
 748		gfpmask &= ~__GFP_WAIT;
 749		tmp->bi_next = NULL;
 750
 751		if (!new_chain) {
 752			new_chain = tail = tmp;
 753		} else {
 754			tail->bi_next = tmp;
 755			tail = tmp;
 756		}
 757		old_chain = old_chain->bi_next;
 758
 759		total += tmp->bi_size;
 760	}
 761
 762	BUG_ON(total < len);
 763
 764	if (tail)
 765		tail->bi_next = NULL;
 766
 767	*old = old_chain;
 768
 769	return new_chain;
 770
 771err_out:
 772	dout("bio_chain_clone with err\n");
 773	bio_chain_put(new_chain);
 774	return NULL;
 775}
 776
 777/*
 778 * helpers for osd request op vectors.
 779 */
 780static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
 781			    int num_ops,
 782			    int opcode,
 783			    u32 payload_len)
 784{
 785	*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
 786		       GFP_NOIO);
 787	if (!*ops)
 788		return -ENOMEM;
 789	(*ops)[0].op = opcode;
 790	/*
 791	 * op extent offset and length will be set later on
 792	 * in calc_raw_layout()
 793	 */
 794	(*ops)[0].payload_len = payload_len;
 795	return 0;
 796}
 797
 798static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
 799{
 800	kfree(ops);
 801}
 802
 803static void rbd_coll_end_req_index(struct request *rq,
 804				   struct rbd_req_coll *coll,
 805				   int index,
 806				   int ret, u64 len)
 807{
 808	struct request_queue *q;
 809	int min, max, i;
 810
 811	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
 812	     coll, index, ret, len);
 813
 814	if (!rq)
 815		return;
 816
 817	if (!coll) {
 818		blk_end_request(rq, ret, len);
 819		return;
 820	}
 821
 822	q = rq->q;
 823
 824	spin_lock_irq(q->queue_lock);
 825	coll->status[index].done = 1;
 826	coll->status[index].rc = ret;
 827	coll->status[index].bytes = len;
 828	max = min = coll->num_done;
 829	while (max < coll->total && coll->status[max].done)
 830		max++;
 831
 832	for (i = min; i<max; i++) {
 833		__blk_end_request(rq, coll->status[i].rc,
 834				  coll->status[i].bytes);
 835		coll->num_done++;
 836		kref_put(&coll->kref, rbd_coll_release);
 837	}
 838	spin_unlock_irq(q->queue_lock);
 839}
 840
 841static void rbd_coll_end_req(struct rbd_request *req,
 842			     int ret, u64 len)
 843{
 844	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
 845}
 846
 847/*
 848 * Send ceph osd request
 849 */
 850static int rbd_do_request(struct request *rq,
 851			  struct rbd_device *dev,
 852			  struct ceph_snap_context *snapc,
 853			  u64 snapid,
 854			  const char *obj, u64 ofs, u64 len,
 855			  struct bio *bio,
 856			  struct page **pages,
 857			  int num_pages,
 858			  int flags,
 859			  struct ceph_osd_req_op *ops,
 860			  int num_reply,
 861			  struct rbd_req_coll *coll,
 862			  int coll_index,
 863			  void (*rbd_cb)(struct ceph_osd_request *req,
 864					 struct ceph_msg *msg),
 865			  struct ceph_osd_request **linger_req,
 866			  u64 *ver)
 867{
 868	struct ceph_osd_request *req;
 869	struct ceph_file_layout *layout;
 870	int ret;
 871	u64 bno;
 872	struct timespec mtime = CURRENT_TIME;
 873	struct rbd_request *req_data;
 874	struct ceph_osd_request_head *reqhead;
 875	struct ceph_osd_client *osdc;
 876
 877	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
 878	if (!req_data) {
 879		if (coll)
 880			rbd_coll_end_req_index(rq, coll, coll_index,
 881					       -ENOMEM, len);
 882		return -ENOMEM;
 883	}
 884
 885	if (coll) {
 886		req_data->coll = coll;
 887		req_data->coll_index = coll_index;
 888	}
 889
 890	dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
 891
 892	down_read(&dev->header_rwsem);
 893
 894	osdc = &dev->rbd_client->client->osdc;
 895	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
 896					false, GFP_NOIO, pages, bio);
 897	if (!req) {
 898		up_read(&dev->header_rwsem);
 899		ret = -ENOMEM;
 900		goto done_pages;
 901	}
 902
 903	req->r_callback = rbd_cb;
 904
 905	req_data->rq = rq;
 906	req_data->bio = bio;
 907	req_data->pages = pages;
 908	req_data->len = len;
 909
 910	req->r_priv = req_data;
 911
 912	reqhead = req->r_request->front.iov_base;
 913	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
 914
 915	strncpy(req->r_oid, obj, sizeof(req->r_oid));
 916	req->r_oid_len = strlen(req->r_oid);
 917
 918	layout = &req->r_file_layout;
 919	memset(layout, 0, sizeof(*layout));
 920	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
 921	layout->fl_stripe_count = cpu_to_le32(1);
 922	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
 923	layout->fl_pg_pool = cpu_to_le32(dev->poolid);
 924	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
 925				req, ops);
 926
 927	ceph_osdc_build_request(req, ofs, &len,
 928				ops,
 929				snapc,
 930				&mtime,
 931				req->r_oid, req->r_oid_len);
 932	up_read(&dev->header_rwsem);
 933
 934	if (linger_req) {
 935		ceph_osdc_set_request_linger(osdc, req);
 936		*linger_req = req;
 937	}
 938
 939	ret = ceph_osdc_start_request(osdc, req, false);
 940	if (ret < 0)
 941		goto done_err;
 942
 943	if (!rbd_cb) {
 944		ret = ceph_osdc_wait_request(osdc, req);
 945		if (ver)
 946			*ver = le64_to_cpu(req->r_reassert_version.version);
 947		dout("reassert_ver=%lld\n",
 948		     le64_to_cpu(req->r_reassert_version.version));
 949		ceph_osdc_put_request(req);
 950	}
 951	return ret;
 952
 953done_err:
 954	bio_chain_put(req_data->bio);
 955	ceph_osdc_put_request(req);
 956done_pages:
 957	rbd_coll_end_req(req_data, ret, len);
 958	kfree(req_data);
 959	return ret;
 960}
 961
 962/*
 963 * Ceph osd op callback
 964 */
 965static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
 966{
 967	struct rbd_request *req_data = req->r_priv;
 968	struct ceph_osd_reply_head *replyhead;
 969	struct ceph_osd_op *op;
 970	__s32 rc;
 971	u64 bytes;
 972	int read_op;
 973
 974	/* parse reply */
 975	replyhead = msg->front.iov_base;
 976	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
 977	op = (void *)(replyhead + 1);
 978	rc = le32_to_cpu(replyhead->result);
 979	bytes = le64_to_cpu(op->extent.length);
 980	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
 981
 982	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
 983
 984	if (rc == -ENOENT && read_op) {
 985		zero_bio_chain(req_data->bio, 0);
 986		rc = 0;
 987	} else if (rc == 0 && read_op && bytes < req_data->len) {
 988		zero_bio_chain(req_data->bio, bytes);
 989		bytes = req_data->len;
 990	}
 991
 992	rbd_coll_end_req(req_data, rc, bytes);
 993
 994	if (req_data->bio)
 995		bio_chain_put(req_data->bio);
 996
 997	ceph_osdc_put_request(req);
 998	kfree(req_data);
 999}
1000
1001static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1002{
1003	ceph_osdc_put_request(req);
1004}
1005
1006/*
1007 * Do a synchronous ceph osd operation
1008 */
1009static int rbd_req_sync_op(struct rbd_device *dev,
1010			   struct ceph_snap_context *snapc,
1011			   u64 snapid,
1012			   int opcode,
1013			   int flags,
1014			   struct ceph_osd_req_op *orig_ops,
1015			   int num_reply,
1016			   const char *obj,
1017			   u64 ofs, u64 len,
1018			   char *buf,
1019			   struct ceph_osd_request **linger_req,
1020			   u64 *ver)
1021{
1022	int ret;
1023	struct page **pages;
1024	int num_pages;
1025	struct ceph_osd_req_op *ops = orig_ops;
1026	u32 payload_len;
1027
1028	num_pages = calc_pages_for(ofs, len);
1029	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1030	if (IS_ERR(pages))
1031		return PTR_ERR(pages);
1032
1033	if (!orig_ops) {
1034		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1035		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1036		if (ret < 0)
1037			goto done;
1038
1039		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1040			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1041			if (ret < 0)
1042				goto done_ops;
1043		}
1044	}
1045
1046	ret = rbd_do_request(NULL, dev, snapc, snapid,
1047			  obj, ofs, len, NULL,
1048			  pages, num_pages,
1049			  flags,
1050			  ops,
1051			  2,
1052			  NULL, 0,
1053			  NULL,
1054			  linger_req, ver);
1055	if (ret < 0)
1056		goto done_ops;
1057
1058	if ((flags & CEPH_OSD_FLAG_READ) && buf)
1059		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1060
1061done_ops:
1062	if (!orig_ops)
1063		rbd_destroy_ops(ops);
1064done:
1065	ceph_release_page_vector(pages, num_pages);
1066	return ret;
1067}
1068
1069/*
1070 * Do an asynchronous ceph osd operation
1071 */
1072static int rbd_do_op(struct request *rq,
1073		     struct rbd_device *rbd_dev,
1074		     struct ceph_snap_context *snapc,
1075		     u64 snapid,
1076		     int opcode, int flags, int num_reply,
1077		     u64 ofs, u64 len,
1078		     struct bio *bio,
1079		     struct rbd_req_coll *coll,
1080		     int coll_index)
1081{
1082	char *seg_name;
1083	u64 seg_ofs;
1084	u64 seg_len;
1085	int ret;
1086	struct ceph_osd_req_op *ops;
1087	u32 payload_len;
1088
1089	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1090	if (!seg_name)
1091		return -ENOMEM;
1092
1093	seg_len = rbd_get_segment(&rbd_dev->header,
1094				  rbd_dev->header.block_name,
1095				  ofs, len,
1096				  seg_name, &seg_ofs);
1097
1098	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1099
1100	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1101	if (ret < 0)
1102		goto done;
1103
1104	/* we've taken care of segment sizes earlier when we
1105	   cloned the bios. We should never have a segment
1106	   truncated at this point */
1107	BUG_ON(seg_len < len);
1108
1109	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1110			     seg_name, seg_ofs, seg_len,
1111			     bio,
1112			     NULL, 0,
1113			     flags,
1114			     ops,
1115			     num_reply,
1116			     coll, coll_index,
1117			     rbd_req_cb, 0, NULL);
1118
1119	rbd_destroy_ops(ops);
1120done:
1121	kfree(seg_name);
1122	return ret;
1123}
1124
1125/*
1126 * Request async osd write
1127 */
1128static int rbd_req_write(struct request *rq,
1129			 struct rbd_device *rbd_dev,
1130			 struct ceph_snap_context *snapc,
1131			 u64 ofs, u64 len,
1132			 struct bio *bio,
1133			 struct rbd_req_coll *coll,
1134			 int coll_index)
1135{
1136	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1137			 CEPH_OSD_OP_WRITE,
1138			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1139			 2,
1140			 ofs, len, bio, coll, coll_index);
1141}
1142
1143/*
1144 * Request async osd read
1145 */
1146static int rbd_req_read(struct request *rq,
1147			 struct rbd_device *rbd_dev,
1148			 u64 snapid,
1149			 u64 ofs, u64 len,
1150			 struct bio *bio,
1151			 struct rbd_req_coll *coll,
1152			 int coll_index)
1153{
1154	return rbd_do_op(rq, rbd_dev, NULL,
1155			 snapid,
1156			 CEPH_OSD_OP_READ,
1157			 CEPH_OSD_FLAG_READ,
1158			 2,
1159			 ofs, len, bio, coll, coll_index);
1160}
1161
1162/*
1163 * Request sync osd read
1164 */
1165static int rbd_req_sync_read(struct rbd_device *dev,
1166			  struct ceph_snap_context *snapc,
1167			  u64 snapid,
1168			  const char *obj,
1169			  u64 ofs, u64 len,
1170			  char *buf,
1171			  u64 *ver)
1172{
1173	return rbd_req_sync_op(dev, NULL,
1174			       snapid,
1175			       CEPH_OSD_OP_READ,
1176			       CEPH_OSD_FLAG_READ,
1177			       NULL,
1178			       1, obj, ofs, len, buf, NULL, ver);
1179}
1180
1181/*
1182 * Request sync osd notify-ack (acknowledge a watch notification)
1183 */
1184static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1185				   u64 ver,
1186				   u64 notify_id,
1187				   const char *obj)
1188{
1189	struct ceph_osd_req_op *ops;
1190	struct page **pages = NULL;
1191	int ret;
1192
1193	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1194	if (ret < 0)
1195		return ret;
1196
1197	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1198	ops[0].watch.cookie = notify_id;
1199	ops[0].watch.flag = 0;
1200
1201	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1202			  obj, 0, 0, NULL,
1203			  pages, 0,
1204			  CEPH_OSD_FLAG_READ,
1205			  ops,
1206			  1,
1207			  NULL, 0,
1208			  rbd_simple_req_cb, 0, NULL);
1209
1210	rbd_destroy_ops(ops);
1211	return ret;
1212}
1213
1214static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1215{
1216	struct rbd_device *dev = (struct rbd_device *)data;
1217	int rc;
1218
1219	if (!dev)
1220		return;
1221
1222	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1223		notify_id, (int)opcode);
1224	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1225	rc = __rbd_refresh_header(dev);
1226	mutex_unlock(&ctl_mutex);
1227	if (rc)
1228		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1229			   "update snaps: %d\n", dev->major, rc);
1230
1231	rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1232}
1233
1234/*
1235 * Request sync osd watch
1236 */
1237static int rbd_req_sync_watch(struct rbd_device *dev,
1238			      const char *obj,
1239			      u64 ver)
1240{
1241	struct ceph_osd_req_op *ops;
1242	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1243
1244	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1245	if (ret < 0)
1246		return ret;
1247
1248	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1249				     (void *)dev, &dev->watch_event);
1250	if (ret < 0)
1251		goto fail;
1252
1253	ops[0].watch.ver = cpu_to_le64(ver);
1254	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1255	ops[0].watch.flag = 1;
1256
1257	ret = rbd_req_sync_op(dev, NULL,
1258			      CEPH_NOSNAP,
1259			      0,
1260			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1261			      ops,
1262			      1, obj, 0, 0, NULL,
1263			      &dev->watch_request, NULL);
1264
1265	if (ret < 0)
1266		goto fail_event;
1267
1268	rbd_destroy_ops(ops);
1269	return 0;
1270
1271fail_event:
1272	ceph_osdc_cancel_event(dev->watch_event);
1273	dev->watch_event = NULL;
1274fail:
1275	rbd_destroy_ops(ops);
1276	return ret;
1277}
1278
1279/*
1280 * Request sync osd unwatch
1281 */
1282static int rbd_req_sync_unwatch(struct rbd_device *dev,
1283				const char *obj)
1284{
1285	struct ceph_osd_req_op *ops;
1286
1287	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1288	if (ret < 0)
1289		return ret;
1290
1291	ops[0].watch.ver = 0;
1292	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1293	ops[0].watch.flag = 0;
1294
1295	ret = rbd_req_sync_op(dev, NULL,
1296			      CEPH_NOSNAP,
1297			      0,
1298			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1299			      ops,
1300			      1, obj, 0, 0, NULL, NULL, NULL);
1301
1302	rbd_destroy_ops(ops);
1303	ceph_osdc_cancel_event(dev->watch_event);
1304	dev->watch_event = NULL;
1305	return ret;
1306}
1307
1308struct rbd_notify_info {
1309	struct rbd_device *dev;
1310};
1311
1312static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1313{
1314	struct rbd_device *dev = (struct rbd_device *)data;
1315	if (!dev)
1316		return;
1317
1318	dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1319		notify_id, (int)opcode);
1320}
1321
1322/*
1323 * Request sync osd notify
1324 */
1325static int rbd_req_sync_notify(struct rbd_device *dev,
1326		          const char *obj)
1327{
1328	struct ceph_osd_req_op *ops;
1329	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1330	struct ceph_osd_event *event;
1331	struct rbd_notify_info info;
1332	int payload_len = sizeof(u32) + sizeof(u32);
1333	int ret;
1334
1335	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1336	if (ret < 0)
1337		return ret;
1338
1339	info.dev = dev;
1340
1341	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1342				     (void *)&info, &event);
1343	if (ret < 0)
1344		goto fail;
1345
1346	ops[0].watch.ver = 1;
1347	ops[0].watch.flag = 1;
1348	ops[0].watch.cookie = event->cookie;
1349	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1350	ops[0].watch.timeout = 12;
1351
1352	ret = rbd_req_sync_op(dev, NULL,
1353			       CEPH_NOSNAP,
1354			       0,
1355			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1356			       ops,
1357			       1, obj, 0, 0, NULL, NULL, NULL);
1358	if (ret < 0)
1359		goto fail_event;
1360
1361	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1362	dout("ceph_osdc_wait_event returned %d\n", ret);
1363	rbd_destroy_ops(ops);
1364	return 0;
1365
1366fail_event:
1367	ceph_osdc_cancel_event(event);
1368fail:
1369	rbd_destroy_ops(ops);
1370	return ret;
1371}
1372
1373/*
1374 * Request sync osd exec (execute a method of a rados object class)
1375 */
1376static int rbd_req_sync_exec(struct rbd_device *dev,
1377			     const char *obj,
1378			     const char *cls,
1379			     const char *method,
1380			     const char *data,
1381			     int len,
1382			     u64 *ver)
1383{
1384	struct ceph_osd_req_op *ops;
1385	int cls_len = strlen(cls);
1386	int method_len = strlen(method);
1387	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1388				    cls_len + method_len + len);
1389	if (ret < 0)
1390		return ret;
1391
1392	ops[0].cls.class_name = cls;
1393	ops[0].cls.class_len = (__u8)cls_len;
1394	ops[0].cls.method_name = method;
1395	ops[0].cls.method_len = (__u8)method_len;
1396	ops[0].cls.argc = 0;
1397	ops[0].cls.indata = data;
1398	ops[0].cls.indata_len = len;
1399
1400	ret = rbd_req_sync_op(dev, NULL,
1401			       CEPH_NOSNAP,
1402			       0,
1403			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1404			       ops,
1405			       1, obj, 0, 0, NULL, NULL, ver);
1406
1407	rbd_destroy_ops(ops);
1408
1409	dout("cls_exec returned %d\n", ret);
1410	return ret;
1411}
1412
1413static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1414{
1415	struct rbd_req_coll *coll =
1416			kzalloc(sizeof(struct rbd_req_coll) +
1417			        sizeof(struct rbd_req_status) * num_reqs,
1418				GFP_ATOMIC);
1419
1420	if (!coll)
1421		return NULL;
1422	coll->total = num_reqs;
1423	kref_init(&coll->kref);
1424	return coll;
1425}
1426
1427/*
1428 * block device queue callback
1429 */
1430static void rbd_rq_fn(struct request_queue *q)
1431{
1432	struct rbd_device *rbd_dev = q->queuedata;
1433	struct request *rq;
1434	struct bio_pair *bp = NULL;
1435
1436	while ((rq = blk_fetch_request(q))) {
1437		struct bio *bio;
1438		struct bio *rq_bio, *next_bio = NULL;
1439		bool do_write;
1440		int size, op_size = 0;
1441		u64 ofs;
1442		int num_segs, cur_seg = 0;
1443		struct rbd_req_coll *coll;
1444
1445		/* peek at request from block layer */
1446		if (!rq)
1447			break;
1448
1449		dout("fetched request\n");
1450
1451		/* filter out block requests we don't understand */
1452		if ((rq->cmd_type != REQ_TYPE_FS)) {
1453			__blk_end_request_all(rq, 0);
1454			continue;
1455		}
1456
1457		/* deduce our operation (read, write) */
1458		do_write = (rq_data_dir(rq) == WRITE);
1459
1460		size = blk_rq_bytes(rq);
1461		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1462		rq_bio = rq->bio;
1463		if (do_write && rbd_dev->read_only) {
1464			__blk_end_request_all(rq, -EROFS);
1465			continue;
1466		}
1467
1468		spin_unlock_irq(q->queue_lock);
1469
1470		dout("%s 0x%x bytes at 0x%llx\n",
1471		     do_write ? "write" : "read",
1472		     size, blk_rq_pos(rq) * SECTOR_SIZE);
1473
1474		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1475		coll = rbd_alloc_coll(num_segs);
1476		if (!coll) {
1477			spin_lock_irq(q->queue_lock);
1478			__blk_end_request_all(rq, -ENOMEM);
1479			continue;
1480		}
1481
1482		do {
1483			/* a bio clone to be passed down to OSD req */
1484			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1485			op_size = rbd_get_segment(&rbd_dev->header,
1486						  rbd_dev->header.block_name,
1487						  ofs, size,
1488						  NULL, NULL);
1489			kref_get(&coll->kref);
1490			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1491					      op_size, GFP_ATOMIC);
1492			if (!bio) {
1493				rbd_coll_end_req_index(rq, coll, cur_seg,
1494						       -ENOMEM, op_size);
1495				goto next_seg;
1496			}
1497
1498
1499			/* init OSD command: write or read */
1500			if (do_write)
1501				rbd_req_write(rq, rbd_dev,
1502					      rbd_dev->header.snapc,
1503					      ofs,
1504					      op_size, bio,
1505					      coll, cur_seg);
1506			else
1507				rbd_req_read(rq, rbd_dev,
1508					     rbd_dev->snap_id,
1509					     ofs,
1510					     op_size, bio,
1511					     coll, cur_seg);
1512
1513next_seg:
1514			size -= op_size;
1515			ofs += op_size;
1516
1517			cur_seg++;
1518			rq_bio = next_bio;
1519		} while (size > 0);
1520		kref_put(&coll->kref, rbd_coll_release);
1521
1522		if (bp)
1523			bio_pair_release(bp);
1524		spin_lock_irq(q->queue_lock);
1525	}
1526}
1527
1528/*
1529 * a queue callback. Makes sure that we don't create a bio that spans across
1530 * multiple osd objects. One exception would be with a single page bios,
1531 * which we handle later at bio_chain_clone
1532 */
1533static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1534			  struct bio_vec *bvec)
1535{
1536	struct rbd_device *rbd_dev = q->queuedata;
1537	unsigned int chunk_sectors;
1538	sector_t sector;
1539	unsigned int bio_sectors;
1540	int max;
1541
1542	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1543	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1544	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1545
1546	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1547				 + bio_sectors)) << SECTOR_SHIFT;
1548	if (max < 0)
1549		max = 0; /* bio_add cannot handle a negative return */
1550	if (max <= bvec->bv_len && bio_sectors == 0)
1551		return bvec->bv_len;
1552	return max;
1553}
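
/*
 * Worked example (illustrative): with obj_order 22, chunk_sectors is
 * 1 << (22 - 9) = 8192. A bio starting on a chunk boundary
 * (sector & 8191 == 0) that already carries 8188 sectors gets
 * max = (8192 - 8188) << 9 = 2048, so at most 2 KB more may be
 * merged before the object boundary; the bio_sectors == 0 case lets
 * a lone bvec exceed that, to be split later in bio_chain_clone().
 */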
1554
1555static void rbd_free_disk(struct rbd_device *rbd_dev)
1556{
1557	struct gendisk *disk = rbd_dev->disk;
1558
1559	if (!disk)
1560		return;
1561
1562	rbd_header_free(&rbd_dev->header);
1563
1564	if (disk->flags & GENHD_FL_UP)
1565		del_gendisk(disk);
1566	if (disk->queue)
1567		blk_cleanup_queue(disk->queue);
1568	put_disk(disk);
1569}
1570
1571/*
1572 * reload the on-disk header
1573 */
1574static int rbd_read_header(struct rbd_device *rbd_dev,
1575			   struct rbd_image_header *header)
1576{
1577	ssize_t rc;
1578	struct rbd_image_header_ondisk *dh;
1579	u32 snap_count = 0;
1580	u64 ver;
1581	size_t len;
1582
1583	/*
1584	 * First reads the fixed-size header to determine the number
1585	 * of snapshots, then re-reads it, along with all snapshot
1586	 * records as well as their stored names.
1587	 */
1588	len = sizeof (*dh);
1589	while (1) {
1590		dh = kmalloc(len, GFP_KERNEL);
1591		if (!dh)
1592			return -ENOMEM;
1593
1594		rc = rbd_req_sync_read(rbd_dev,
1595				       NULL, CEPH_NOSNAP,
1596				       rbd_dev->obj_md_name,
1597				       0, len,
1598				       (char *)dh, &ver);
1599		if (rc < 0)
1600			goto out_dh;
1601
1602		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1603		if (rc < 0) {
1604			if (rc == -ENXIO)
1605				pr_warning("unrecognized header format"
1606					   " for image %s", rbd_dev->obj);
1607			goto out_dh;
1608		}
1609
1610		if (snap_count == header->total_snaps)
1611			break;
1612
1613		snap_count = header->total_snaps;
1614		len = sizeof (*dh) +
1615			snap_count * sizeof(struct rbd_image_snap_ondisk) +
1616			header->snap_names_len;
1617
1618		rbd_header_free(header);
1619		kfree(dh);
1620	}
1621	header->obj_version = ver;
1622
1623out_dh:
1624	kfree(dh);
1625	return rc;
1626}
1627
1628/*
1629 * create a snapshot
1630 */
1631static int rbd_header_add_snap(struct rbd_device *dev,
1632			       const char *snap_name,
1633			       gfp_t gfp_flags)
1634{
1635	int name_len = strlen(snap_name);
1636	u64 new_snapid;
1637	int ret;
1638	void *data, *p, *e;
1639	u64 ver;
1640	struct ceph_mon_client *monc;
1641
1642	/* we should create a snapshot only if we're pointing at the head */
1643	if (dev->snap_id != CEPH_NOSNAP)
1644		return -EINVAL;
1645
1646	monc = &dev->rbd_client->client->monc;
1647	ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
1648	dout("created snapid=%lld\n", new_snapid);
1649	if (ret < 0)
1650		return ret;
1651
1652	data = kmalloc(name_len + 16, gfp_flags);
1653	if (!data)
1654		return -ENOMEM;
1655
1656	p = data;
1657	e = data + name_len + 16;
1658
1659	ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1660	ceph_encode_64_safe(&p, e, new_snapid, bad);
1661
1662	ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1663				data, p - data, &ver);
1664
1665	kfree(data);
1666
1667	if (ret < 0)
1668		return ret;
1669
1670	down_write(&dev->header_rwsem);
1671	dev->header.snapc->seq = new_snapid;
1672	up_write(&dev->header_rwsem);
1673
1674	return 0;
1675bad:
1676	return -ERANGE;
1677}
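
/*
 * Illustrative layout of the "snap_add" payload built above, per the
 * ceph encoding helpers (a length-prefixed string followed by the new
 * snapshot id, both little-endian):
 *
 *	__le32 name_len | name bytes | __le64 new_snapid
 */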
1678
1679static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1680{
1681	struct rbd_snap *snap;
1682
1683	while (!list_empty(&rbd_dev->snaps)) {
1684		snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1685		__rbd_remove_snap_dev(rbd_dev, snap);
1686	}
1687}
1688
1689/*
1690 * only read the first part of the on-disk header, without the snaps info
1691 */
1692static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1693{
1694	int ret;
1695	struct rbd_image_header h;
1696	u64 snap_seq;
1697	int follow_seq = 0;
1698
1699	ret = rbd_read_header(rbd_dev, &h);
1700	if (ret < 0)
1701		return ret;
1702
1703	/* resized? */
1704	set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
1705
1706	down_write(&rbd_dev->header_rwsem);
1707
1708	snap_seq = rbd_dev->header.snapc->seq;
1709	if (rbd_dev->header.total_snaps &&
1710	    rbd_dev->header.snapc->snaps[0] == snap_seq)
1711		/* pointing at the head, will need to follow that
1712		   if head moves */
1713		follow_seq = 1;
1714
1715	kfree(rbd_dev->header.snapc);
1716	kfree(rbd_dev->header.snap_names);
1717	kfree(rbd_dev->header.snap_sizes);
1718
1719	rbd_dev->header.total_snaps = h.total_snaps;
1720	rbd_dev->header.snapc = h.snapc;
1721	rbd_dev->header.snap_names = h.snap_names;
1722	rbd_dev->header.snap_names_len = h.snap_names_len;
1723	rbd_dev->header.snap_sizes = h.snap_sizes;
1724	if (follow_seq)
1725		rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1726	else
1727		rbd_dev->header.snapc->seq = snap_seq;
1728
1729	ret = __rbd_init_snaps_header(rbd_dev);
1730
1731	up_write(&rbd_dev->header_rwsem);
1732
1733	return ret;
1734}
1735
1736static int rbd_init_disk(struct rbd_device *rbd_dev)
1737{
1738	struct gendisk *disk;
1739	struct request_queue *q;
1740	int rc;
1741	u64 segment_size;
1742	u64 total_size = 0;
1743
1744	/* contact OSD, request size info about the object being mapped */
1745	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1746	if (rc)
1747		return rc;
1748
1749	/* no need to lock here, as rbd_dev is not registered yet */
1750	rc = __rbd_init_snaps_header(rbd_dev);
1751	if (rc)
1752		return rc;
1753
1754	rc = rbd_header_set_snap(rbd_dev, &total_size);
1755	if (rc)
1756		return rc;
1757
1758	/* create gendisk info */
1759	rc = -ENOMEM;
1760	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1761	if (!disk)
1762		goto out;
1763
1764	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1765		 rbd_dev->id);
1766	disk->major = rbd_dev->major;
1767	disk->first_minor = 0;
1768	disk->fops = &rbd_bd_ops;
1769	disk->private_data = rbd_dev;
1770
1771	/* init rq */
1772	rc = -ENOMEM;
1773	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1774	if (!q)
1775		goto out_disk;
1776
1777	/* We use the default size, but let's be explicit about it. */
1778	blk_queue_physical_block_size(q, SECTOR_SIZE);
1779
1780	/* set io sizes to object size */
1781	segment_size = rbd_obj_bytes(&rbd_dev->header);
1782	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1783	blk_queue_max_segment_size(q, segment_size);
1784	blk_queue_io_min(q, segment_size);
1785	blk_queue_io_opt(q, segment_size);
1786
1787	blk_queue_merge_bvec(q, rbd_merge_bvec);
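	/* the size limits above plus the merge_bvec callback keep any
	 * single request from crossing a RADOS object boundary */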
1788	disk->queue = q;
1789
1790	q->queuedata = rbd_dev;
1791
1792	rbd_dev->disk = disk;
1793	rbd_dev->q = q;
1794
1795	/* finally, announce the disk to the world */
1796	set_capacity(disk, total_size / SECTOR_SIZE);
1797	add_disk(disk);
1798
1799	pr_info("%s: added with size 0x%llx\n",
1800		disk->disk_name, (unsigned long long)total_size);
1801	return 0;
1802
1803out_disk:
1804	put_disk(disk);
1805out:
1806	return rc;
1807}
1808
1809/*
1810  sysfs
1811*/
1812
1813static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1814{
1815	return container_of(dev, struct rbd_device, dev);
1816}
1817
1818static ssize_t rbd_size_show(struct device *dev,
1819			     struct device_attribute *attr, char *buf)
1820{
1821	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1822
1823	return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1824}
1825
1826static ssize_t rbd_major_show(struct device *dev,
1827			      struct device_attribute *attr, char *buf)
1828{
1829	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1830
1831	return sprintf(buf, "%d\n", rbd_dev->major);
1832}
1833
1834static ssize_t rbd_client_id_show(struct device *dev,
1835				  struct device_attribute *attr, char *buf)
1836{
1837	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1838
1839	return sprintf(buf, "client%lld\n",
1840			ceph_client_id(rbd_dev->rbd_client->client));
1841}
1842
1843static ssize_t rbd_pool_show(struct device *dev,
1844			     struct device_attribute *attr, char *buf)
1845{
1846	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1847
1848	return sprintf(buf, "%s\n", rbd_dev->pool_name);
1849}
1850
1851static ssize_t rbd_name_show(struct device *dev,
1852			     struct device_attribute *attr, char *buf)
1853{
1854	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1855
1856	return sprintf(buf, "%s\n", rbd_dev->obj);
1857}
1858
1859static ssize_t rbd_snap_show(struct device *dev,
1860			     struct device_attribute *attr,
1861			     char *buf)
1862{
1863	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1864
1865	return sprintf(buf, "%s\n", rbd_dev->snap_name);
1866}
1867
1868static ssize_t rbd_image_refresh(struct device *dev,
1869				 struct device_attribute *attr,
1870				 const char *buf,
1871				 size_t size)
1872{
1873	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1874	int rc;
1875	int ret = size;
1876
1877	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1878
1879	rc = __rbd_refresh_header(rbd_dev);
1880	if (rc < 0)
1881		ret = rc;
1882
1883	mutex_unlock(&ctl_mutex);
1884	return ret;
1885}
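/*
 * illustrative use (path per Documentation/ABI/testing/sysfs-bus-rbd;
 * the written value is ignored, the write itself triggers the
 * refresh):
 *
 *	# echo 1 > /sys/bus/rbd/devices/<id>/refresh
 */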
1886
1887static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1888static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1889static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1890static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1891static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1892static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1893static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1894static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1895
1896static struct attribute *rbd_attrs[] = {
1897	&dev_attr_size.attr,
1898	&dev_attr_major.attr,
1899	&dev_attr_client_id.attr,
1900	&dev_attr_pool.attr,
1901	&dev_attr_name.attr,
1902	&dev_attr_current_snap.attr,
1903	&dev_attr_refresh.attr,
1904	&dev_attr_create_snap.attr,
1905	NULL
1906};
1907
1908static struct attribute_group rbd_attr_group = {
1909	.attrs = rbd_attrs,
1910};
1911
1912static const struct attribute_group *rbd_attr_groups[] = {
1913	&rbd_attr_group,
1914	NULL
1915};
1916
1917static void rbd_sysfs_dev_release(struct device *dev)
1918{
1919}
1920
1921static struct device_type rbd_device_type = {
1922	.name		= "rbd",
1923	.groups		= rbd_attr_groups,
1924	.release	= rbd_sysfs_dev_release,
1925};
1926
1927
1928/*
1929  sysfs - snapshots
1930*/
1931
1932static ssize_t rbd_snap_size_show(struct device *dev,
1933				  struct device_attribute *attr,
1934				  char *buf)
1935{
1936	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1937
1938	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1939}
1940
1941static ssize_t rbd_snap_id_show(struct device *dev,
1942				struct device_attribute *attr,
1943				char *buf)
1944{
1945	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1946
1947	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1948}
1949
1950static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1951static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1952
1953static struct attribute *rbd_snap_attrs[] = {
1954	&dev_attr_snap_size.attr,
1955	&dev_attr_snap_id.attr,
1956	NULL,
1957};
1958
1959static struct attribute_group rbd_snap_attr_group = {
1960	.attrs = rbd_snap_attrs,
1961};
1962
1963static void rbd_snap_dev_release(struct device *dev)
1964{
1965	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1966	kfree(snap->name);
1967	kfree(snap);
1968}
1969
1970static const struct attribute_group *rbd_snap_attr_groups[] = {
1971	&rbd_snap_attr_group,
1972	NULL
1973};
1974
1975static struct device_type rbd_snap_device_type = {
1976	.groups		= rbd_snap_attr_groups,
1977	.release	= rbd_snap_dev_release,
1978};
1979
1980static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1981				  struct rbd_snap *snap)
1982{
1983	list_del(&snap->node);
1984	device_unregister(&snap->dev);
1985}
1986
1987static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1988				  struct rbd_snap *snap,
1989				  struct device *parent)
1990{
1991	struct device *dev = &snap->dev;
1992	int ret;
1993
1994	dev->type = &rbd_snap_device_type;
1995	dev->parent = parent;
1996	dev->release = rbd_snap_dev_release;
1997	dev_set_name(dev, "snap_%s", snap->name);
1998	ret = device_register(dev);
1999
2000	return ret;
2001}
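/*
 * each registered snapshot appears in sysfs as a child of the
 * mapping's device directory, named "snap_<name>", exposing the
 * snap_size and snap_id attributes defined above
 */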
2002
2003static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2004			      int i, const char *name,
2005			      struct rbd_snap **snapp)
2006{
2007	int ret;
2008	struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2009	if (!snap)
2010		return -ENOMEM;
2011	snap->name = kstrdup(name, GFP_KERNEL);
2012	snap->size = rbd_dev->header.snap_sizes[i];
2013	snap->id = rbd_dev->header.snapc->snaps[i];
2014	if (device_is_registered(&rbd_dev->dev)) {
2015		ret = rbd_register_snap_dev(rbd_dev, snap,
2016					     &rbd_dev->dev);
2017		if (ret < 0)
2018			goto err;
2019	}
2020	*snapp = snap;
2021	return 0;
2022err:
2023	kfree(snap->name);
2024	kfree(snap);
2025	return ret;
2026}
2027
2028/*
2029 * search for the previous snap in a null-delimited string list
2030 */
2031static const char *rbd_prev_snap_name(const char *name, const char *start)
2032{
2033	if (name < start + 2)
2034		return NULL;
2035
2036	name -= 2;
2037	while (*name) {
2038		if (name == start)
2039			return start;
2040		name--;
2041	}
2042	return name + 1;
2043}
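/*
 * illustrative walk (hypothetical names): with the list holding
 * "snap2\0snap1\0" and name pointing one past the final '\0',
 * successive calls return "snap1", then "snap2", then NULL once
 * start is reached
 */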
2044
2045/*
2046 * compare the old list of snapshots that we have to what's in the
2047 * header and update it accordingly. Note that the header holds the
2048 * snapshots in reverse order (newest to oldest) and we need to go
2049 * from oldest to newest so that we don't get a duplicate snap name
2050 * along the way (e.g., when a snapshot was removed and a new one
2051 * recreated with the same name).
2052 */
2053static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2054{
2055	const char *name, *first_name;
2056	int i = rbd_dev->header.total_snaps;
2057	struct rbd_snap *snap, *old_snap = NULL;
2058	int ret;
2059	struct list_head *p, *n;
2060
2061	first_name = rbd_dev->header.snap_names;
2062	name = first_name + rbd_dev->header.snap_names_len;
2063
2064	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2065		u64 cur_id;
2066
2067		old_snap = list_entry(p, struct rbd_snap, node);
2068
2069		if (i)
2070			cur_id = rbd_dev->header.snapc->snaps[i - 1];
2071
2072		if (!i || old_snap->id < cur_id) {
2073			/* old_snap->id was skipped, thus was removed */
2074			__rbd_remove_snap_dev(rbd_dev, old_snap);
2075			continue;
2076		}
2077		if (old_snap->id == cur_id) {
2078			/* we have this snapshot already */
2079			i--;
2080			name = rbd_prev_snap_name(name, first_name);
2081			continue;
2082		}
2083		for (; i > 0;
2084		     i--, name = rbd_prev_snap_name(name, first_name)) {
2085			if (!name) {
2086				WARN_ON(1);
2087				return -EINVAL;
2088			}
2089			cur_id = rbd_dev->header.snapc->snaps[i];
2090			/* snapshot removal? handle it above */
2091			if (cur_id >= old_snap->id)
2092				break;
2093			/* a new snapshot */
2094			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2095			if (ret < 0)
2096				return ret;
2097
2098			/* note that we add it backward so using n and not p */
2099			list_add(&snap->node, n);
2100			p = &snap->node;
2101		}
2102	}
2103	/* we're done going over the old snap list, just add what's left */
2104	for (; i > 0; i--) {
2105		name = rbd_prev_snap_name(name, first_name);
2106		if (!name) {
2107			WARN_ON(1);
2108			return -EINVAL;
2109		}
2110		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2111		if (ret < 0)
2112			return ret;
2113		list_add(&snap->node, &rbd_dev->snaps);
2114	}
2115
2116	return 0;
2117}
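/*
 * illustrative pass (hypothetical ids): if our old list holds
 * snapshots {1, 3} and the header now reports {4, 1} (newest
 * first), the walk above removes the device for id 3 and adds
 * one for id 4, while the entry for id 1 is left untouched
 */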
2118
2119static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2120{
2121	int ret;
2122	struct device *dev;
2123	struct rbd_snap *snap;
2124
2125	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2126	dev = &rbd_dev->dev;
2127
2128	dev->bus = &rbd_bus_type;
2129	dev->type = &rbd_device_type;
2130	dev->parent = &rbd_root_dev;
2131	dev->release = rbd_dev_release;
2132	dev_set_name(dev, "%d", rbd_dev->id);
2133	ret = device_register(dev);
2134	if (ret < 0)
2135		goto out;
2136
2137	list_for_each_entry(snap, &rbd_dev->snaps, node) {
2138		ret = rbd_register_snap_dev(rbd_dev, snap,
2139					     &rbd_dev->dev);
2140		if (ret < 0)
2141			break;
2142	}
2143out:
2144	mutex_unlock(&ctl_mutex);
2145	return ret;
2146}
2147
2148static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2149{
2150	device_unregister(&rbd_dev->dev);
2151}
2152
2153static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2154{
2155	int ret, rc;
2156
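	/*
	 * -ERANGE from the watch request indicates that our cached
	 * header version is stale: refresh the header and retry
	 * with the updated version
	 */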
2157	do {
2158		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2159					 rbd_dev->header.obj_version);
2160		if (ret == -ERANGE) {
2161			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2162			rc = __rbd_refresh_header(rbd_dev);
2163			mutex_unlock(&ctl_mutex);
2164			if (rc < 0)
2165				return rc;
2166		}
2167	} while (ret == -ERANGE);
2168
2169	return ret;
2170}
2171
2172static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2173
2174/*
2175 * Get a unique rbd identifier for the given new rbd_dev, and add
2176 * the rbd_dev to the global list.  The minimum rbd id is 1.
2177 */
2178static void rbd_id_get(struct rbd_device *rbd_dev)
2179{
2180	rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2181
2182	spin_lock(&rbd_dev_list_lock);
2183	list_add_tail(&rbd_dev->node, &rbd_dev_list);
2184	spin_unlock(&rbd_dev_list_lock);
2185}
2186
2187/*
2188 * Remove an rbd_dev from the global list, and record that its
2189 * identifier is no longer in use.
2190 */
2191static void rbd_id_put(struct rbd_device *rbd_dev)
2192{
2193	struct list_head *tmp;
2194	int rbd_id = rbd_dev->id;
2195	int max_id;
2196
2197	BUG_ON(rbd_id < 1);
2198
2199	spin_lock(&rbd_dev_list_lock);
2200	list_del_init(&rbd_dev->node);
2201
2202	/*
2203	 * If the id being "put" is not the current maximum, there
2204	 * is nothing special we need to do.
2205	 */
2206	if (rbd_id != atomic64_read(&rbd_id_max)) {
2207		spin_unlock(&rbd_dev_list_lock);
2208		return;
2209	}
2210
2211	/*
2212	 * We need to update the current maximum id.  Search the
2213	 * list to find out what it is.  We're more likely to find
2214	 * the maximum at the end, so search the list backward.
2215	 */
2216	max_id = 0;
2217	list_for_each_prev(tmp, &rbd_dev_list) {
2218		struct rbd_device *rbd_dev;
2219
2220		rbd_dev = list_entry(tmp, struct rbd_device, node);
2221		if (rbd_dev->id > max_id)
2222			max_id = rbd_dev->id;
2223	}
2224	spin_unlock(&rbd_dev_list_lock);
2225
2226	/*
2227	 * The max id could have been updated by rbd_id_get(), in
2228	 * which case it now accurately reflects the new maximum.
2229	 * Be careful not to overwrite the maximum value in that
2230	 * case.
2231	 */
2232	atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2233}
2234
2235/*
2236 * Skips over white space at *buf, and updates *buf to point to the
2237 * first found non-space character (if any). Returns the length of
2238 * the token (string of non-white space characters) found.  Note
2239 * that *buf must be terminated with '\0'.
2240 */
2241static inline size_t next_token(const char **buf)
2242{
2243	/*
2244	 * These are the characters that produce nonzero for
2245	 * isspace() in the "C" and "POSIX" locales.
2246	 */
2247	const char *spaces = " \f\n\r\t\v";
2248
2249	*buf += strspn(*buf, spaces);	/* Find start of token */
2250
2251	return strcspn(*buf, spaces);	/* Return token length */
2252}
2253
2254/*
2255 * Finds the next token in *buf, and if the provided token buffer is
2256 * big enough, copies the found token into it.  The result, if
2257 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2258 * must be terminated with '\0' on entry.
2259 *
2260 * Returns the length of the token found (not including the '\0').
2261 * Return value will be 0 if no token is found, and it will be >=
2262 * token_size if the token would not fit.
2263 *
2264 * The *buf pointer will be updated to point beyond the end of the
2265 * found token.  Note that this occurs even if the token buffer is
2266 * too small to hold it.
2267 */
2268static inline size_t copy_token(const char **buf,
2269				char *token,
2270				size_t token_size)
2271{
2272	size_t len;
2273
2274	len = next_token(buf);
2275	if (len < token_size) {
2276		memcpy(token, *buf, len);
2277		*(token + len) = '\0';
2278	}
2279	*buf += len;
2280
2281	return len;
2282}
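/*
 * illustrative tokenization (hypothetical input): with
 * *buf = "1.2.3.4:6789 name=admin rbd myimage", successive calls
 * yield "1.2.3.4:6789", "name=admin", "rbd" and "myimage", each
 * time advancing *buf past the token consumed
 */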
2283
2284/*
2285 * This fills in the pool_name, obj, obj_len, obj_md_name, and
2286 * snap_name fields of the given rbd_dev, based
2287 * on the list of monitor addresses and other options provided via
2288 * /sys/bus/rbd/add.
2289 */
2290static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2291			      const char *buf,
2292			      const char **mon_addrs,
2293			      size_t *mon_addrs_size,
2294			      char *options,
2295			      size_t options_size)
2296{
2297	size_t	len;
2298
2299	/* The first four tokens are required */
2300
2301	len = next_token(&buf);
2302	if (!len)
2303		return -EINVAL;
2304	*mon_addrs_size = len + 1;
2305	*mon_addrs = buf;
2306
2307	buf += len;
2308
2309	len = copy_token(&buf, options, options_size);
2310	if (!len || len >= options_size)
2311		return -EINVAL;
2312
2313	len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name));
2314	if (!len || len >= sizeof (rbd_dev->pool_name))
2315		return -EINVAL;
2316
2317	len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
2318	if (!len || len >= sizeof (rbd_dev->obj))
2319		return -EINVAL;
2320
2321	/* We have the object length in hand, save it. */
2322
2323	rbd_dev->obj_len = len;
2324
2325	BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
2326				< RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
2327	sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
2328
2329	/*
2330	 * The snapshot name is optional, but it's an error if it's
2331	 * too long.  If no snapshot is supplied, fill in the default.
2332	 */
2333	len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
2334	if (!len)
2335		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2336			sizeof (RBD_SNAP_HEAD_NAME));
2337	else if (len >= sizeof (rbd_dev->snap_name))
2338		return -EINVAL;
2339
2340	return 0;
2341}
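/*
 * example add command (illustrative; Documentation/ABI/testing/
 * sysfs-bus-rbd is authoritative): monitor address(es), options,
 * pool name, image name and an optional snapshot name:
 *
 *	# echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add
 */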
2342
2343static ssize_t rbd_add(struct bus_type *bus,
2344		       const char *buf,
2345		       size_t count)
2346{
2347	struct rbd_device *rbd_dev;
2348	const char *mon_addrs = NULL;
2349	size_t mon_addrs_size = 0;
2350	char *options = NULL;
2351	struct ceph_osd_client *osdc;
2352	int rc = -ENOMEM;
2353
2354	if (!try_module_get(THIS_MODULE))
2355		return -ENODEV;
2356
2357	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2358	if (!rbd_dev)
2359		goto err_nomem;
2360	options = kmalloc(count, GFP_KERNEL);
2361	if (!options)
2362		goto err_nomem;
2363
2364	/* static rbd_device initialization */
2365	spin_lock_init(&rbd_dev->lock);
2366	INIT_LIST_HEAD(&rbd_dev->node);
2367	INIT_LIST_HEAD(&rbd_dev->snaps);
2368	init_rwsem(&rbd_dev->header_rwsem);
2369
2370
2371
2372	/* generate unique id: find highest unique id, add one */
2373	rbd_id_get(rbd_dev);
2374
2375	/* Fill in the device name, now that we have its id. */
2376	BUILD_BUG_ON(DEV_NAME_LEN
2377			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2378	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2379
2380	/* parse add command */
2381	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2382				options, count);
2383	if (rc)
2384		goto err_put_id;
2385
2386	rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2387						options);
2388	if (IS_ERR(rbd_dev->rbd_client)) {
2389		rc = PTR_ERR(rbd_dev->rbd_client);
2390		goto err_put_id;
2391	}
2392
2393	/* pick the pool */
2394	osdc = &rbd_dev->rbd_client->client->osdc;
2395	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2396	if (rc < 0)
2397		goto err_out_client;
2398	rbd_dev->poolid = rc;
2399
2400	/* register our block device */
2401	rc = register_blkdev(0, rbd_dev->name);
2402	if (rc < 0)
2403		goto err_out_client;
2404	rbd_dev->major = rc;
2405
2406	rc = rbd_bus_add_dev(rbd_dev);
2407	if (rc)
2408		goto err_out_blkdev;
2409
2410	/*
2411	 * At this point cleanup in the event of an error is the job
2412	 * of the sysfs code (initiated by rbd_bus_del_dev()).
2413	 *
2414	 * Set up and announce blkdev mapping.
2415	 */
2416	rc = rbd_init_disk(rbd_dev);
2417	if (rc)
2418		goto err_out_bus;
2419
2420	rc = rbd_init_watch_dev(rbd_dev);
2421	if (rc)
2422		goto err_out_bus;
2423
2424	return count;
2425
2426err_out_bus:
2427	/* this will also clean up the rest of the rbd_dev state */
2428
2429	rbd_bus_del_dev(rbd_dev);
2430	kfree(options);
2431	return rc;
2432
2433err_out_blkdev:
2434	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2435err_out_client:
2436	rbd_put_client(rbd_dev);
2437err_put_id:
2438	rbd_id_put(rbd_dev);
2439err_nomem:
2440	kfree(options);
2441	kfree(rbd_dev);
2442
2443	dout("Error adding device %s\n", buf);
2444	module_put(THIS_MODULE);
2445
2446	return (ssize_t) rc;
2447}
2448
2449static struct rbd_device *__rbd_get_dev(unsigned long id)
2450{
2451	struct list_head *tmp;
2452	struct rbd_device *rbd_dev;
2453
2454	spin_lock(&rbd_dev_list_lock);
2455	list_for_each(tmp, &rbd_dev_list) {
2456		rbd_dev = list_entry(tmp, struct rbd_device, node);
2457		if (rbd_dev->id == id) {
2458			spin_unlock(&rbd_dev_list_lock);
2459			return rbd_dev;
2460		}
2461	}
2462	spin_unlock(&rbd_dev_list_lock);
2463	return NULL;
2464}
2465
2466static void rbd_dev_release(struct device *dev)
2467{
2468	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2469
2470	if (rbd_dev->watch_request) {
2471		struct ceph_client *client = rbd_dev->rbd_client->client;
2472
2473		ceph_osdc_unregister_linger_request(&client->osdc,
2474						    rbd_dev->watch_request);
2475	}
2476	if (rbd_dev->watch_event)
2477		rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2478
2479	rbd_put_client(rbd_dev);
2480
2481	/* clean up and free blkdev */
2482	rbd_free_disk(rbd_dev);
2483	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2484
2485	/* done with the id, and with the rbd_dev */
2486	rbd_id_put(rbd_dev);
2487	kfree(rbd_dev);
2488
2489	/* release module ref */
2490	module_put(THIS_MODULE);
2491}
2492
2493static ssize_t rbd_remove(struct bus_type *bus,
2494			  const char *buf,
2495			  size_t count)
2496{
2497	struct rbd_device *rbd_dev = NULL;
2498	int target_id, rc;
2499	unsigned long ul;
2500	int ret = count;
2501
2502	rc = strict_strtoul(buf, 10, &ul);
2503	if (rc)
2504		return rc;
2505
2506	/* convert to int; abort if we lost anything in the conversion */
2507	target_id = (int) ul;
2508	if (target_id != ul)
2509		return -EINVAL;
2510
2511	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2512
2513	rbd_dev = __rbd_get_dev(target_id);
2514	if (!rbd_dev) {
2515		ret = -ENOENT;
2516		goto done;
2517	}
2518
2519	__rbd_remove_all_snaps(rbd_dev);
2520	rbd_bus_del_dev(rbd_dev);
2521
2522done:
2523	mutex_unlock(&ctl_mutex);
2524	return ret;
2525}
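/*
 * illustrative use: tear down the mapping whose id is 1, as
 * reported under /sys/bus/rbd/devices/:
 *
 *	# echo 1 > /sys/bus/rbd/remove
 */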
2526
2527static ssize_t rbd_snap_add(struct device *dev,
2528			    struct device_attribute *attr,
2529			    const char *buf,
2530			    size_t count)
2531{
2532	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2533	int ret;
2534	char *name = kmalloc(count + 1, GFP_KERNEL);
2535	if (!name)
2536		return -ENOMEM;
2537
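	/* note: sizing snprintf() at "count" drops the last byte of
	 * buf, which for a typical sysfs write is the trailing
	 * newline from echo(1) */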
2538	snprintf(name, count, "%s", buf);
2539
2540	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2541
2542	ret = rbd_header_add_snap(rbd_dev,
2543				  name, GFP_KERNEL);
2544	if (ret < 0)
2545		goto err_unlock;
2546
2547	ret = __rbd_refresh_header(rbd_dev);
2548	if (ret < 0)
2549		goto err_unlock;
2550
2551	/* we shouldn't hold ctl_mutex when notifying: the notify might
2552	   trigger a watch callback that would need to take that mutex */
2553	mutex_unlock(&ctl_mutex);
2554
2555	/* make a best effort; don't fail the operation if the notify fails */
2556	rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2557
2558	ret = count;
2559	kfree(name);
2560	return ret;
2561
2562err_unlock:
2563	mutex_unlock(&ctl_mutex);
2564	kfree(name);
2565	return ret;
2566}
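/*
 * illustrative use (path per Documentation/ABI/testing/sysfs-bus-rbd):
 *
 *	# echo mysnap > /sys/bus/rbd/devices/<id>/create_snap
 */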
2567
2568/*
2569 * create control files in sysfs
2570 * /sys/bus/rbd/...
2571 */
2572static int rbd_sysfs_init(void)
2573{
2574	int ret;
2575
2576	ret = device_register(&rbd_root_dev);
2577	if (ret < 0)
2578		return ret;
2579
2580	ret = bus_register(&rbd_bus_type);
2581	if (ret < 0)
2582		device_unregister(&rbd_root_dev);
2583
2584	return ret;
2585}
2586
2587static void rbd_sysfs_cleanup(void)
2588{
2589	bus_unregister(&rbd_bus_type);
2590	device_unregister(&rbd_root_dev);
2591}
2592
2593int __init rbd_init(void)
2594{
2595	int rc;
2596
2597	rc = rbd_sysfs_init();
2598	if (rc)
2599		return rc;
2600	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2601	return 0;
2602}
2603
2604void __exit rbd_exit(void)
2605{
2606	rbd_sysfs_cleanup();
2607}
2608
2609module_init(rbd_init);
2610module_exit(rbd_exit);
2611
2612MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2613MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2614MODULE_DESCRIPTION("rados block device");
2615
2616/* following authorship retained from original osdblk.c */
2617MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2618
2619MODULE_LICENSE("GPL");