v4.6
   1/*
   2 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
   3 * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
   4 *
   5 * This software is available to you under a choice of one of two
   6 * licenses.  You may choose to be licensed under the terms of the GNU
   7 * General Public License (GPL) Version 2, available from the file
   8 * COPYING in the main directory of this source tree, or the BSD-type
   9 * license below:
  10 *
  11 * Redistribution and use in source and binary forms, with or without
  12 * modification, are permitted provided that the following conditions
  13 * are met:
  14 *
  15 *      Redistributions of source code must retain the above copyright
  16 *      notice, this list of conditions and the following disclaimer.
  17 *
  18 *      Redistributions in binary form must reproduce the above
  19 *      copyright notice, this list of conditions and the following
  20 *      disclaimer in the documentation and/or other materials provided
  21 *      with the distribution.
  22 *
  23 *      Neither the name of the Network Appliance, Inc. nor the names of
  24 *      its contributors may be used to endorse or promote products
  25 *      derived from this software without specific prior written
  26 *      permission.
  27 *
  28 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  29 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  30 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  31 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  32 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  33 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  34 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  35 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  36 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  37 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  38 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  39 *
  40 * Author: Tom Tucker <tom@opengridcomputing.com>
  41 */
  42
  43#include <linux/sunrpc/svc_xprt.h>
  44#include <linux/sunrpc/debug.h>
  45#include <linux/sunrpc/rpc_rdma.h>
  46#include <linux/interrupt.h>
  47#include <linux/sched.h>
  48#include <linux/slab.h>
  49#include <linux/spinlock.h>
  50#include <linux/workqueue.h>
  51#include <rdma/ib_verbs.h>
  52#include <rdma/rdma_cm.h>
  53#include <linux/sunrpc/svc_rdma.h>
  54#include <linux/export.h>
  55#include "xprt_rdma.h"
  56
  57#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
  58
  59static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int);
  60static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
  61					struct net *net,
  62					struct sockaddr *sa, int salen,
  63					int flags);
  64static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
  65static void svc_rdma_release_rqst(struct svc_rqst *);
 
  66static void svc_rdma_detach(struct svc_xprt *xprt);
  67static void svc_rdma_free(struct svc_xprt *xprt);
  68static int svc_rdma_has_wspace(struct svc_xprt *xprt);
  69static int svc_rdma_secure_port(struct svc_rqst *);
  70
  71static struct svc_xprt_ops svc_rdma_ops = {
  72	.xpo_create = svc_rdma_create,
  73	.xpo_recvfrom = svc_rdma_recvfrom,
  74	.xpo_sendto = svc_rdma_sendto,
  75	.xpo_release_rqst = svc_rdma_release_rqst,
  76	.xpo_detach = svc_rdma_detach,
  77	.xpo_free = svc_rdma_free,
  78	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
  79	.xpo_has_wspace = svc_rdma_has_wspace,
  80	.xpo_accept = svc_rdma_accept,
  81	.xpo_secure_port = svc_rdma_secure_port,
  82};
  83
  84struct svc_xprt_class svc_rdma_class = {
  85	.xcl_name = "rdma",
  86	.xcl_owner = THIS_MODULE,
  87	.xcl_ops = &svc_rdma_ops,
  88	.xcl_max_payload = RPCSVC_MAXPAYLOAD_RDMA,
  89	.xcl_ident = XPRT_TRANSPORT_RDMA,
  90};
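/* Editorial note (added; not in the original file): this class is
 * registered with the generic svc transport layer at module load time
 * in svc_rdma.c, roughly:
 *
 *	svc_reg_xprt_class(&svc_rdma_class);
 *
 * after which a server can create an "rdma" listener via
 * svc_create_xprt(serv, "rdma", ...).
 */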
  91
  92#if defined(CONFIG_SUNRPC_BACKCHANNEL)
  93static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
  94					   struct sockaddr *, int, int);
  95static void svc_rdma_bc_detach(struct svc_xprt *);
  96static void svc_rdma_bc_free(struct svc_xprt *);
  97
  98static struct svc_xprt_ops svc_rdma_bc_ops = {
  99	.xpo_create = svc_rdma_bc_create,
 100	.xpo_detach = svc_rdma_bc_detach,
 101	.xpo_free = svc_rdma_bc_free,
 102	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
 103	.xpo_secure_port = svc_rdma_secure_port,
 104};
 105
 106struct svc_xprt_class svc_rdma_bc_class = {
 107	.xcl_name = "rdma-bc",
 108	.xcl_owner = THIS_MODULE,
 109	.xcl_ops = &svc_rdma_bc_ops,
 110	.xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
 111};
 112
 113static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
 114					   struct net *net,
 115					   struct sockaddr *sa, int salen,
 116					   int flags)
 117{
 118	struct svcxprt_rdma *cma_xprt;
 119	struct svc_xprt *xprt;
 120
 121	cma_xprt = rdma_create_xprt(serv, 0);
 122	if (!cma_xprt)
 123		return ERR_PTR(-ENOMEM);
 124	xprt = &cma_xprt->sc_xprt;
 125
 126	svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
 127	serv->sv_bc_xprt = xprt;
 128
 129	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
 130	return xprt;
 131}
 132
 133static void svc_rdma_bc_detach(struct svc_xprt *xprt)
 134{
 135	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
 136}
 137
 138static void svc_rdma_bc_free(struct svc_xprt *xprt)
 139{
 140	struct svcxprt_rdma *rdma =
 141		container_of(xprt, struct svcxprt_rdma, sc_xprt);
 142
 143	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
 144	if (xprt)
 145		kfree(rdma);
 146}
 147#endif	/* CONFIG_SUNRPC_BACKCHANNEL */
 148
 149static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt,
 150					   gfp_t flags)
 151{
 152	struct svc_rdma_op_ctxt *ctxt;
 153
 154	ctxt = kmalloc(sizeof(*ctxt), flags);
 155	if (ctxt) {
 156		ctxt->xprt = xprt;
 157		INIT_LIST_HEAD(&ctxt->free);
 158		INIT_LIST_HEAD(&ctxt->dto_q);
 159	}
 160	return ctxt;
 161}
 162
 163static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt)
 164{
 165	unsigned int i;
 166
 167	/* Each RPC/RDMA credit can consume a number of send
 168	 * and receive WQEs. One ctxt is allocated for each.
 169	 */
 170	i = xprt->sc_sq_depth + xprt->sc_rq_depth;
 171
 172	while (i--) {
 173		struct svc_rdma_op_ctxt *ctxt;
 174
 175		ctxt = alloc_ctxt(xprt, GFP_KERNEL);
 176		if (!ctxt) {
 177			dprintk("svcrdma: No memory for RDMA ctxt\n");
 178			return false;
 179		}
 180		list_add(&ctxt->free, &xprt->sc_ctxts);
 181	}
 182	return true;
 183}
 184
 185struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
 186{
 187	struct svc_rdma_op_ctxt *ctxt = NULL;
 188
 189	spin_lock_bh(&xprt->sc_ctxt_lock);
 190	xprt->sc_ctxt_used++;
 191	if (list_empty(&xprt->sc_ctxts))
 192		goto out_empty;
 193
 194	ctxt = list_first_entry(&xprt->sc_ctxts,
 195				struct svc_rdma_op_ctxt, free);
 196	list_del_init(&ctxt->free);
 197	spin_unlock_bh(&xprt->sc_ctxt_lock);
 198
 199out:
 200	ctxt->count = 0;
 201	ctxt->frmr = NULL;
 
 202	return ctxt;
 203
 204out_empty:
 205	/* Either pre-allocation missed the mark, or send
 206	 * queue accounting is broken.
 207	 */
 208	spin_unlock_bh(&xprt->sc_ctxt_lock);
 209
 210	ctxt = alloc_ctxt(xprt, GFP_NOIO);
 211	if (ctxt)
 212		goto out;
 213
 214	spin_lock_bh(&xprt->sc_ctxt_lock);
 215	xprt->sc_ctxt_used--;
 216	spin_unlock_bh(&xprt->sc_ctxt_lock);
 217	WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n");
 218	return NULL;
 219}
 220
 221void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
 222{
 223	struct svcxprt_rdma *xprt = ctxt->xprt;
 224	int i;
 225	for (i = 0; i < ctxt->count && ctxt->sge[i].length; i++) {
 226		/*
 227		 * Unmap the DMA addr in the SGE if the lkey matches
 228		 * the local_dma_lkey, otherwise, ignore it since it is
 229		 * an FRMR lkey and will be unmapped later when the
 230		 * last WR that uses it completes.
 231		 */
 232		if (ctxt->sge[i].lkey == xprt->sc_pd->local_dma_lkey) {
 233			atomic_dec(&xprt->sc_dma_used);
 234			ib_dma_unmap_page(xprt->sc_cm_id->device,
 235					    ctxt->sge[i].addr,
 236					    ctxt->sge[i].length,
 237					    ctxt->direction);
 238		}
 239	}
 240}
 241
 242void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
 243{
 244	struct svcxprt_rdma *xprt = ctxt->xprt;
 245	int i;
 246
 247	if (free_pages)
 248		for (i = 0; i < ctxt->count; i++)
 249			put_page(ctxt->pages[i]);
 250
 251	spin_lock_bh(&xprt->sc_ctxt_lock);
 252	xprt->sc_ctxt_used--;
 253	list_add(&ctxt->free, &xprt->sc_ctxts);
 254	spin_unlock_bh(&xprt->sc_ctxt_lock);
 255}
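/* Editorial note (added): svc_rdma_get_context() and
 * svc_rdma_put_context() bracket each work request.  The free_pages
 * argument selects whether the pages still attached to the ctxt are
 * released here (non-zero) or are owned elsewhere, e.g. by the
 * svc_rqst, and must be left alone (zero).
 */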
 256
 257static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt)
 258{
 259	while (!list_empty(&xprt->sc_ctxts)) {
 260		struct svc_rdma_op_ctxt *ctxt;
 261
 262		ctxt = list_first_entry(&xprt->sc_ctxts,
 263					struct svc_rdma_op_ctxt, free);
 264		list_del(&ctxt->free);
 265		kfree(ctxt);
 266	}
 267}
 268
 269static struct svc_rdma_req_map *alloc_req_map(gfp_t flags)
 270{
 271	struct svc_rdma_req_map *map;
 272
 273	map = kmalloc(sizeof(*map), flags);
 274	if (map)
 275		INIT_LIST_HEAD(&map->free);
 276	return map;
 277}
 278
 279static bool svc_rdma_prealloc_maps(struct svcxprt_rdma *xprt)
 280{
 281	unsigned int i;
 282
 283	/* One for each receive buffer on this connection. */
 284	i = xprt->sc_max_requests;
 285
 286	while (i--) {
 287		struct svc_rdma_req_map *map;
 288
 289		map = alloc_req_map(GFP_KERNEL);
 290		if (!map) {
 291			dprintk("svcrdma: No memory for request map\n");
 292			return false;
 293		}
 294		list_add(&map->free, &xprt->sc_maps);
 295	}
 296	return true;
 297}
 298
 299struct svc_rdma_req_map *svc_rdma_get_req_map(struct svcxprt_rdma *xprt)
 300{
 301	struct svc_rdma_req_map *map = NULL;
 302
 303	spin_lock(&xprt->sc_map_lock);
 304	if (list_empty(&xprt->sc_maps))
 305		goto out_empty;
 306
 307	map = list_first_entry(&xprt->sc_maps,
 308			       struct svc_rdma_req_map, free);
 309	list_del_init(&map->free);
 310	spin_unlock(&xprt->sc_map_lock);
 311
 312out:
 313	map->count = 0;
 
 314	return map;
 315
 316out_empty:
 317	spin_unlock(&xprt->sc_map_lock);
 318
 319	/* Pre-allocation amount was incorrect */
 320	map = alloc_req_map(GFP_NOIO);
 321	if (map)
 322		goto out;
 323
 324	WARN_ONCE(1, "svcrdma: empty request map list?\n");
 325	return NULL;
 326}
 327
 328void svc_rdma_put_req_map(struct svcxprt_rdma *xprt,
 329			  struct svc_rdma_req_map *map)
 330{
 331	spin_lock(&xprt->sc_map_lock);
 332	list_add(&map->free, &xprt->sc_maps);
 333	spin_unlock(&xprt->sc_map_lock);
 334}
 335
 336static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt)
 
 337{
 338	while (!list_empty(&xprt->sc_maps)) {
 339		struct svc_rdma_req_map *map;
 340
 341		map = list_first_entry(&xprt->sc_maps,
 342				       struct svc_rdma_req_map, free);
 343		list_del(&map->free);
 344		kfree(map);
 345	}
 346}
 347
 348/* QP event handler */
 349static void qp_event_handler(struct ib_event *event, void *context)
 350{
 351	struct svc_xprt *xprt = context;
 352
 353	switch (event->event) {
 354	/* These are considered benign events */
 355	case IB_EVENT_PATH_MIG:
 356	case IB_EVENT_COMM_EST:
 357	case IB_EVENT_SQ_DRAINED:
 358	case IB_EVENT_QP_LAST_WQE_REACHED:
 359		dprintk("svcrdma: QP event %s (%d) received for QP=%p\n",
 360			ib_event_msg(event->event), event->event,
 361			event->element.qp);
 362		break;
 363	/* These are considered fatal events */
 364	case IB_EVENT_PATH_MIG_ERR:
 365	case IB_EVENT_QP_FATAL:
 366	case IB_EVENT_QP_REQ_ERR:
 367	case IB_EVENT_QP_ACCESS_ERR:
 368	case IB_EVENT_DEVICE_FATAL:
 369	default:
 370		dprintk("svcrdma: QP ERROR event %s (%d) received for QP=%p, "
 371			"closing transport\n",
 372			ib_event_msg(event->event), event->event,
 373			event->element.qp);
 374		set_bit(XPT_CLOSE, &xprt->xpt_flags);
 375		break;
 376	}
 377}
 378
 379/**
 380 * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 381 * @cq:        completion queue
 382 * @wc:        completed WR
 383 *
 384 */
 385static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 386{
 387	struct svcxprt_rdma *xprt = cq->cq_context;
 388	struct ib_cqe *cqe = wc->wr_cqe;
 389	struct svc_rdma_op_ctxt *ctxt;
 390
 391	/* WARNING: Only wc->wr_cqe and wc->status are reliable */
 392	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
 393	ctxt->wc_status = wc->status;
 394	svc_rdma_unmap_dma(ctxt);
 395
 396	if (wc->status != IB_WC_SUCCESS)
 397		goto flushed;
 398
 399	/* All wc fields are now known to be valid */
 400	ctxt->byte_len = wc->byte_len;
 401	spin_lock(&xprt->sc_rq_dto_lock);
 402	list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
 403	spin_unlock(&xprt->sc_rq_dto_lock);
 404
 405	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
 406	if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
 407		goto out;
 408	svc_xprt_enqueue(&xprt->sc_xprt);
 409	goto out;
 410
 411flushed:
 412	if (wc->status != IB_WC_WR_FLUSH_ERR)
 413		pr_warn("svcrdma: receive: %s (%u/0x%x)\n",
 414			ib_wc_status_msg(wc->status),
 415			wc->status, wc->vendor_err);
 416	set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
 417	svc_rdma_put_context(ctxt, 1);
 418
 419out:
 420	svc_xprt_put(&xprt->sc_xprt);
 421}
 422
 423static void svc_rdma_send_wc_common(struct svcxprt_rdma *xprt,
 424				    struct ib_wc *wc,
 425				    const char *opname)
 426{
 427	if (wc->status != IB_WC_SUCCESS)
 428		goto err;
 429
 430out:
 431	atomic_dec(&xprt->sc_sq_count);
 432	wake_up(&xprt->sc_send_wait);
 433	return;
 434
 435err:
 436	set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
 437	if (wc->status != IB_WC_WR_FLUSH_ERR)
 438		pr_err("svcrdma: %s: %s (%u/0x%x)\n",
 439		       opname, ib_wc_status_msg(wc->status),
 440		       wc->status, wc->vendor_err);
 441	goto out;
 442}
 443
 444static void svc_rdma_send_wc_common_put(struct ib_cq *cq, struct ib_wc *wc,
 445					const char *opname)
 446{
 447	struct svcxprt_rdma *xprt = cq->cq_context;
 448
 449	svc_rdma_send_wc_common(xprt, wc, opname);
 450	svc_xprt_put(&xprt->sc_xprt);
 451}
 452
 453/**
 454 * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
 455 * @cq:        completion queue
 456 * @wc:        completed WR
 457 *
 458 */
 459void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 460{
 461	struct ib_cqe *cqe = wc->wr_cqe;
 462	struct svc_rdma_op_ctxt *ctxt;
 
 463
 464	svc_rdma_send_wc_common_put(cq, wc, "send");
 
 465
 466	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
 467	svc_rdma_unmap_dma(ctxt);
 468	svc_rdma_put_context(ctxt, 1);
 469}
 470
 471/**
 472 * svc_rdma_wc_write - Invoked by RDMA provider for each polled Write WC
 473 * @cq:        completion queue
 474 * @wc:        completed WR
 475 *
 476 */
 477void svc_rdma_wc_write(struct ib_cq *cq, struct ib_wc *wc)
 478{
 479	struct ib_cqe *cqe = wc->wr_cqe;
 480	struct svc_rdma_op_ctxt *ctxt;
 481
 482	svc_rdma_send_wc_common_put(cq, wc, "write");
 
 483
 484	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
 485	svc_rdma_unmap_dma(ctxt);
 486	svc_rdma_put_context(ctxt, 0);
 487}
 488
 489/**
 490 * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC
 491 * @cq:        completion queue
 492 * @wc:        completed WR
 493 *
 494 */
 495void svc_rdma_wc_reg(struct ib_cq *cq, struct ib_wc *wc)
 
 496{
 497	svc_rdma_send_wc_common_put(cq, wc, "fastreg");
 498}
 499
 500/**
 501 * svc_rdma_wc_read - Invoked by RDMA provider for each polled Read WC
 502 * @cq:        completion queue
 503 * @wc:        completed WR
 504 *
 
 505 */
 506void svc_rdma_wc_read(struct ib_cq *cq, struct ib_wc *wc)
 507{
 508	struct svcxprt_rdma *xprt = cq->cq_context;
 509	struct ib_cqe *cqe = wc->wr_cqe;
 510	struct svc_rdma_op_ctxt *ctxt;
 511
 512	svc_rdma_send_wc_common(xprt, wc, "read");
 513
 514	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
 515	svc_rdma_unmap_dma(ctxt);
 516	svc_rdma_put_frmr(xprt, ctxt->frmr);
 517
 518	if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
 519		struct svc_rdma_op_ctxt *read_hdr;
 520
 521		read_hdr = ctxt->read_hdr;
 522		spin_lock(&xprt->sc_rq_dto_lock);
 523		list_add_tail(&read_hdr->dto_q,
 524			      &xprt->sc_read_complete_q);
 525		spin_unlock(&xprt->sc_rq_dto_lock);
 526
 527		set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
 528		svc_xprt_enqueue(&xprt->sc_xprt);
 529	}
 530
 531	svc_rdma_put_context(ctxt, 0);
 532	svc_xprt_put(&xprt->sc_xprt);
 533}
 534
 535/**
 536 * svc_rdma_wc_inv - Invoked by RDMA provider for each polled LOCAL_INV WC
 537 * @cq:        completion queue
 538 * @wc:        completed WR
 539 *
 540 */
 541void svc_rdma_wc_inv(struct ib_cq *cq, struct ib_wc *wc)
 542{
 543	svc_rdma_send_wc_common_put(cq, wc, "localInv");
 544}
 545
 546static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
 547					     int listener)
 548{
 549	struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);
 550
 551	if (!cma_xprt)
 552		return NULL;
 553	svc_xprt_init(&init_net, &svc_rdma_class, &cma_xprt->sc_xprt, serv);
 554	INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
 555	INIT_LIST_HEAD(&cma_xprt->sc_dto_q);
 556	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
 557	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
 558	INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
 559	INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
 560	INIT_LIST_HEAD(&cma_xprt->sc_maps);
 561	init_waitqueue_head(&cma_xprt->sc_send_wait);
 562
 563	spin_lock_init(&cma_xprt->sc_lock);
 564	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
 565	spin_lock_init(&cma_xprt->sc_frmr_q_lock);
 566	spin_lock_init(&cma_xprt->sc_ctxt_lock);
 567	spin_lock_init(&cma_xprt->sc_map_lock);
 568
 569	if (listener)
 570		set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
 571
 572	return cma_xprt;
 573}
 574
 575int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
 576{
 577	struct ib_recv_wr recv_wr, *bad_recv_wr;
 578	struct svc_rdma_op_ctxt *ctxt;
 579	struct page *page;
 580	dma_addr_t pa;
 581	int sge_no;
 582	int buflen;
 583	int ret;
 584
 585	ctxt = svc_rdma_get_context(xprt);
 586	buflen = 0;
 587	ctxt->direction = DMA_FROM_DEVICE;
 588	ctxt->cqe.done = svc_rdma_wc_receive;
 589	for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
 590		if (sge_no >= xprt->sc_max_sge) {
 591			pr_err("svcrdma: Too many sges (%d)\n", sge_no);
 592			goto err_put_ctxt;
 593		}
 594		page = alloc_page(flags);
 595		if (!page)
 596			goto err_put_ctxt;
 597		ctxt->pages[sge_no] = page;
 598		pa = ib_dma_map_page(xprt->sc_cm_id->device,
 599				     page, 0, PAGE_SIZE,
 600				     DMA_FROM_DEVICE);
 601		if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa))
 602			goto err_put_ctxt;
 603		atomic_inc(&xprt->sc_dma_used);
 604		ctxt->sge[sge_no].addr = pa;
 605		ctxt->sge[sge_no].length = PAGE_SIZE;
 606		ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
 607		ctxt->count = sge_no + 1;
 608		buflen += PAGE_SIZE;
 609	}
 610	recv_wr.next = NULL;
 611	recv_wr.sg_list = &ctxt->sge[0];
 612	recv_wr.num_sge = ctxt->count;
 613	recv_wr.wr_cqe = &ctxt->cqe;
 614
 615	svc_xprt_get(&xprt->sc_xprt);
 616	ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
 617	if (ret) {
 618		svc_rdma_unmap_dma(ctxt);
 619		svc_rdma_put_context(ctxt, 1);
 620		svc_xprt_put(&xprt->sc_xprt);
 621	}
 622	return ret;
 623
 624 err_put_ctxt:
 625	svc_rdma_unmap_dma(ctxt);
 626	svc_rdma_put_context(ctxt, 1);
 627	return -ENOMEM;
 628}
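/* Editorial note (added): svc_rdma_accept() below posts one receive
 * per slot in the receive queue (sc_rq_depth) at connection setup;
 * the receive path is then expected to call svc_rdma_repost_recv()
 * for each buffer it consumes, so the RQ normally stays full.
 */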
 629
 630int svc_rdma_repost_recv(struct svcxprt_rdma *xprt, gfp_t flags)
 631{
 632	int ret = 0;
 633
 634	ret = svc_rdma_post_recv(xprt, flags);
 635	if (ret) {
 636		pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
 637		       ret);
 638		pr_err("svcrdma: closing transport %p.\n", xprt);
 639		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
 640		ret = -ENOTCONN;
 641	}
 642	return ret;
 643}
 644
 645/*
 646 * This function handles the CONNECT_REQUEST event on a listening
 647 * endpoint. It is passed the cma_id for the _new_ connection. The context in
 648 * this cma_id is inherited from the listening cma_id and is the svc_xprt
 649 * structure for the listening endpoint.
 650 *
 651 * This function creates a new xprt for the new connection and enqueues it on
  652 * the accept queue for the listening xprt. When the listen thread is kicked, it
 653 * will call the recvfrom method on the listen xprt which will accept the new
 654 * connection.
 655 */
 656static void handle_connect_req(struct rdma_cm_id *new_cma_id, size_t client_ird)
 657{
 658	struct svcxprt_rdma *listen_xprt = new_cma_id->context;
 659	struct svcxprt_rdma *newxprt;
 660	struct sockaddr *sa;
 661
 662	/* Create a new transport */
 663	newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
 664	if (!newxprt) {
 665		dprintk("svcrdma: failed to create new transport\n");
 666		return;
 667	}
 668	newxprt->sc_cm_id = new_cma_id;
 669	new_cma_id->context = newxprt;
 670	dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
 671		newxprt, newxprt->sc_cm_id, listen_xprt);
 672
 673	/* Save client advertised inbound read limit for use later in accept. */
 674	newxprt->sc_ord = client_ird;
 675
 676	/* Set the local and remote addresses in the transport */
 677	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
 678	svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
 679	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
 680	svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
 681
 682	/*
 683	 * Enqueue the new transport on the accept queue of the listening
 684	 * transport
 685	 */
 686	spin_lock_bh(&listen_xprt->sc_lock);
 687	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
 688	spin_unlock_bh(&listen_xprt->sc_lock);
 689
 690	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
 691	svc_xprt_enqueue(&listen_xprt->sc_xprt);
 692}
 693
 694/*
  695 * Handles events generated on the listening endpoint. These events will
  696 * either be incoming connect requests or adapter removal events.
 697 */
 698static int rdma_listen_handler(struct rdma_cm_id *cma_id,
 699			       struct rdma_cm_event *event)
 700{
 701	struct svcxprt_rdma *xprt = cma_id->context;
 702	int ret = 0;
 703
 704	switch (event->event) {
 705	case RDMA_CM_EVENT_CONNECT_REQUEST:
 706		dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
 707			"event = %s (%d)\n", cma_id, cma_id->context,
 708			rdma_event_msg(event->event), event->event);
 709		handle_connect_req(cma_id,
 710				   event->param.conn.initiator_depth);
 711		break;
 712
 713	case RDMA_CM_EVENT_ESTABLISHED:
 714		/* Accept complete */
 715		dprintk("svcrdma: Connection completed on LISTEN xprt=%p, "
 716			"cm_id=%p\n", xprt, cma_id);
 717		break;
 718
 719	case RDMA_CM_EVENT_DEVICE_REMOVAL:
 720		dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
 721			xprt, cma_id);
 722		if (xprt)
 723			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
 724		break;
 725
 726	default:
 727		dprintk("svcrdma: Unexpected event on listening endpoint %p, "
 728			"event = %s (%d)\n", cma_id,
 729			rdma_event_msg(event->event), event->event);
 730		break;
 731	}
 732
 733	return ret;
 734}
 735
 736static int rdma_cma_handler(struct rdma_cm_id *cma_id,
 737			    struct rdma_cm_event *event)
 738{
 739	struct svc_xprt *xprt = cma_id->context;
 740	struct svcxprt_rdma *rdma =
 741		container_of(xprt, struct svcxprt_rdma, sc_xprt);
 742	switch (event->event) {
 743	case RDMA_CM_EVENT_ESTABLISHED:
 744		/* Accept complete */
 745		svc_xprt_get(xprt);
 746		dprintk("svcrdma: Connection completed on DTO xprt=%p, "
 747			"cm_id=%p\n", xprt, cma_id);
 748		clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
 749		svc_xprt_enqueue(xprt);
 750		break;
 751	case RDMA_CM_EVENT_DISCONNECTED:
 752		dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
 753			xprt, cma_id);
 754		if (xprt) {
 755			set_bit(XPT_CLOSE, &xprt->xpt_flags);
 756			svc_xprt_enqueue(xprt);
 757			svc_xprt_put(xprt);
 758		}
 759		break;
 760	case RDMA_CM_EVENT_DEVICE_REMOVAL:
 761		dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
 762			"event = %s (%d)\n", cma_id, xprt,
 763			rdma_event_msg(event->event), event->event);
 764		if (xprt) {
 765			set_bit(XPT_CLOSE, &xprt->xpt_flags);
 766			svc_xprt_enqueue(xprt);
 767			svc_xprt_put(xprt);
 768		}
 769		break;
 770	default:
 771		dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
 772			"event = %s (%d)\n", cma_id,
 773			rdma_event_msg(event->event), event->event);
 774		break;
 775	}
 776	return 0;
 777}
 778
 779/*
 780 * Create a listening RDMA service endpoint.
 781 */
 782static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
 783					struct net *net,
 784					struct sockaddr *sa, int salen,
 785					int flags)
 786{
 787	struct rdma_cm_id *listen_id;
 788	struct svcxprt_rdma *cma_xprt;
 
 789	int ret;
 790
 791	dprintk("svcrdma: Creating RDMA socket\n");
 792	if (sa->sa_family != AF_INET) {
 793		dprintk("svcrdma: Address family %d is not supported.\n", sa->sa_family);
 794		return ERR_PTR(-EAFNOSUPPORT);
 795	}
 796	cma_xprt = rdma_create_xprt(serv, 1);
 797	if (!cma_xprt)
 798		return ERR_PTR(-ENOMEM);
 
 799
 800	listen_id = rdma_create_id(&init_net, rdma_listen_handler, cma_xprt,
 801				   RDMA_PS_TCP, IB_QPT_RC);
 802	if (IS_ERR(listen_id)) {
 803		ret = PTR_ERR(listen_id);
 804		dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
 805		goto err0;
 806	}
 807
 808	ret = rdma_bind_addr(listen_id, sa);
 809	if (ret) {
 810		dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
 811		goto err1;
 812	}
 813	cma_xprt->sc_cm_id = listen_id;
 814
 815	ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
 816	if (ret) {
 817		dprintk("svcrdma: rdma_listen failed = %d\n", ret);
 818		goto err1;
 819	}
 820
 821	/*
 822	 * We need to use the address from the cm_id in case the
 823	 * caller specified 0 for the port number.
 824	 */
 825	sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
 826	svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);
 827
 828	return &cma_xprt->sc_xprt;
 829
 830 err1:
 831	rdma_destroy_id(listen_id);
 832 err0:
 833	kfree(cma_xprt);
 834	return ERR_PTR(ret);
 835}
 836
 837static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
 838{
 839	struct ib_mr *mr;
 840	struct scatterlist *sg;
 841	struct svc_rdma_fastreg_mr *frmr;
 842	u32 num_sg;
 843
 844	frmr = kmalloc(sizeof(*frmr), GFP_KERNEL);
 845	if (!frmr)
 846		goto err;
 847
 848	num_sg = min_t(u32, RPCSVC_MAXPAGES, xprt->sc_frmr_pg_list_len);
 849	mr = ib_alloc_mr(xprt->sc_pd, IB_MR_TYPE_MEM_REG, num_sg);
 850	if (IS_ERR(mr))
 851		goto err_free_frmr;
 852
 853	sg = kcalloc(RPCSVC_MAXPAGES, sizeof(*sg), GFP_KERNEL);
 854	if (!sg)
 
 855		goto err_free_mr;
 856
 857	sg_init_table(sg, RPCSVC_MAXPAGES);
 858
 859	frmr->mr = mr;
 860	frmr->sg = sg;
 861	INIT_LIST_HEAD(&frmr->frmr_list);
 862	return frmr;
 863
 864 err_free_mr:
 865	ib_dereg_mr(mr);
 866 err_free_frmr:
 867	kfree(frmr);
 868 err:
 869	return ERR_PTR(-ENOMEM);
 870}
 871
 872static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
 873{
 874	struct svc_rdma_fastreg_mr *frmr;
 875
 876	while (!list_empty(&xprt->sc_frmr_q)) {
 877		frmr = list_entry(xprt->sc_frmr_q.next,
 878				  struct svc_rdma_fastreg_mr, frmr_list);
 879		list_del_init(&frmr->frmr_list);
 880		kfree(frmr->sg);
 881		ib_dereg_mr(frmr->mr);
 
 882		kfree(frmr);
 883	}
 884}
 885
 886struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
 887{
 888	struct svc_rdma_fastreg_mr *frmr = NULL;
 889
 890	spin_lock_bh(&rdma->sc_frmr_q_lock);
 891	if (!list_empty(&rdma->sc_frmr_q)) {
 892		frmr = list_entry(rdma->sc_frmr_q.next,
 893				  struct svc_rdma_fastreg_mr, frmr_list);
 894		list_del_init(&frmr->frmr_list);
 895		frmr->sg_nents = 0;
 
 896	}
 897	spin_unlock_bh(&rdma->sc_frmr_q_lock);
 898	if (frmr)
 899		return frmr;
 900
 901	return rdma_alloc_frmr(rdma);
 902}
 903
 904void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
 905		       struct svc_rdma_fastreg_mr *frmr)
 906{
 907	if (frmr) {
 908		ib_dma_unmap_sg(rdma->sc_cm_id->device,
 909				frmr->sg, frmr->sg_nents, frmr->direction);
 910		atomic_dec(&rdma->sc_dma_used);
 911		spin_lock_bh(&rdma->sc_frmr_q_lock);
 912		WARN_ON_ONCE(!list_empty(&frmr->frmr_list));
 913		list_add(&frmr->frmr_list, &rdma->sc_frmr_q);
 914		spin_unlock_bh(&rdma->sc_frmr_q_lock);
 915	}
 916}
 917
 918/*
  919 * This is the xpo_accept function for listening endpoints. Its
 920 * purpose is to accept incoming connections. The CMA callback handler
 921 * has already created a new transport and attached it to the new CMA
 922 * ID.
 923 *
 924 * There is a queue of pending connections hung on the listening
 925 * transport. This queue contains the new svc_xprt structure. This
 926 * function takes svc_xprt structures off the accept_q and completes
 927 * the connection.
 928 */
 929static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 930{
 931	struct svcxprt_rdma *listen_rdma;
 932	struct svcxprt_rdma *newxprt = NULL;
 933	struct rdma_conn_param conn_param;
 934	struct ib_qp_init_attr qp_attr;
 935	struct ib_device *dev;
 936	unsigned int i;
 937	int ret = 0;
 938
 939	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
 940	clear_bit(XPT_CONN, &xprt->xpt_flags);
 941	/* Get the next entry off the accept list */
 942	spin_lock_bh(&listen_rdma->sc_lock);
 943	if (!list_empty(&listen_rdma->sc_accept_q)) {
 944		newxprt = list_entry(listen_rdma->sc_accept_q.next,
 945				     struct svcxprt_rdma, sc_accept_q);
 946		list_del_init(&newxprt->sc_accept_q);
 947	}
 948	if (!list_empty(&listen_rdma->sc_accept_q))
 949		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
 950	spin_unlock_bh(&listen_rdma->sc_lock);
 951	if (!newxprt)
 952		return NULL;
 953
 954	dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
 955		newxprt, newxprt->sc_cm_id);
 956
 957	dev = newxprt->sc_cm_id->device;
 958
 959	/* Qualify the transport resource defaults with the
 960	 * capabilities of this particular device */
 961	newxprt->sc_max_sge = min((size_t)dev->attrs.max_sge,
 962				  (size_t)RPCSVC_MAXPAGES);
 963	newxprt->sc_max_sge_rd = min_t(size_t, dev->attrs.max_sge_rd,
 964				       RPCSVC_MAXPAGES);
 965	newxprt->sc_max_req_size = svcrdma_max_req_size;
 966	newxprt->sc_max_requests = min_t(u32, dev->attrs.max_qp_wr,
 967					 svcrdma_max_requests);
 968	newxprt->sc_max_bc_requests = min_t(u32, dev->attrs.max_qp_wr,
 969					    svcrdma_max_bc_requests);
 970	newxprt->sc_rq_depth = newxprt->sc_max_requests +
 971			       newxprt->sc_max_bc_requests;
 972	newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_rq_depth;
 973
 974	if (!svc_rdma_prealloc_ctxts(newxprt))
 975		goto errout;
 976	if (!svc_rdma_prealloc_maps(newxprt))
 977		goto errout;
 978
 979	/*
 980	 * Limit ORD based on client limit, local device limit, and
 981	 * configured svcrdma limit.
 982	 */
 983	newxprt->sc_ord = min_t(size_t, dev->attrs.max_qp_rd_atom, newxprt->sc_ord);
 984	newxprt->sc_ord = min_t(size_t,	svcrdma_ord, newxprt->sc_ord);
 985
 986	newxprt->sc_pd = ib_alloc_pd(dev);
 987	if (IS_ERR(newxprt->sc_pd)) {
 988		dprintk("svcrdma: error creating PD for connect request\n");
 989		goto errout;
 990	}
 991	newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth,
 992					0, IB_POLL_SOFTIRQ);
 993	if (IS_ERR(newxprt->sc_sq_cq)) {
 994		dprintk("svcrdma: error creating SQ CQ for connect request\n");
 995		goto errout;
 996	}
 997	newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth,
 998					0, IB_POLL_SOFTIRQ);
 999	if (IS_ERR(newxprt->sc_rq_cq)) {
1000		dprintk("svcrdma: error creating RQ CQ for connect request\n");
1001		goto errout;
1002	}
1003
1004	memset(&qp_attr, 0, sizeof qp_attr);
1005	qp_attr.event_handler = qp_event_handler;
1006	qp_attr.qp_context = &newxprt->sc_xprt;
1007	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
1008	qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
1009	qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
1010	qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
1011	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
1012	qp_attr.qp_type = IB_QPT_RC;
1013	qp_attr.send_cq = newxprt->sc_sq_cq;
1014	qp_attr.recv_cq = newxprt->sc_rq_cq;
1015	dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n"
1016		"    cm_id->device=%p, sc_pd->device=%p\n"
1017		"    cap.max_send_wr = %d\n"
1018		"    cap.max_recv_wr = %d\n"
1019		"    cap.max_send_sge = %d\n"
1020		"    cap.max_recv_sge = %d\n",
1021		newxprt->sc_cm_id, newxprt->sc_pd,
1022		dev, newxprt->sc_pd->device,
1023		qp_attr.cap.max_send_wr,
1024		qp_attr.cap.max_recv_wr,
1025		qp_attr.cap.max_send_sge,
1026		qp_attr.cap.max_recv_sge);
1027
1028	ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
1029	if (ret) {
1030		dprintk("svcrdma: failed to create QP, ret=%d\n", ret);
1031		goto errout;
1032	}
1033	newxprt->sc_qp = newxprt->sc_cm_id->qp;
1034
1035	/*
1036	 * Use the most secure set of MR resources based on the
1037	 * transport type and available memory management features in
1038	 * the device. Here's the table implemented below:
1039	 *
1040	 *		Fast	Global	DMA	Remote WR
1041	 *		Reg	LKEY	MR	Access
1042	 *		Sup'd	Sup'd	Needed	Needed
1043	 *
1044	 * IWARP	N	N	Y	Y
1045	 *		N	Y	Y	Y
1046	 *		Y	N	Y	N
1047	 *		Y	Y	N	-
1048	 *
1049	 * IB		N	N	Y	N
1050	 *		N	Y	N	-
1051	 *		Y	N	Y	N
1052	 *		Y	Y	N	-
1053	 *
1054	 * NB:	iWARP requires remote write access for the data sink
1055	 *	of an RDMA_READ. IB does not.
1056	 */
1057	newxprt->sc_reader = rdma_read_chunk_lcl;
1058	if (dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
1059		newxprt->sc_frmr_pg_list_len =
1060			dev->attrs.max_fast_reg_page_list_len;
1061		newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
1062		newxprt->sc_reader = rdma_read_chunk_frmr;
1063	}
1064
1065	/*
1066	 * Determine if a DMA MR is required and if so, what privs are required
1067	 */
1068	if (!rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num) &&
1069	    !rdma_ib_or_roce(dev, newxprt->sc_cm_id->port_num))
1070		goto errout;
 
1071
1072	if (rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num))
1073		newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_READ_W_INV;
1074
1075	/* Post receive buffers */
1076	for (i = 0; i < newxprt->sc_rq_depth; i++) {
1077		ret = svc_rdma_post_recv(newxprt, GFP_KERNEL);
1078		if (ret) {
1079			dprintk("svcrdma: failure posting receive buffers\n");
1080			goto errout;
1081		}
1082	}
1083
1084	/* Swap out the handler */
1085	newxprt->sc_cm_id->event_handler = rdma_cma_handler;
1086
1087	/* Accept Connection */
1088	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
1089	memset(&conn_param, 0, sizeof conn_param);
1090	conn_param.responder_resources = 0;
1091	conn_param.initiator_depth = newxprt->sc_ord;
1092	ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
1093	if (ret) {
1094		dprintk("svcrdma: failed to accept new connection, ret=%d\n",
1095		       ret);
1096		goto errout;
1097	}
1098
1099	dprintk("svcrdma: new connection %p accepted with the following "
1100		"attributes:\n"
1101		"    local_ip        : %pI4\n"
1102		"    local_port	     : %d\n"
1103		"    remote_ip       : %pI4\n"
1104		"    remote_port     : %d\n"
1105		"    max_sge         : %d\n"
1106		"    max_sge_rd      : %d\n"
1107		"    sq_depth        : %d\n"
1108		"    max_requests    : %d\n"
1109		"    ord             : %d\n",
1110		newxprt,
1111		&((struct sockaddr_in *)&newxprt->sc_cm_id->
1112			 route.addr.src_addr)->sin_addr.s_addr,
1113		ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
1114		       route.addr.src_addr)->sin_port),
1115		&((struct sockaddr_in *)&newxprt->sc_cm_id->
1116			 route.addr.dst_addr)->sin_addr.s_addr,
1117		ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
1118		       route.addr.dst_addr)->sin_port),
1119		newxprt->sc_max_sge,
1120		newxprt->sc_max_sge_rd,
1121		newxprt->sc_sq_depth,
1122		newxprt->sc_max_requests,
1123		newxprt->sc_ord);
1124
1125	return &newxprt->sc_xprt;
1126
1127 errout:
1128	dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
1129	/* Take a reference in case the DTO handler runs */
1130	svc_xprt_get(&newxprt->sc_xprt);
1131	if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
1132		ib_destroy_qp(newxprt->sc_qp);
1133	rdma_destroy_id(newxprt->sc_cm_id);
1134	/* This call to put will destroy the transport */
1135	svc_xprt_put(&newxprt->sc_xprt);
1136	return NULL;
1137}
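/* Editorial note (added): once rdma_accept() succeeds, the provider
 * raises RDMA_CM_EVENT_ESTABLISHED and rdma_cma_handler() above
 * clears RDMAXPRT_CONN_PENDING and enqueues the transport, at which
 * point deferred receive completions can be processed.
 */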
1138
1139static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
1140{
1141}
1142
1143/*
1144 * When connected, an svc_xprt has at least two references:
1145 *
1146 * - A reference held by the cm_id between the ESTABLISHED and
1147 *   DISCONNECTED events. If the remote peer disconnected first, this
1148 *   reference could be gone.
1149 *
1150 * - A reference held by the svc_recv code that called this function
1151 *   as part of close processing.
1152 *
 1153 * At a minimum one reference should still be held.
1154 */
1155static void svc_rdma_detach(struct svc_xprt *xprt)
1156{
1157	struct svcxprt_rdma *rdma =
1158		container_of(xprt, struct svcxprt_rdma, sc_xprt);
1159	dprintk("svc: svc_rdma_detach(%p)\n", xprt);
1160
1161	/* Disconnect and flush posted WQE */
1162	rdma_disconnect(rdma->sc_cm_id);
1163}
1164
1165static void __svc_rdma_free(struct work_struct *work)
1166{
1167	struct svcxprt_rdma *rdma =
1168		container_of(work, struct svcxprt_rdma, sc_work);
1169	struct svc_xprt *xprt = &rdma->sc_xprt;
1170
1171	dprintk("svcrdma: %s(%p)\n", __func__, rdma);
1172
1173	/* We should only be called from kref_put */
1174	if (atomic_read(&xprt->xpt_ref.refcount) != 0)
1175		pr_err("svcrdma: sc_xprt still in use? (%d)\n",
1176		       atomic_read(&xprt->xpt_ref.refcount));
1177
1178	/*
1179	 * Destroy queued, but not processed read completions. Note
1180	 * that this cleanup has to be done before destroying the
1181	 * cm_id because the device ptr is needed to unmap the dma in
1182	 * svc_rdma_put_context.
1183	 */
1184	while (!list_empty(&rdma->sc_read_complete_q)) {
1185		struct svc_rdma_op_ctxt *ctxt;
1186		ctxt = list_entry(rdma->sc_read_complete_q.next,
1187				  struct svc_rdma_op_ctxt,
1188				  dto_q);
1189		list_del_init(&ctxt->dto_q);
1190		svc_rdma_put_context(ctxt, 1);
1191	}
1192
1193	/* Destroy queued, but not processed recv completions */
1194	while (!list_empty(&rdma->sc_rq_dto_q)) {
1195		struct svc_rdma_op_ctxt *ctxt;
1196		ctxt = list_entry(rdma->sc_rq_dto_q.next,
1197				  struct svc_rdma_op_ctxt,
1198				  dto_q);
1199		list_del_init(&ctxt->dto_q);
1200		svc_rdma_put_context(ctxt, 1);
1201	}
1202
1203	/* Warn if we leaked a resource or under-referenced */
1204	if (rdma->sc_ctxt_used != 0)
1205		pr_err("svcrdma: ctxt still in use? (%d)\n",
1206		       rdma->sc_ctxt_used);
1207	if (atomic_read(&rdma->sc_dma_used) != 0)
1208		pr_err("svcrdma: dma still in use? (%d)\n",
1209		       atomic_read(&rdma->sc_dma_used));
1210
1211	/* Final put of backchannel client transport */
1212	if (xprt->xpt_bc_xprt) {
1213		xprt_put(xprt->xpt_bc_xprt);
1214		xprt->xpt_bc_xprt = NULL;
1215	}
1216
 
1217	rdma_dealloc_frmr_q(rdma);
1218	svc_rdma_destroy_ctxts(rdma);
1219	svc_rdma_destroy_maps(rdma);
1220
1221	/* Destroy the QP if present (not a listener) */
1222	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
1223		ib_destroy_qp(rdma->sc_qp);
1224
1225	if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
1226		ib_free_cq(rdma->sc_sq_cq);
1227
1228	if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
1229		ib_free_cq(rdma->sc_rq_cq);
1230
1231	if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
1232		ib_dealloc_pd(rdma->sc_pd);
1233
1234	/* Destroy the CM ID */
1235	rdma_destroy_id(rdma->sc_cm_id);
1236
1237	kfree(rdma);
1238}
1239
1240static void svc_rdma_free(struct svc_xprt *xprt)
1241{
1242	struct svcxprt_rdma *rdma =
1243		container_of(xprt, struct svcxprt_rdma, sc_xprt);
1244	INIT_WORK(&rdma->sc_work, __svc_rdma_free);
1245	queue_work(svc_rdma_wq, &rdma->sc_work);
1246}
1247
1248static int svc_rdma_has_wspace(struct svc_xprt *xprt)
1249{
1250	struct svcxprt_rdma *rdma =
1251		container_of(xprt, struct svcxprt_rdma, sc_xprt);
1252
1253	/*
1254	 * If there are already waiters on the SQ,
1255	 * return false.
1256	 */
1257	if (waitqueue_active(&rdma->sc_send_wait))
1258		return 0;
1259
1260	/* Otherwise return true. */
1261	return 1;
1262}
1263
1264static int svc_rdma_secure_port(struct svc_rqst *rqstp)
1265{
1266	return 1;
1267}
1268
1269int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
1270{
1271	struct ib_send_wr *bad_wr, *n_wr;
1272	int wr_count;
1273	int i;
1274	int ret;
1275
1276	if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
1277		return -ENOTCONN;
1278
 
1279	wr_count = 1;
1280	for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
1281		wr_count++;
1282
1283	/* If the SQ is full, wait until an SQ entry is available */
1284	while (1) {
1285		spin_lock_bh(&xprt->sc_lock);
1286		if (xprt->sc_sq_depth < atomic_read(&xprt->sc_sq_count) + wr_count) {
1287			spin_unlock_bh(&xprt->sc_lock);
1288			atomic_inc(&rdma_stat_sq_starve);
1289
1290			/* Wait until SQ WR available if SQ still full */
1291			wait_event(xprt->sc_send_wait,
1292				   atomic_read(&xprt->sc_sq_count) <
1293				   xprt->sc_sq_depth);
1294			if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
1295				return -ENOTCONN;
1296			continue;
1297		}
1298		/* Take a transport ref for each WR posted */
1299		for (i = 0; i < wr_count; i++)
1300			svc_xprt_get(&xprt->sc_xprt);
1301
1302		/* Bump used SQ WR count and post */
1303		atomic_add(wr_count, &xprt->sc_sq_count);
1304		ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
1305		if (ret) {
1306			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
1307			atomic_sub(wr_count, &xprt->sc_sq_count);
1308			for (i = 0; i < wr_count; i ++)
1309				svc_xprt_put(&xprt->sc_xprt);
1310			dprintk("svcrdma: failed to post SQ WR rc=%d, "
1311			       "sc_sq_count=%d, sc_sq_depth=%d\n",
1312			       ret, atomic_read(&xprt->sc_sq_count),
1313			       xprt->sc_sq_depth);
1314		}
1315		spin_unlock_bh(&xprt->sc_lock);
1316		if (ret)
1317			wake_up(&xprt->sc_send_wait);
1318		break;
1319	}
1320	return ret;
1321}
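/* Usage sketch (editorial; not part of the original file): a caller
 * such as svc_rdma_sendto() points each work request at a ctxt's cqe
 * so that the matching svc_rdma_wc_* handler above runs on
 * completion, for example:
 *
 *	ctxt->cqe.done = svc_rdma_wc_send;
 *	send_wr.wr_cqe = &ctxt->cqe;
 *	send_wr.sg_list = ctxt->sge;
 *	send_wr.num_sge = sge_no;
 *	send_wr.opcode = IB_WR_SEND;
 *	send_wr.send_flags = IB_SEND_SIGNALED;
 *	ret = svc_rdma_send(rdma, &send_wr);
 */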
v3.1
   1/*
 
   2 * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
   3 *
   4 * This software is available to you under a choice of one of two
   5 * licenses.  You may choose to be licensed under the terms of the GNU
   6 * General Public License (GPL) Version 2, available from the file
   7 * COPYING in the main directory of this source tree, or the BSD-type
   8 * license below:
   9 *
  10 * Redistribution and use in source and binary forms, with or without
  11 * modification, are permitted provided that the following conditions
  12 * are met:
  13 *
  14 *      Redistributions of source code must retain the above copyright
  15 *      notice, this list of conditions and the following disclaimer.
  16 *
  17 *      Redistributions in binary form must reproduce the above
  18 *      copyright notice, this list of conditions and the following
  19 *      disclaimer in the documentation and/or other materials provided
  20 *      with the distribution.
  21 *
  22 *      Neither the name of the Network Appliance, Inc. nor the names of
  23 *      its contributors may be used to endorse or promote products
  24 *      derived from this software without specific prior written
  25 *      permission.
  26 *
  27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  38 *
  39 * Author: Tom Tucker <tom@opengridcomputing.com>
  40 */
  41
  42#include <linux/sunrpc/svc_xprt.h>
  43#include <linux/sunrpc/debug.h>
  44#include <linux/sunrpc/rpc_rdma.h>
  45#include <linux/interrupt.h>
  46#include <linux/sched.h>
  47#include <linux/slab.h>
  48#include <linux/spinlock.h>
  49#include <linux/workqueue.h>
  50#include <rdma/ib_verbs.h>
  51#include <rdma/rdma_cm.h>
  52#include <linux/sunrpc/svc_rdma.h>
  53
  54#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
  55
 
  56static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
  57					struct net *net,
  58					struct sockaddr *sa, int salen,
  59					int flags);
  60static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
  61static void svc_rdma_release_rqst(struct svc_rqst *);
  62static void dto_tasklet_func(unsigned long data);
  63static void svc_rdma_detach(struct svc_xprt *xprt);
  64static void svc_rdma_free(struct svc_xprt *xprt);
  65static int svc_rdma_has_wspace(struct svc_xprt *xprt);
  66static void rq_cq_reap(struct svcxprt_rdma *xprt);
  67static void sq_cq_reap(struct svcxprt_rdma *xprt);
  68
  69static DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
  70static DEFINE_SPINLOCK(dto_lock);
  71static LIST_HEAD(dto_xprt_q);
  72
  73static struct svc_xprt_ops svc_rdma_ops = {
  74	.xpo_create = svc_rdma_create,
  75	.xpo_recvfrom = svc_rdma_recvfrom,
  76	.xpo_sendto = svc_rdma_sendto,
  77	.xpo_release_rqst = svc_rdma_release_rqst,
  78	.xpo_detach = svc_rdma_detach,
  79	.xpo_free = svc_rdma_free,
  80	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
  81	.xpo_has_wspace = svc_rdma_has_wspace,
  82	.xpo_accept = svc_rdma_accept,
 
  83};
  84
  85struct svc_xprt_class svc_rdma_class = {
  86	.xcl_name = "rdma",
  87	.xcl_owner = THIS_MODULE,
  88	.xcl_ops = &svc_rdma_ops,
  89	.xcl_max_payload = RPCSVC_MAXPAYLOAD_TCP,
  90};
  91
  92/* WR context cache. Created in svc_rdma.c  */
  93extern struct kmem_cache *svc_rdma_ctxt_cachep;
  94
  95/* Workqueue created in svc_rdma.c */
  96extern struct workqueue_struct *svc_rdma_wq;
  97
  98struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
 
  99{
 100	struct svc_rdma_op_ctxt *ctxt;
 101
 102	while (1) {
 103		ctxt = kmem_cache_alloc(svc_rdma_ctxt_cachep, GFP_KERNEL);
 104		if (ctxt)
 105			break;
 106		schedule_timeout_uninterruptible(msecs_to_jiffies(500));
 107	}
 108	ctxt->xprt = xprt;
 109	INIT_LIST_HEAD(&ctxt->dto_q);
 110	ctxt->count = 0;
 111	ctxt->frmr = NULL;
 112	atomic_inc(&xprt->sc_ctxt_used);
 113	return ctxt;
 114}
 115
 116void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
 117{
 118	struct svcxprt_rdma *xprt = ctxt->xprt;
 119	int i;
 120	for (i = 0; i < ctxt->count && ctxt->sge[i].length; i++) {
 121		/*
 122		 * Unmap the DMA addr in the SGE if the lkey matches
 123		 * the sc_dma_lkey, otherwise, ignore it since it is
 124		 * an FRMR lkey and will be unmapped later when the
 125		 * last WR that uses it completes.
 126		 */
 127		if (ctxt->sge[i].lkey == xprt->sc_dma_lkey) {
 128			atomic_dec(&xprt->sc_dma_used);
 129			ib_dma_unmap_page(xprt->sc_cm_id->device,
 130					    ctxt->sge[i].addr,
 131					    ctxt->sge[i].length,
 132					    ctxt->direction);
 133		}
 134	}
 135}
 136
 137void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
 138{
 139	struct svcxprt_rdma *xprt;
 140	int i;
 141
 142	BUG_ON(!ctxt);
 143	xprt = ctxt->xprt;
 144	if (free_pages)
 145		for (i = 0; i < ctxt->count; i++)
 146			put_page(ctxt->pages[i]);
 147
 148	kmem_cache_free(svc_rdma_ctxt_cachep, ctxt);
 149	atomic_dec(&xprt->sc_ctxt_used);
 150}
 151
 152/* Temporary NFS request map cache. Created in svc_rdma.c  */
 153extern struct kmem_cache *svc_rdma_map_cachep;
 154
 155/*
 156 * Temporary NFS req mappings are shared across all transport
 157 * instances. These are short lived and should be bounded by the number
 158 * of concurrent server threads * depth of the SQ.
 159 */
 160struct svc_rdma_req_map *svc_rdma_get_req_map(void)
 161{
 162	struct svc_rdma_req_map *map;
 163	while (1) {
 164		map = kmem_cache_alloc(svc_rdma_map_cachep, GFP_KERNEL);
 165		if (map)
 166			break;
 167		schedule_timeout_uninterruptible(msecs_to_jiffies(500));
 168	}
 169	map->count = 0;
 170	map->frmr = NULL;
 171	return map;
 172}
 173
 174void svc_rdma_put_req_map(struct svc_rdma_req_map *map)
 
 175{
 176	kmem_cache_free(svc_rdma_map_cachep, map);
 177}
 178
 179/* ib_cq event handler */
 180static void cq_event_handler(struct ib_event *event, void *context)
 181{
 182	struct svc_xprt *xprt = context;
 183	dprintk("svcrdma: received CQ event id=%d, context=%p\n",
 184		event->event, context);
 185	set_bit(XPT_CLOSE, &xprt->xpt_flags);
 186}
 187
 188/* QP event handler */
 189static void qp_event_handler(struct ib_event *event, void *context)
 190{
 191	struct svc_xprt *xprt = context;
 192
 193	switch (event->event) {
 194	/* These are considered benign events */
 195	case IB_EVENT_PATH_MIG:
 196	case IB_EVENT_COMM_EST:
 197	case IB_EVENT_SQ_DRAINED:
 198	case IB_EVENT_QP_LAST_WQE_REACHED:
 199		dprintk("svcrdma: QP event %d received for QP=%p\n",
 200			event->event, event->element.qp);
 
 201		break;
 202	/* These are considered fatal events */
 203	case IB_EVENT_PATH_MIG_ERR:
 204	case IB_EVENT_QP_FATAL:
 205	case IB_EVENT_QP_REQ_ERR:
 206	case IB_EVENT_QP_ACCESS_ERR:
 207	case IB_EVENT_DEVICE_FATAL:
 208	default:
 209		dprintk("svcrdma: QP ERROR event %d received for QP=%p, "
 210			"closing transport\n",
 211			event->event, event->element.qp);
 
 212		set_bit(XPT_CLOSE, &xprt->xpt_flags);
 213		break;
 214	}
 215}
 216
 217/*
 218 * Data Transfer Operation Tasklet
 219 *
 220 * Walks a list of transports with I/O pending, removing entries as
 221 * they are added to the server's I/O pending list. Two bits indicate
 222 * if SQ, RQ, or both have I/O pending. The dto_lock is an irqsave
 223 * spinlock that serializes access to the transport list with the RQ
 224 * and SQ interrupt handlers.
 225 */
 226static void dto_tasklet_func(unsigned long data)
 227{
 228	struct svcxprt_rdma *xprt;
 229	unsigned long flags;
 
 230
 231	spin_lock_irqsave(&dto_lock, flags);
 232	while (!list_empty(&dto_xprt_q)) {
 233		xprt = list_entry(dto_xprt_q.next,
 234				  struct svcxprt_rdma, sc_dto_q);
 235		list_del_init(&xprt->sc_dto_q);
 236		spin_unlock_irqrestore(&dto_lock, flags);
 
 237
 238		rq_cq_reap(xprt);
 239		sq_cq_reap(xprt);
 240
 241		svc_xprt_put(&xprt->sc_xprt);
 242		spin_lock_irqsave(&dto_lock, flags);
 243	}
 244	spin_unlock_irqrestore(&dto_lock, flags);
 245}
 246
 247/*
 248 * Receive Queue Completion Handler
 249 *
 250 * Since an RQ completion handler is called on interrupt context, we
 251 * need to defer the handling of the I/O to a tasklet
 252 */
 253static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
 254{
 255	struct svcxprt_rdma *xprt = cq_context;
 256	unsigned long flags;
 257
 258	/* Guard against unconditional flush call for destroyed QP */
 259	if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
 260		return;
 
 261
 262	/*
 263	 * Set the bit regardless of whether or not it's on the list
 264	 * because it may be on the list already due to an SQ
 265	 * completion.
 266	 */
 267	set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
 268
 269	/*
 270	 * If this transport is not already on the DTO transport queue,
 271	 * add it
 272	 */
 273	spin_lock_irqsave(&dto_lock, flags);
 274	if (list_empty(&xprt->sc_dto_q)) {
 275		svc_xprt_get(&xprt->sc_xprt);
 276		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
 277	}
 278	spin_unlock_irqrestore(&dto_lock, flags);
 279
 280	/* Tasklet does all the work to avoid irqsave locks. */
 281	tasklet_schedule(&dto_tasklet);
 282}
 283
 284/*
 285 * rq_cq_reap - Process the RQ CQ.
 286 *
 287 * Take all completing WC off the CQE and enqueue the associated DTO
 288 * context on the dto_q for the transport.
 289 *
 290 * Note that caller must hold a transport reference.
 291 */
 292static void rq_cq_reap(struct svcxprt_rdma *xprt)
 293{
 294	int ret;
 295	struct ib_wc wc;
 296	struct svc_rdma_op_ctxt *ctxt = NULL;
 297
 298	if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
 299		return;
 300
 301	ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
 302	atomic_inc(&rdma_stat_rq_poll);
 303
 304	while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
 305		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
 306		ctxt->wc_status = wc.status;
 307		ctxt->byte_len = wc.byte_len;
 308		svc_rdma_unmap_dma(ctxt);
 309		if (wc.status != IB_WC_SUCCESS) {
 310			/* Close the transport */
 311			dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt);
 312			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
 313			svc_rdma_put_context(ctxt, 1);
 314			svc_xprt_put(&xprt->sc_xprt);
 315			continue;
 316		}
 317		spin_lock_bh(&xprt->sc_rq_dto_lock);
 318		list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
 319		spin_unlock_bh(&xprt->sc_rq_dto_lock);
 320		svc_xprt_put(&xprt->sc_xprt);
 321	}
 322
 323	if (ctxt)
 324		atomic_inc(&rdma_stat_rq_prod);
 325
 326	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
 327	/*
 328	 * If data arrived before established event,
 329	 * don't enqueue. This defers RPC I/O until the
 330	 * RDMA connection is complete.
 331	 */
 332	if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
 333		svc_xprt_enqueue(&xprt->sc_xprt);
 334}
 335
 336/*
 337 * Process a completion context
 338 */
 339static void process_context(struct svcxprt_rdma *xprt,
 340			    struct svc_rdma_op_ctxt *ctxt)
 341{
 342	svc_rdma_unmap_dma(ctxt);
 343
 344	switch (ctxt->wr_op) {
 345	case IB_WR_SEND:
 346		if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags))
 347			svc_rdma_put_frmr(xprt, ctxt->frmr);
 348		svc_rdma_put_context(ctxt, 1);
 349		break;
 350
 351	case IB_WR_RDMA_WRITE:
 352		svc_rdma_put_context(ctxt, 0);
 353		break;
 354
 355	case IB_WR_RDMA_READ:
 356	case IB_WR_RDMA_READ_WITH_INV:
 357		if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
 358			struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;
 359			BUG_ON(!read_hdr);
 360			if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags))
 361				svc_rdma_put_frmr(xprt, ctxt->frmr);
 362			spin_lock_bh(&xprt->sc_rq_dto_lock);
 363			set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
 364			list_add_tail(&read_hdr->dto_q,
 365				      &xprt->sc_read_complete_q);
 366			spin_unlock_bh(&xprt->sc_rq_dto_lock);
 367			svc_xprt_enqueue(&xprt->sc_xprt);
 368		}
 369		svc_rdma_put_context(ctxt, 0);
 370		break;
 371
 372	default:
 373		printk(KERN_ERR "svcrdma: unexpected completion type, "
 374		       "opcode=%d\n",
 375		       ctxt->wr_op);
 376		break;
 377	}
 378}
 379
 380/*
 381 * Send Queue Completion Handler - potentially called on interrupt context.
 382 *
 383 * Note that caller must hold a transport reference.
 384 */
 385static void sq_cq_reap(struct svcxprt_rdma *xprt)
 386{
 387	struct svc_rdma_op_ctxt *ctxt = NULL;
 388	struct ib_wc wc;
 389	struct ib_cq *cq = xprt->sc_sq_cq;
 390	int ret;
 
 391
 392	if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
 393		return;
 
 394
 395	ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
 396	atomic_inc(&rdma_stat_sq_poll);
 397	while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
 398		if (wc.status != IB_WC_SUCCESS)
 399			/* Close the transport */
 400			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
 401
 402		/* Decrement used SQ WR count */
 403		atomic_dec(&xprt->sc_sq_count);
 404		wake_up(&xprt->sc_send_wait);
 405
 406		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
 407		if (ctxt)
 408			process_context(xprt, ctxt);
 409
 410		svc_xprt_put(&xprt->sc_xprt);
 411	}
 412
 413	if (ctxt)
 414		atomic_inc(&rdma_stat_sq_prod);
 415}
 416
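/*
 * Send Queue completion callback, potentially run in interrupt context.
 * It marks the transport as having pending SQ completions, adds it to
 * the DTO transport queue if it is not already there, and schedules the
 * dto tasklet, which does the actual reaping in softirq context.
 */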
 417static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
 418{
 419	struct svcxprt_rdma *xprt = cq_context;
 420	unsigned long flags;
 421
 422	/* Guard against unconditional flush call for destroyed QP */
  423	if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount) == 0)
 424		return;
 425
 426	/*
 427	 * Set the bit regardless of whether or not it's on the list
 428	 * because it may be on the list already due to an RQ
 429	 * completion.
 430	 */
 431	set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
 432
 433	/*
 434	 * If this transport is not already on the DTO transport queue,
 435	 * add it
 436	 */
 437	spin_lock_irqsave(&dto_lock, flags);
 438	if (list_empty(&xprt->sc_dto_q)) {
 439		svc_xprt_get(&xprt->sc_xprt);
 440		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
 441	}
 442	spin_unlock_irqrestore(&dto_lock, flags);
 443
 444	/* Tasklet does all the work to avoid irqsave locks. */
 445	tasklet_schedule(&dto_tasklet);
 446}
 447
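/*
 * Allocate and initialize a new svcxprt_rdma, seeding its resource
 * limits from the global svcrdma tunables. "listener" indicates whether
 * this transport will be used as a listening endpoint.
 */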
 448static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
 449					     int listener)
 450{
 451	struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);
 452
 453	if (!cma_xprt)
 454		return NULL;
 455	svc_xprt_init(&svc_rdma_class, &cma_xprt->sc_xprt, serv);
 456	INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
 457	INIT_LIST_HEAD(&cma_xprt->sc_dto_q);
 458	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
 459	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
 460	INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
 461	init_waitqueue_head(&cma_xprt->sc_send_wait);
 462
 463	spin_lock_init(&cma_xprt->sc_lock);
 464	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
 465	spin_lock_init(&cma_xprt->sc_frmr_q_lock);
 466
 467	cma_xprt->sc_ord = svcrdma_ord;
 468
 469	cma_xprt->sc_max_req_size = svcrdma_max_req_size;
 470	cma_xprt->sc_max_requests = svcrdma_max_requests;
 471	cma_xprt->sc_sq_depth = svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT;
 472	atomic_set(&cma_xprt->sc_sq_count, 0);
 473	atomic_set(&cma_xprt->sc_ctxt_used, 0);
 474
 475	if (listener)
 476		set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
 477
 478	return cma_xprt;
 479}
 480
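/*
 * Allocate a page for a receive buffer, retrying once per second until
 * the allocation succeeds.
 */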
 481struct page *svc_rdma_get_page(void)
 482{
 483	struct page *page;
 484
 485	while ((page = alloc_page(GFP_KERNEL)) == NULL) {
 486		/* If we can't get memory, wait a bit and try again */
  487		printk(KERN_INFO "svcrdma: out of memory...retrying in 1 "
  488		       "second.\n");
 489		schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
 490	}
 491	return page;
 492}
 493
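/*
 * Map enough pages to cover sc_max_req_size bytes and post them as a
 * single receive WR. A transport reference is taken for the posted WR;
 * it is dropped when the receive completes, or here if the post fails.
 */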
 494int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
 495{
 496	struct ib_recv_wr recv_wr, *bad_recv_wr;
 497	struct svc_rdma_op_ctxt *ctxt;
 498	struct page *page;
 499	dma_addr_t pa;
 500	int sge_no;
 501	int buflen;
 502	int ret;
 503
 504	ctxt = svc_rdma_get_context(xprt);
 505	buflen = 0;
 506	ctxt->direction = DMA_FROM_DEVICE;
 507	for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
 508		BUG_ON(sge_no >= xprt->sc_max_sge);
 509		page = svc_rdma_get_page();
 510		ctxt->pages[sge_no] = page;
 511		pa = ib_dma_map_page(xprt->sc_cm_id->device,
 512				     page, 0, PAGE_SIZE,
 513				     DMA_FROM_DEVICE);
 514		if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa))
 515			goto err_put_ctxt;
 516		atomic_inc(&xprt->sc_dma_used);
 517		ctxt->sge[sge_no].addr = pa;
 518		ctxt->sge[sge_no].length = PAGE_SIZE;
 519		ctxt->sge[sge_no].lkey = xprt->sc_dma_lkey;
 520		ctxt->count = sge_no + 1;
 521		buflen += PAGE_SIZE;
 522	}
 523	recv_wr.next = NULL;
 524	recv_wr.sg_list = &ctxt->sge[0];
 525	recv_wr.num_sge = ctxt->count;
 526	recv_wr.wr_id = (u64)(unsigned long)ctxt;
 527
 528	svc_xprt_get(&xprt->sc_xprt);
 529	ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
 530	if (ret) {
 531		svc_rdma_unmap_dma(ctxt);
 532		svc_rdma_put_context(ctxt, 1);
 533		svc_xprt_put(&xprt->sc_xprt);
 534	}
 535	return ret;
 536
 537 err_put_ctxt:
 538	svc_rdma_unmap_dma(ctxt);
 539	svc_rdma_put_context(ctxt, 1);
 540	return -ENOMEM;
 541}
 542
 543/*
 544 * This function handles the CONNECT_REQUEST event on a listening
 545 * endpoint. It is passed the cma_id for the _new_ connection. The context in
 546 * this cma_id is inherited from the listening cma_id and is the svc_xprt
 547 * structure for the listening endpoint.
 548 *
 549 * This function creates a new xprt for the new connection and enqueues it on
  550 * the accept queue for the listening xprt. When the listen thread is kicked, it
 551 * will call the recvfrom method on the listen xprt which will accept the new
 552 * connection.
 553 */
 554static void handle_connect_req(struct rdma_cm_id *new_cma_id, size_t client_ird)
 555{
 556	struct svcxprt_rdma *listen_xprt = new_cma_id->context;
 557	struct svcxprt_rdma *newxprt;
 558	struct sockaddr *sa;
 559
 560	/* Create a new transport */
 561	newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
 562	if (!newxprt) {
 563		dprintk("svcrdma: failed to create new transport\n");
 564		return;
 565	}
 566	newxprt->sc_cm_id = new_cma_id;
 567	new_cma_id->context = newxprt;
 568	dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
 569		newxprt, newxprt->sc_cm_id, listen_xprt);
 570
 571	/* Save client advertised inbound read limit for use later in accept. */
 572	newxprt->sc_ord = client_ird;
 573
 574	/* Set the local and remote addresses in the transport */
 575	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
 576	svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
 577	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
 578	svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
 579
 580	/*
 581	 * Enqueue the new transport on the accept queue of the listening
 582	 * transport
 583	 */
 584	spin_lock_bh(&listen_xprt->sc_lock);
 585	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
 586	spin_unlock_bh(&listen_xprt->sc_lock);
 587
 588	/*
 589	 * Can't use svc_xprt_received here because we are not on a
 590	 * rqstp thread
  591	 */
 592	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
 593	svc_xprt_enqueue(&listen_xprt->sc_xprt);
 594}
 595
 596/*
  597 * Handles events generated on the listening endpoint. These events will
  598 * either be incoming connect requests or adapter removal events.
 599 */
 600static int rdma_listen_handler(struct rdma_cm_id *cma_id,
 601			       struct rdma_cm_event *event)
 602{
 603	struct svcxprt_rdma *xprt = cma_id->context;
 604	int ret = 0;
 605
 606	switch (event->event) {
 607	case RDMA_CM_EVENT_CONNECT_REQUEST:
 608		dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
 609			"event=%d\n", cma_id, cma_id->context, event->event);
 610		handle_connect_req(cma_id,
 611				   event->param.conn.initiator_depth);
 612		break;
 613
 614	case RDMA_CM_EVENT_ESTABLISHED:
 615		/* Accept complete */
 616		dprintk("svcrdma: Connection completed on LISTEN xprt=%p, "
 617			"cm_id=%p\n", xprt, cma_id);
 618		break;
 619
 620	case RDMA_CM_EVENT_DEVICE_REMOVAL:
 621		dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
 622			xprt, cma_id);
 623		if (xprt)
 624			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
 625		break;
 626
 627	default:
 628		dprintk("svcrdma: Unexpected event on listening endpoint %p, "
 629			"event=%d\n", cma_id, event->event);
 630		break;
 631	}
 632
 633	return ret;
 634}
 635
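/*
 * CM event handler for connected (non-listening) endpoints. Handles
 * connection established, disconnect, and device removal events.
 */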
 636static int rdma_cma_handler(struct rdma_cm_id *cma_id,
 637			    struct rdma_cm_event *event)
 638{
 639	struct svc_xprt *xprt = cma_id->context;
 640	struct svcxprt_rdma *rdma =
 641		container_of(xprt, struct svcxprt_rdma, sc_xprt);
 642	switch (event->event) {
 643	case RDMA_CM_EVENT_ESTABLISHED:
 644		/* Accept complete */
 645		svc_xprt_get(xprt);
 646		dprintk("svcrdma: Connection completed on DTO xprt=%p, "
 647			"cm_id=%p\n", xprt, cma_id);
 648		clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
 649		svc_xprt_enqueue(xprt);
 650		break;
 651	case RDMA_CM_EVENT_DISCONNECTED:
 652		dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
 653			xprt, cma_id);
 654		if (xprt) {
 655			set_bit(XPT_CLOSE, &xprt->xpt_flags);
 656			svc_xprt_enqueue(xprt);
 657			svc_xprt_put(xprt);
 658		}
 659		break;
 660	case RDMA_CM_EVENT_DEVICE_REMOVAL:
 661		dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
 662			"event=%d\n", cma_id, xprt, event->event);
 663		if (xprt) {
 664			set_bit(XPT_CLOSE, &xprt->xpt_flags);
 665			svc_xprt_enqueue(xprt);
 666		}
 667		break;
 668	default:
 669		dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
 670			"event=%d\n", cma_id, event->event);
 671		break;
 672	}
 673	return 0;
 674}
 675
 676/*
 677 * Create a listening RDMA service endpoint.
 678 */
 679static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
 680					struct net *net,
 681					struct sockaddr *sa, int salen,
 682					int flags)
 683{
 684	struct rdma_cm_id *listen_id;
 685	struct svcxprt_rdma *cma_xprt;
 686	struct svc_xprt *xprt;
 687	int ret;
 688
 689	dprintk("svcrdma: Creating RDMA socket\n");
 690	if (sa->sa_family != AF_INET) {
 691		dprintk("svcrdma: Address family %d is not supported.\n", sa->sa_family);
 692		return ERR_PTR(-EAFNOSUPPORT);
 693	}
 694	cma_xprt = rdma_create_xprt(serv, 1);
 695	if (!cma_xprt)
 696		return ERR_PTR(-ENOMEM);
 697	xprt = &cma_xprt->sc_xprt;
 698
 699	listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP,
 700				   IB_QPT_RC);
 701	if (IS_ERR(listen_id)) {
 702		ret = PTR_ERR(listen_id);
 703		dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
 704		goto err0;
 705	}
 706
 707	ret = rdma_bind_addr(listen_id, sa);
 708	if (ret) {
 709		dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
 710		goto err1;
 711	}
 712	cma_xprt->sc_cm_id = listen_id;
 713
 714	ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
 715	if (ret) {
 716		dprintk("svcrdma: rdma_listen failed = %d\n", ret);
 717		goto err1;
 718	}
 719
 720	/*
 721	 * We need to use the address from the cm_id in case the
 722	 * caller specified 0 for the port number.
 723	 */
 724	sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
 725	svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);
 726
 727	return &cma_xprt->sc_xprt;
 728
 729 err1:
 730	rdma_destroy_id(listen_id);
 731 err0:
 732	kfree(cma_xprt);
 733	return ERR_PTR(ret);
 734}
 735
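/*
 * Allocate a fast-registration MR and its page list for this transport.
 */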
 736static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
 737{
 738	struct ib_mr *mr;
 739	struct ib_fast_reg_page_list *pl;
 740	struct svc_rdma_fastreg_mr *frmr;
 741
 742	frmr = kmalloc(sizeof(*frmr), GFP_KERNEL);
 743	if (!frmr)
 744		goto err;
 745
 746	mr = ib_alloc_fast_reg_mr(xprt->sc_pd, RPCSVC_MAXPAGES);
 747	if (IS_ERR(mr))
 748		goto err_free_frmr;
 749
 750	pl = ib_alloc_fast_reg_page_list(xprt->sc_cm_id->device,
 751					 RPCSVC_MAXPAGES);
 752	if (IS_ERR(pl))
 753		goto err_free_mr;
 754
 755	frmr->mr = mr;
 756	frmr->page_list = pl;
 757	INIT_LIST_HEAD(&frmr->frmr_list);
 758	return frmr;
 759
 760 err_free_mr:
 761	ib_dereg_mr(mr);
 762 err_free_frmr:
 763	kfree(frmr);
 764 err:
 765	return ERR_PTR(-ENOMEM);
 766}
 767
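/*
 * Release every FRMR on the transport's free list, along with its MR
 * and fast_reg page list.
 */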
 768static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
 769{
 770	struct svc_rdma_fastreg_mr *frmr;
 771
 772	while (!list_empty(&xprt->sc_frmr_q)) {
 773		frmr = list_entry(xprt->sc_frmr_q.next,
 774				  struct svc_rdma_fastreg_mr, frmr_list);
 775		list_del_init(&frmr->frmr_list);
 776		ib_dereg_mr(frmr->mr);
 777		ib_free_fast_reg_page_list(frmr->page_list);
 778		kfree(frmr);
 779	}
 780}
 781
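/*
 * Take an FRMR from the transport's free list, or allocate a fresh one
 * if the list is empty.
 */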
 782struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
 783{
 784	struct svc_rdma_fastreg_mr *frmr = NULL;
 785
 786	spin_lock_bh(&rdma->sc_frmr_q_lock);
 787	if (!list_empty(&rdma->sc_frmr_q)) {
 788		frmr = list_entry(rdma->sc_frmr_q.next,
 789				  struct svc_rdma_fastreg_mr, frmr_list);
 790		list_del_init(&frmr->frmr_list);
 791		frmr->map_len = 0;
 792		frmr->page_list_len = 0;
 793	}
 794	spin_unlock_bh(&rdma->sc_frmr_q_lock);
 795	if (frmr)
 796		return frmr;
 797
 798	return rdma_alloc_frmr(rdma);
 799}
 800
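/*
 * Unmap each page that is still DMA-mapped through this FRMR.
 */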
 801static void frmr_unmap_dma(struct svcxprt_rdma *xprt,
 802			   struct svc_rdma_fastreg_mr *frmr)
 803{
 804	int page_no;
 805	for (page_no = 0; page_no < frmr->page_list_len; page_no++) {
 806		dma_addr_t addr = frmr->page_list->page_list[page_no];
 807		if (ib_dma_mapping_error(frmr->mr->device, addr))
 808			continue;
 809		atomic_dec(&xprt->sc_dma_used);
 810		ib_dma_unmap_page(frmr->mr->device, addr, PAGE_SIZE,
 811				  frmr->direction);
 812	}
 813}
 814
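/*
 * Unmap any remaining pages and return the FRMR to the transport's
 * free list.
 */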
 815void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
 816		       struct svc_rdma_fastreg_mr *frmr)
 817{
 818	if (frmr) {
 819		frmr_unmap_dma(rdma, frmr);
 820		spin_lock_bh(&rdma->sc_frmr_q_lock);
 821		BUG_ON(!list_empty(&frmr->frmr_list));
 822		list_add(&frmr->frmr_list, &rdma->sc_frmr_q);
 823		spin_unlock_bh(&rdma->sc_frmr_q_lock);
 824	}
 825}
 826
 827/*
 828 * This is the xpo_recvfrom function for listening endpoints. Its
 829 * purpose is to accept incoming connections. The CMA callback handler
 830 * has already created a new transport and attached it to the new CMA
 831 * ID.
 832 *
 833 * There is a queue of pending connections hung on the listening
 834 * transport. This queue contains the new svc_xprt structure. This
 835 * function takes svc_xprt structures off the accept_q and completes
 836 * the connection.
 837 */
 838static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 839{
 840	struct svcxprt_rdma *listen_rdma;
 841	struct svcxprt_rdma *newxprt = NULL;
 842	struct rdma_conn_param conn_param;
 843	struct ib_qp_init_attr qp_attr;
 844	struct ib_device_attr devattr;
 845	int uninitialized_var(dma_mr_acc);
 846	int need_dma_mr;
 847	int ret;
 848	int i;
 849
 850	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
 851	clear_bit(XPT_CONN, &xprt->xpt_flags);
 852	/* Get the next entry off the accept list */
 853	spin_lock_bh(&listen_rdma->sc_lock);
 854	if (!list_empty(&listen_rdma->sc_accept_q)) {
 855		newxprt = list_entry(listen_rdma->sc_accept_q.next,
 856				     struct svcxprt_rdma, sc_accept_q);
 857		list_del_init(&newxprt->sc_accept_q);
 858	}
 859	if (!list_empty(&listen_rdma->sc_accept_q))
 860		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
 861	spin_unlock_bh(&listen_rdma->sc_lock);
 862	if (!newxprt)
 863		return NULL;
 864
 865	dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
 866		newxprt, newxprt->sc_cm_id);
 867
 868	ret = ib_query_device(newxprt->sc_cm_id->device, &devattr);
 869	if (ret) {
 870		dprintk("svcrdma: could not query device attributes on "
 871			"device %p, rc=%d\n", newxprt->sc_cm_id->device, ret);
 872		goto errout;
 873	}
 874
 875	/* Qualify the transport resource defaults with the
 876	 * capabilities of this particular device */
 877	newxprt->sc_max_sge = min((size_t)devattr.max_sge,
 878				  (size_t)RPCSVC_MAXPAGES);
 879	newxprt->sc_max_requests = min((size_t)devattr.max_qp_wr,
 880				   (size_t)svcrdma_max_requests);
 881	newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests;
 882
 883	/*
 884	 * Limit ORD based on client limit, local device limit, and
 885	 * configured svcrdma limit.
 886	 */
 887	newxprt->sc_ord = min_t(size_t, devattr.max_qp_rd_atom, newxprt->sc_ord);
 888	newxprt->sc_ord = min_t(size_t,	svcrdma_ord, newxprt->sc_ord);
 889
 890	newxprt->sc_pd = ib_alloc_pd(newxprt->sc_cm_id->device);
 891	if (IS_ERR(newxprt->sc_pd)) {
 892		dprintk("svcrdma: error creating PD for connect request\n");
 893		goto errout;
 894	}
 895	newxprt->sc_sq_cq = ib_create_cq(newxprt->sc_cm_id->device,
 896					 sq_comp_handler,
 897					 cq_event_handler,
 898					 newxprt,
 899					 newxprt->sc_sq_depth,
 900					 0);
 901	if (IS_ERR(newxprt->sc_sq_cq)) {
 902		dprintk("svcrdma: error creating SQ CQ for connect request\n");
 903		goto errout;
 904	}
 905	newxprt->sc_rq_cq = ib_create_cq(newxprt->sc_cm_id->device,
 906					 rq_comp_handler,
 907					 cq_event_handler,
 908					 newxprt,
 909					 newxprt->sc_max_requests,
 910					 0);
 911	if (IS_ERR(newxprt->sc_rq_cq)) {
 912		dprintk("svcrdma: error creating RQ CQ for connect request\n");
 913		goto errout;
 914	}
 915
 916	memset(&qp_attr, 0, sizeof qp_attr);
 917	qp_attr.event_handler = qp_event_handler;
 918	qp_attr.qp_context = &newxprt->sc_xprt;
 919	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
 920	qp_attr.cap.max_recv_wr = newxprt->sc_max_requests;
 921	qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
 922	qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
 923	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
 924	qp_attr.qp_type = IB_QPT_RC;
 925	qp_attr.send_cq = newxprt->sc_sq_cq;
 926	qp_attr.recv_cq = newxprt->sc_rq_cq;
 927	dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n"
 928		"    cm_id->device=%p, sc_pd->device=%p\n"
 929		"    cap.max_send_wr = %d\n"
 930		"    cap.max_recv_wr = %d\n"
 931		"    cap.max_send_sge = %d\n"
 932		"    cap.max_recv_sge = %d\n",
 933		newxprt->sc_cm_id, newxprt->sc_pd,
 934		newxprt->sc_cm_id->device, newxprt->sc_pd->device,
 935		qp_attr.cap.max_send_wr,
 936		qp_attr.cap.max_recv_wr,
 937		qp_attr.cap.max_send_sge,
 938		qp_attr.cap.max_recv_sge);
 939
 940	ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
 941	if (ret) {
 942		/*
 943		 * XXX: This is a hack. We need a xx_request_qp interface
 944		 * that will adjust the qp_attr's with a best-effort
 945		 * number
 946		 */
 947		qp_attr.cap.max_send_sge -= 2;
 948		qp_attr.cap.max_recv_sge -= 2;
 949		ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd,
 950				     &qp_attr);
 951		if (ret) {
 952			dprintk("svcrdma: failed to create QP, ret=%d\n", ret);
 953			goto errout;
 954		}
  955		newxprt->sc_max_sge = min(qp_attr.cap.max_send_sge,
  956					  qp_attr.cap.max_recv_sge);
 957		newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
 958		newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
 959	}
 960	newxprt->sc_qp = newxprt->sc_cm_id->qp;
 961
 962	/*
 963	 * Use the most secure set of MR resources based on the
 964	 * transport type and available memory management features in
 965	 * the device. Here's the table implemented below:
 966	 *
 967	 *		Fast	Global	DMA	Remote WR
 968	 *		Reg	LKEY	MR	Access
 969	 *		Sup'd	Sup'd	Needed	Needed
 970	 *
 971	 * IWARP	N	N	Y	Y
 972	 *		N	Y	Y	Y
 973	 *		Y	N	Y	N
 974	 *		Y	Y	N	-
 975	 *
 976	 * IB		N	N	Y	N
 977	 *		N	Y	N	-
 978	 *		Y	N	Y	N
 979	 *		Y	Y	N	-
 980	 *
 981	 * NB:	iWARP requires remote write access for the data sink
 982	 *	of an RDMA_READ. IB does not.
 983	 */
 984	if (devattr.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
 985		newxprt->sc_frmr_pg_list_len =
 986			devattr.max_fast_reg_page_list_len;
 987		newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
 988	}
 989
 990	/*
 991	 * Determine if a DMA MR is required and if so, what privs are required
 992	 */
 993	switch (rdma_node_get_transport(newxprt->sc_cm_id->device->node_type)) {
 994	case RDMA_TRANSPORT_IWARP:
 995		newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_READ_W_INV;
 996		if (!(newxprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)) {
 997			need_dma_mr = 1;
 998			dma_mr_acc =
 999				(IB_ACCESS_LOCAL_WRITE |
1000				 IB_ACCESS_REMOTE_WRITE);
1001		} else if (!(devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)) {
1002			need_dma_mr = 1;
1003			dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
1004		} else
1005			need_dma_mr = 0;
1006		break;
1007	case RDMA_TRANSPORT_IB:
1008		if (!(devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)) {
1009			need_dma_mr = 1;
1010			dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
1011		} else
1012			need_dma_mr = 0;
1013		break;
1014	default:
1015		goto errout;
1016	}
1017
1018	/* Create the DMA MR if needed, otherwise, use the DMA LKEY */
1019	if (need_dma_mr) {
1020		/* Register all of physical memory */
1021		newxprt->sc_phys_mr =
1022			ib_get_dma_mr(newxprt->sc_pd, dma_mr_acc);
1023		if (IS_ERR(newxprt->sc_phys_mr)) {
 1024			dprintk("svcrdma: Failed to create DMA MR ret=%ld\n",
 1025				PTR_ERR(newxprt->sc_phys_mr));
1026			goto errout;
1027		}
1028		newxprt->sc_dma_lkey = newxprt->sc_phys_mr->lkey;
1029	} else
1030		newxprt->sc_dma_lkey =
1031			newxprt->sc_cm_id->device->local_dma_lkey;
1032
1033	/* Post receive buffers */
1034	for (i = 0; i < newxprt->sc_max_requests; i++) {
1035		ret = svc_rdma_post_recv(newxprt);
1036		if (ret) {
1037			dprintk("svcrdma: failure posting receive buffers\n");
1038			goto errout;
1039		}
1040	}
1041
1042	/* Swap out the handler */
1043	newxprt->sc_cm_id->event_handler = rdma_cma_handler;
1044
1045	/*
1046	 * Arm the CQs for the SQ and RQ before accepting so we can't
1047	 * miss the first message
1048	 */
1049	ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
1050	ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
1051
1052	/* Accept Connection */
1053	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
1054	memset(&conn_param, 0, sizeof conn_param);
1055	conn_param.responder_resources = 0;
1056	conn_param.initiator_depth = newxprt->sc_ord;
1057	ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
1058	if (ret) {
1059		dprintk("svcrdma: failed to accept new connection, ret=%d\n",
1060		       ret);
1061		goto errout;
1062	}
1063
1064	dprintk("svcrdma: new connection %p accepted with the following "
1065		"attributes:\n"
1066		"    local_ip        : %pI4\n"
1067		"    local_port	     : %d\n"
1068		"    remote_ip       : %pI4\n"
1069		"    remote_port     : %d\n"
1070		"    max_sge         : %d\n"
1071		"    sq_depth        : %d\n"
1072		"    max_requests    : %d\n"
1073		"    ord             : %d\n",
1074		newxprt,
1075		&((struct sockaddr_in *)&newxprt->sc_cm_id->
1076			 route.addr.src_addr)->sin_addr.s_addr,
1077		ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
1078		       route.addr.src_addr)->sin_port),
1079		&((struct sockaddr_in *)&newxprt->sc_cm_id->
1080			 route.addr.dst_addr)->sin_addr.s_addr,
1081		ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
1082		       route.addr.dst_addr)->sin_port),
1083		newxprt->sc_max_sge,
1084		newxprt->sc_sq_depth,
1085		newxprt->sc_max_requests,
1086		newxprt->sc_ord);
1087
1088	return &newxprt->sc_xprt;
1089
1090 errout:
1091	dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
1092	/* Take a reference in case the DTO handler runs */
1093	svc_xprt_get(&newxprt->sc_xprt);
1094	if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
1095		ib_destroy_qp(newxprt->sc_qp);
1096	rdma_destroy_id(newxprt->sc_cm_id);
1097	/* This call to put will destroy the transport */
1098	svc_xprt_put(&newxprt->sc_xprt);
1099	return NULL;
1100}
1101
1102static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
1103{
1104}
1105
1106/*
1107 * When connected, an svc_xprt has at least two references:
1108 *
1109 * - A reference held by the cm_id between the ESTABLISHED and
1110 *   DISCONNECTED events. If the remote peer disconnected first, this
1111 *   reference could be gone.
1112 *
1113 * - A reference held by the svc_recv code that called this function
1114 *   as part of close processing.
1115 *
 1116 * At a minimum, one reference should still be held.
1117 */
1118static void svc_rdma_detach(struct svc_xprt *xprt)
1119{
1120	struct svcxprt_rdma *rdma =
1121		container_of(xprt, struct svcxprt_rdma, sc_xprt);
1122	dprintk("svc: svc_rdma_detach(%p)\n", xprt);
1123
1124	/* Disconnect and flush posted WQE */
1125	rdma_disconnect(rdma->sc_cm_id);
1126}
1127
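/*
 * Deferred transport teardown, run from the svc_rdma_wq workqueue.
 * Releases queued-but-unprocessed completion contexts, frees the FRMR
 * list, and destroys the QP, CQs, DMA MR, PD, and CM ID.
 */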
1128static void __svc_rdma_free(struct work_struct *work)
1129{
1130	struct svcxprt_rdma *rdma =
1131		container_of(work, struct svcxprt_rdma, sc_work);
1132	dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
1133
1134	/* We should only be called from kref_put */
1135	BUG_ON(atomic_read(&rdma->sc_xprt.xpt_ref.refcount) != 0);
1136
1137	/*
1138	 * Destroy queued, but not processed read completions. Note
1139	 * that this cleanup has to be done before destroying the
1140	 * cm_id because the device ptr is needed to unmap the dma in
1141	 * svc_rdma_put_context.
1142	 */
1143	while (!list_empty(&rdma->sc_read_complete_q)) {
1144		struct svc_rdma_op_ctxt *ctxt;
1145		ctxt = list_entry(rdma->sc_read_complete_q.next,
1146				  struct svc_rdma_op_ctxt,
1147				  dto_q);
1148		list_del_init(&ctxt->dto_q);
1149		svc_rdma_put_context(ctxt, 1);
1150	}
1151
1152	/* Destroy queued, but not processed recv completions */
1153	while (!list_empty(&rdma->sc_rq_dto_q)) {
1154		struct svc_rdma_op_ctxt *ctxt;
1155		ctxt = list_entry(rdma->sc_rq_dto_q.next,
1156				  struct svc_rdma_op_ctxt,
1157				  dto_q);
1158		list_del_init(&ctxt->dto_q);
1159		svc_rdma_put_context(ctxt, 1);
1160	}
1161
1162	/* Warn if we leaked a resource or under-referenced */
1163	WARN_ON(atomic_read(&rdma->sc_ctxt_used) != 0);
1164	WARN_ON(atomic_read(&rdma->sc_dma_used) != 0);
1165
1166	/* De-allocate fastreg mr */
1167	rdma_dealloc_frmr_q(rdma);
1168
1169	/* Destroy the QP if present (not a listener) */
1170	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
1171		ib_destroy_qp(rdma->sc_qp);
1172
1173	if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
1174		ib_destroy_cq(rdma->sc_sq_cq);
1175
1176	if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
1177		ib_destroy_cq(rdma->sc_rq_cq);
1178
1179	if (rdma->sc_phys_mr && !IS_ERR(rdma->sc_phys_mr))
1180		ib_dereg_mr(rdma->sc_phys_mr);
1181
1182	if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
1183		ib_dealloc_pd(rdma->sc_pd);
1184
1185	/* Destroy the CM ID */
1186	rdma_destroy_id(rdma->sc_cm_id);
1187
1188	kfree(rdma);
1189}
1190
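/*
 * xpo_free method. The actual teardown is deferred to the svc_rdma_wq
 * workqueue so that the IB resources can be torn down in process
 * context.
 */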
1191static void svc_rdma_free(struct svc_xprt *xprt)
1192{
1193	struct svcxprt_rdma *rdma =
1194		container_of(xprt, struct svcxprt_rdma, sc_xprt);
1195	INIT_WORK(&rdma->sc_work, __svc_rdma_free);
1196	queue_work(svc_rdma_wq, &rdma->sc_work);
1197}
1198
1199static int svc_rdma_has_wspace(struct svc_xprt *xprt)
1200{
1201	struct svcxprt_rdma *rdma =
1202		container_of(xprt, struct svcxprt_rdma, sc_xprt);
1203
1204	/*
1205	 * If there are fewer SQ WR available than required to send a
1206	 * simple response, return false.
1207	 */
 1208	if (rdma->sc_sq_depth - atomic_read(&rdma->sc_sq_count) < 3)
1209		return 0;
1210
1211	/*
1212	 * ...or there are already waiters on the SQ,
1213	 * return false.
1214	 */
1215	if (waitqueue_active(&rdma->sc_send_wait))
1216		return 0;
1217
1218	/* Otherwise return true. */
1219	return 1;
1220}
1221
1222/*
1223 * Attempt to register the kvec representing the RPC memory with the
1224 * device.
1225 *
 1226 * Returns:
 1227 *   0 : The FAST_REG_MR work request was successfully
 1228 *       posted.
 1229 *  <0 : An error was encountered attempting to register
 1230 *       the kvec.
1231 */
1232int svc_rdma_fastreg(struct svcxprt_rdma *xprt,
1233		     struct svc_rdma_fastreg_mr *frmr)
1234{
1235	struct ib_send_wr fastreg_wr;
1236	u8 key;
1237
1238	/* Bump the key */
1239	key = (u8)(frmr->mr->lkey & 0x000000FF);
1240	ib_update_fast_reg_key(frmr->mr, ++key);
1241
1242	/* Prepare FASTREG WR */
1243	memset(&fastreg_wr, 0, sizeof fastreg_wr);
1244	fastreg_wr.opcode = IB_WR_FAST_REG_MR;
1245	fastreg_wr.send_flags = IB_SEND_SIGNALED;
1246	fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva;
1247	fastreg_wr.wr.fast_reg.page_list = frmr->page_list;
1248	fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len;
1249	fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1250	fastreg_wr.wr.fast_reg.length = frmr->map_len;
1251	fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags;
1252	fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey;
1253	return svc_rdma_send(xprt, &fastreg_wr);
1254}
1255
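/*
 * Post a chain of send WRs, waiting for SQ space if the queue is full.
 * A transport reference is taken for each WR posted; the SQ completion
 * path drops them as the WRs complete.
 */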
1256int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
1257{
1258	struct ib_send_wr *bad_wr, *n_wr;
1259	int wr_count;
1260	int i;
1261	int ret;
1262
1263	if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
1264		return -ENOTCONN;
1265
1266	BUG_ON(wr->send_flags != IB_SEND_SIGNALED);
1267	wr_count = 1;
1268	for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
1269		wr_count++;
1270
1271	/* If the SQ is full, wait until an SQ entry is available */
1272	while (1) {
1273		spin_lock_bh(&xprt->sc_lock);
1274		if (xprt->sc_sq_depth < atomic_read(&xprt->sc_sq_count) + wr_count) {
1275			spin_unlock_bh(&xprt->sc_lock);
1276			atomic_inc(&rdma_stat_sq_starve);
1277
1278			/* See if we can opportunistically reap SQ WR to make room */
1279			sq_cq_reap(xprt);
1280
1281			/* Wait until SQ WR available if SQ still full */
1282			wait_event(xprt->sc_send_wait,
1283				   atomic_read(&xprt->sc_sq_count) <
1284				   xprt->sc_sq_depth);
1285			if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
1286				return -ENOTCONN;
1287			continue;
1288		}
1289		/* Take a transport ref for each WR posted */
1290		for (i = 0; i < wr_count; i++)
1291			svc_xprt_get(&xprt->sc_xprt);
1292
1293		/* Bump used SQ WR count and post */
1294		atomic_add(wr_count, &xprt->sc_sq_count);
1295		ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
1296		if (ret) {
1297			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
1298			atomic_sub(wr_count, &xprt->sc_sq_count);
 1299			for (i = 0; i < wr_count; i++)
1300				svc_xprt_put(&xprt->sc_xprt);
1301			dprintk("svcrdma: failed to post SQ WR rc=%d, "
1302			       "sc_sq_count=%d, sc_sq_depth=%d\n",
1303			       ret, atomic_read(&xprt->sc_sq_count),
1304			       xprt->sc_sq_depth);
1305		}
1306		spin_unlock_bh(&xprt->sc_lock);
1307		if (ret)
1308			wake_up(&xprt->sc_send_wait);
1309		break;
1310	}
1311	return ret;
1312}
1313
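/*
 * Send an RPC/RDMA protocol error reply to the client. The error is
 * XDR-encoded into a freshly allocated page, which is DMA-mapped and
 * posted with a single SEND WR.
 */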
1314void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
1315			 enum rpcrdma_errcode err)
1316{
1317	struct ib_send_wr err_wr;
1318	struct page *p;
1319	struct svc_rdma_op_ctxt *ctxt;
1320	u32 *va;
1321	int length;
1322	int ret;
1323
1324	p = svc_rdma_get_page();
1325	va = page_address(p);
1326
1327	/* XDR encode error */
1328	length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
1329
1330	ctxt = svc_rdma_get_context(xprt);
 1331	ctxt->direction = DMA_TO_DEVICE;
1332	ctxt->count = 1;
1333	ctxt->pages[0] = p;
1334
1335	/* Prepare SGE for local address */
1336	ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
 1337					    p, 0, length, DMA_TO_DEVICE);
1338	if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
1339		put_page(p);
1340		svc_rdma_put_context(ctxt, 1);
1341		return;
1342	}
1343	atomic_inc(&xprt->sc_dma_used);
1344	ctxt->sge[0].lkey = xprt->sc_dma_lkey;
1345	ctxt->sge[0].length = length;
1346
1347	/* Prepare SEND WR */
1348	memset(&err_wr, 0, sizeof err_wr);
1349	ctxt->wr_op = IB_WR_SEND;
1350	err_wr.wr_id = (unsigned long)ctxt;
1351	err_wr.sg_list = ctxt->sge;
1352	err_wr.num_sge = 1;
1353	err_wr.opcode = IB_WR_SEND;
1354	err_wr.send_flags = IB_SEND_SIGNALED;
1355
1356	/* Post It */
1357	ret = svc_rdma_send(xprt, &err_wr);
1358	if (ret) {
1359		dprintk("svcrdma: Error %d posting send for protocol error\n",
1360			ret);
1361		svc_rdma_unmap_dma(ctxt);
1362		svc_rdma_put_context(ctxt, 1);
1363	}
1364}