/*
 * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/interrupt.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/workqueue.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>
#include <linux/export.h>
#include "xprt_rdma.h"

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags);
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
static void svc_rdma_release_rqst(struct svc_rqst *);
static void dto_tasklet_func(unsigned long data);
static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt);
static void rq_cq_reap(struct svcxprt_rdma *xprt);
static void sq_cq_reap(struct svcxprt_rdma *xprt);

static DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
static DEFINE_SPINLOCK(dto_lock);
static LIST_HEAD(dto_xprt_q);

static struct svc_xprt_ops svc_rdma_ops = {
	.xpo_create = svc_rdma_create,
	.xpo_recvfrom = svc_rdma_recvfrom,
	.xpo_sendto = svc_rdma_sendto,
	.xpo_release_rqst = svc_rdma_release_rqst,
	.xpo_detach = svc_rdma_detach,
	.xpo_free = svc_rdma_free,
	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
	.xpo_has_wspace = svc_rdma_has_wspace,
	.xpo_accept = svc_rdma_accept,
};

struct svc_xprt_class svc_rdma_class = {
	.xcl_name = "rdma",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_rdma_ops,
	.xcl_max_payload = RPCSVC_MAXPAYLOAD_TCP,
};

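/*
 * Allocate an operation context to track a single WR. The allocation
 * retries every 500ms rather than fail, so this never returns NULL.
 */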
struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_op_ctxt *ctxt;

	while (1) {
		ctxt = kmem_cache_alloc(svc_rdma_ctxt_cachep, GFP_KERNEL);
		if (ctxt)
			break;
		schedule_timeout_uninterruptible(msecs_to_jiffies(500));
	}
	ctxt->xprt = xprt;
	INIT_LIST_HEAD(&ctxt->dto_q);
	ctxt->count = 0;
	ctxt->frmr = NULL;
	atomic_inc(&xprt->sc_ctxt_used);
	return ctxt;
}

void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
{
	struct svcxprt_rdma *xprt = ctxt->xprt;
	int i;
	for (i = 0; i < ctxt->count && ctxt->sge[i].length; i++) {
		/*
		 * Unmap the DMA addr in the SGE if the lkey matches
		 * the sc_dma_lkey, otherwise, ignore it since it is
		 * an FRMR lkey and will be unmapped later when the
		 * last WR that uses it completes.
		 */
		if (ctxt->sge[i].lkey == xprt->sc_dma_lkey) {
			atomic_dec(&xprt->sc_dma_used);
			ib_dma_unmap_page(xprt->sc_cm_id->device,
					    ctxt->sge[i].addr,
					    ctxt->sge[i].length,
					    ctxt->direction);
		}
	}
}

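/*
 * Release an operation context. If free_pages is set, the pages
 * attached to the context are released as well.
 */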
void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
{
	struct svcxprt_rdma *xprt;
	int i;

	BUG_ON(!ctxt);
	xprt = ctxt->xprt;
	if (free_pages)
		for (i = 0; i < ctxt->count; i++)
			put_page(ctxt->pages[i]);

	kmem_cache_free(svc_rdma_ctxt_cachep, ctxt);
	atomic_dec(&xprt->sc_ctxt_used);
}

/*
 * Temporary NFS req mappings are shared across all transport
 * instances. These are short lived and should be bounded by the number
 * of concurrent server threads * depth of the SQ.
 */
struct svc_rdma_req_map *svc_rdma_get_req_map(void)
{
	struct svc_rdma_req_map *map;
	while (1) {
		map = kmem_cache_alloc(svc_rdma_map_cachep, GFP_KERNEL);
		if (map)
			break;
		schedule_timeout_uninterruptible(msecs_to_jiffies(500));
	}
	map->count = 0;
	map->frmr = NULL;
	return map;
}

void svc_rdma_put_req_map(struct svc_rdma_req_map *map)
{
	kmem_cache_free(svc_rdma_map_cachep, map);
}

/* ib_cq event handler */
static void cq_event_handler(struct ib_event *event, void *context)
{
	struct svc_xprt *xprt = context;
	dprintk("svcrdma: received CQ event id=%d, context=%p\n",
		event->event, context);
	set_bit(XPT_CLOSE, &xprt->xpt_flags);
}

/* QP event handler */
static void qp_event_handler(struct ib_event *event, void *context)
{
	struct svc_xprt *xprt = context;

	switch (event->event) {
	/* These are considered benign events */
	case IB_EVENT_PATH_MIG:
	case IB_EVENT_COMM_EST:
	case IB_EVENT_SQ_DRAINED:
	case IB_EVENT_QP_LAST_WQE_REACHED:
		dprintk("svcrdma: QP event %d received for QP=%p\n",
			event->event, event->element.qp);
		break;
	/* These are considered fatal events */
	case IB_EVENT_PATH_MIG_ERR:
	case IB_EVENT_QP_FATAL:
	case IB_EVENT_QP_REQ_ERR:
	case IB_EVENT_QP_ACCESS_ERR:
	case IB_EVENT_DEVICE_FATAL:
	default:
		dprintk("svcrdma: QP ERROR event %d received for QP=%p, "
			"closing transport\n",
			event->event, event->element.qp);
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		break;
	}
}

/*
 * Data Transfer Operation Tasklet
 *
 * Walks a list of transports with I/O pending, removing entries as
 * they are added to the server's I/O pending list. Two bits indicate
 * if SQ, RQ, or both have I/O pending. The dto_lock is an irqsave
 * spinlock that serializes access to the transport list with the RQ
 * and SQ interrupt handlers.
 */
static void dto_tasklet_func(unsigned long data)
{
	struct svcxprt_rdma *xprt;
	unsigned long flags;

	spin_lock_irqsave(&dto_lock, flags);
	while (!list_empty(&dto_xprt_q)) {
		xprt = list_entry(dto_xprt_q.next,
				  struct svcxprt_rdma, sc_dto_q);
		list_del_init(&xprt->sc_dto_q);
		spin_unlock_irqrestore(&dto_lock, flags);

		rq_cq_reap(xprt);
		sq_cq_reap(xprt);

		svc_xprt_put(&xprt->sc_xprt);
		spin_lock_irqsave(&dto_lock, flags);
	}
	spin_unlock_irqrestore(&dto_lock, flags);
}

/*
 * Receive Queue Completion Handler
 *
 * Since an RQ completion handler is called in interrupt context, we
 * need to defer the handling of the I/O to a tasklet.
 */
static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
{
	struct svcxprt_rdma *xprt = cq_context;
	unsigned long flags;

	/* Guard against unconditional flush call for destroyed QP */
	if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount) == 0)
		return;

	/*
	 * Set the bit regardless of whether or not it's on the list
	 * because it may be on the list already due to an SQ
	 * completion.
	 */
	set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);

	/*
	 * If this transport is not already on the DTO transport queue,
	 * add it.
	 */
	spin_lock_irqsave(&dto_lock, flags);
	if (list_empty(&xprt->sc_dto_q)) {
		svc_xprt_get(&xprt->sc_xprt);
		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
	}
	spin_unlock_irqrestore(&dto_lock, flags);

	/* Tasklet does all the work to avoid irqsave locks. */
	tasklet_schedule(&dto_tasklet);
}

/*
 * rq_cq_reap - Process the RQ CQ.
 *
 * Take all completing WC off the CQE and enqueue the associated DTO
 * context on the dto_q for the transport.
 *
 * Note that caller must hold a transport reference.
 */
static void rq_cq_reap(struct svcxprt_rdma *xprt)
{
	int ret;
	struct ib_wc wc;
	struct svc_rdma_op_ctxt *ctxt = NULL;

	if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
		return;

	ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
	atomic_inc(&rdma_stat_rq_poll);

	while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
		ctxt->wc_status = wc.status;
		ctxt->byte_len = wc.byte_len;
		svc_rdma_unmap_dma(ctxt);
		if (wc.status != IB_WC_SUCCESS) {
			/* Close the transport */
			dprintk("svcrdma: transport closing putting ctxt %p\n",
				ctxt);
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
			svc_rdma_put_context(ctxt, 1);
			svc_xprt_put(&xprt->sc_xprt);
			continue;
		}
		spin_lock_bh(&xprt->sc_rq_dto_lock);
		list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
		spin_unlock_bh(&xprt->sc_rq_dto_lock);
		svc_xprt_put(&xprt->sc_xprt);
	}

	if (ctxt)
		atomic_inc(&rdma_stat_rq_prod);

	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
	/*
	 * If data arrived before established event,
	 * don't enqueue. This defers RPC I/O until the
	 * RDMA connection is complete.
	 */
	if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
		svc_xprt_enqueue(&xprt->sc_xprt);
}

/*
 * Process a completion context: release the resources associated with
 * the completed WR and, when the last RDMA_READ of a chunk list
 * completes, queue the read header on the transport's
 * sc_read_complete_q so the RPC can be processed.
 */
static void process_context(struct svcxprt_rdma *xprt,
			    struct svc_rdma_op_ctxt *ctxt)
{
	svc_rdma_unmap_dma(ctxt);

	switch (ctxt->wr_op) {
	case IB_WR_SEND:
		if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags))
			svc_rdma_put_frmr(xprt, ctxt->frmr);
		svc_rdma_put_context(ctxt, 1);
		break;

	case IB_WR_RDMA_WRITE:
		svc_rdma_put_context(ctxt, 0);
		break;

	case IB_WR_RDMA_READ:
	case IB_WR_RDMA_READ_WITH_INV:
		if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
			struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;
			BUG_ON(!read_hdr);
			if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags))
				svc_rdma_put_frmr(xprt, ctxt->frmr);
			spin_lock_bh(&xprt->sc_rq_dto_lock);
			set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
			list_add_tail(&read_hdr->dto_q,
				      &xprt->sc_read_complete_q);
			spin_unlock_bh(&xprt->sc_rq_dto_lock);
			svc_xprt_enqueue(&xprt->sc_xprt);
		}
		svc_rdma_put_context(ctxt, 0);
		break;

	default:
		printk(KERN_ERR "svcrdma: unexpected completion type, "
		       "opcode=%d\n",
		       ctxt->wr_op);
		break;
	}
}

/*
 * Send Queue Completion Handler - potentially called in interrupt context.
 *
 * Note that caller must hold a transport reference.
 */
static void sq_cq_reap(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_op_ctxt *ctxt = NULL;
	struct ib_wc wc;
	struct ib_cq *cq = xprt->sc_sq_cq;
	int ret;

	if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
		return;

	ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
	atomic_inc(&rdma_stat_sq_poll);
	while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
		if (wc.status != IB_WC_SUCCESS)
			/* Close the transport */
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);

		/* Decrement used SQ WR count */
		atomic_dec(&xprt->sc_sq_count);
		wake_up(&xprt->sc_send_wait);

		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
		if (ctxt)
			process_context(xprt, ctxt);

		svc_xprt_put(&xprt->sc_xprt);
	}

	if (ctxt)
		atomic_inc(&rdma_stat_sq_prod);
}

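/*
 * Send Queue Completion upcall
 *
 * Like rq_comp_handler, this runs in interrupt context, so the actual
 * reaping of completions is deferred to the DTO tasklet.
 */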
static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
{
	struct svcxprt_rdma *xprt = cq_context;
	unsigned long flags;

	/* Guard against unconditional flush call for destroyed QP */
	if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount) == 0)
		return;

	/*
	 * Set the bit regardless of whether or not it's on the list
	 * because it may be on the list already due to an RQ
	 * completion.
	 */
	set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);

	/*
	 * If this transport is not already on the DTO transport queue,
	 * add it.
	 */
	spin_lock_irqsave(&dto_lock, flags);
	if (list_empty(&xprt->sc_dto_q)) {
		svc_xprt_get(&xprt->sc_xprt);
		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
	}
	spin_unlock_irqrestore(&dto_lock, flags);

	/* Tasklet does all the work to avoid irqsave locks. */
	tasklet_schedule(&dto_tasklet);
}

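/*
 * Allocate and initialize an svcxprt_rdma structure. Used for both
 * listening endpoints (listener != 0) and per-connection transports.
 */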
static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
					     int listener)
{
	struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);

	if (!cma_xprt)
		return NULL;
	svc_xprt_init(&init_net, &svc_rdma_class, &cma_xprt->sc_xprt, serv);
	INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
	INIT_LIST_HEAD(&cma_xprt->sc_dto_q);
	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
	INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
	init_waitqueue_head(&cma_xprt->sc_send_wait);

	spin_lock_init(&cma_xprt->sc_lock);
	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
	spin_lock_init(&cma_xprt->sc_frmr_q_lock);

	cma_xprt->sc_ord = svcrdma_ord;

	cma_xprt->sc_max_req_size = svcrdma_max_req_size;
	cma_xprt->sc_max_requests = svcrdma_max_requests;
	cma_xprt->sc_sq_depth = svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT;
	atomic_set(&cma_xprt->sc_sq_count, 0);
	atomic_set(&cma_xprt->sc_ctxt_used, 0);

	if (listener)
		set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);

	return cma_xprt;
}

struct page *svc_rdma_get_page(void)
{
	struct page *page;

	while ((page = alloc_page(GFP_KERNEL)) == NULL) {
		/* If we can't get memory, wait a bit and try again */
		printk(KERN_INFO "svcrdma: out of memory...retrying in 1000 "
		       "ms.\n");
		schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
	}
	return page;
}

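/*
 * Build an sge list of page-sized buffers large enough to hold the
 * biggest request this transport will accept, then post it as a
 * single receive WR on the connection's QP.
 */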
int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
{
	struct ib_recv_wr recv_wr, *bad_recv_wr;
	struct svc_rdma_op_ctxt *ctxt;
	struct page *page;
	dma_addr_t pa;
	int sge_no;
	int buflen;
	int ret;

	ctxt = svc_rdma_get_context(xprt);
	buflen = 0;
	ctxt->direction = DMA_FROM_DEVICE;
	for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
		BUG_ON(sge_no >= xprt->sc_max_sge);
		page = svc_rdma_get_page();
		ctxt->pages[sge_no] = page;
		pa = ib_dma_map_page(xprt->sc_cm_id->device,
				     page, 0, PAGE_SIZE,
				     DMA_FROM_DEVICE);
		if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa))
			goto err_put_ctxt;
		atomic_inc(&xprt->sc_dma_used);
		ctxt->sge[sge_no].addr = pa;
		ctxt->sge[sge_no].length = PAGE_SIZE;
		ctxt->sge[sge_no].lkey = xprt->sc_dma_lkey;
		ctxt->count = sge_no + 1;
		buflen += PAGE_SIZE;
	}
	recv_wr.next = NULL;
	recv_wr.sg_list = &ctxt->sge[0];
	recv_wr.num_sge = ctxt->count;
	recv_wr.wr_id = (u64)(unsigned long)ctxt;

	svc_xprt_get(&xprt->sc_xprt);
	ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
	if (ret) {
		svc_rdma_unmap_dma(ctxt);
		svc_rdma_put_context(ctxt, 1);
		svc_xprt_put(&xprt->sc_xprt);
	}
	return ret;

 err_put_ctxt:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);
	return -ENOMEM;
}

/*
 * This function handles the CONNECT_REQUEST event on a listening
 * endpoint. It is passed the cma_id for the _new_ connection. The context in
 * this cma_id is inherited from the listening cma_id and is the svc_xprt
 * structure for the listening endpoint.
 *
 * This function creates a new xprt for the new connection and enqueues it on
 * the accept queue for the listening xprt. When the listen thread is kicked,
 * it will call the recvfrom method on the listen xprt which will accept the
 * new connection.
 */
static void handle_connect_req(struct rdma_cm_id *new_cma_id, size_t client_ird)
{
	struct svcxprt_rdma *listen_xprt = new_cma_id->context;
	struct svcxprt_rdma *newxprt;
	struct sockaddr *sa;

	/* Create a new transport */
	newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
	if (!newxprt) {
		dprintk("svcrdma: failed to create new transport\n");
		return;
	}
	newxprt->sc_cm_id = new_cma_id;
	new_cma_id->context = newxprt;
	dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
		newxprt, newxprt->sc_cm_id, listen_xprt);

	/* Save client advertised inbound read limit for use later in accept. */
	newxprt->sc_ord = client_ird;

	/* Set the local and remote addresses in the transport */
	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
	svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));

	/*
	 * Enqueue the new transport on the accept queue of the listening
	 * transport.
	 */
	spin_lock_bh(&listen_xprt->sc_lock);
	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
	spin_unlock_bh(&listen_xprt->sc_lock);

	/*
	 * Can't use svc_xprt_received here because we are not on a
	 * rqstp thread.
	 */
	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
	svc_xprt_enqueue(&listen_xprt->sc_xprt);
}

/*
 * Handles events generated on the listening endpoint. These events will
 * either be incoming connect requests or adapter removal events.
 */
static int rdma_listen_handler(struct rdma_cm_id *cma_id,
			       struct rdma_cm_event *event)
{
	struct svcxprt_rdma *xprt = cma_id->context;
	int ret = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_CONNECT_REQUEST:
		dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
			"event=%d\n", cma_id, cma_id->context, event->event);
		handle_connect_req(cma_id,
				   event->param.conn.initiator_depth);
		break;

	case RDMA_CM_EVENT_ESTABLISHED:
		/* Accept complete */
		dprintk("svcrdma: Connection completed on LISTEN xprt=%p, "
			"cm_id=%p\n", xprt, cma_id);
		break;

	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
			xprt, cma_id);
		if (xprt)
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		break;

	default:
		dprintk("svcrdma: Unexpected event on listening endpoint %p, "
			"event=%d\n", cma_id, event->event);
		break;
	}

	return ret;
}

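/*
 * Handles CM events on connected (non-listening) endpoints:
 * connection established, disconnect, and device removal.
 */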
static int rdma_cma_handler(struct rdma_cm_id *cma_id,
			    struct rdma_cm_event *event)
{
	struct svc_xprt *xprt = cma_id->context;
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	switch (event->event) {
	case RDMA_CM_EVENT_ESTABLISHED:
		/* Accept complete */
		svc_xprt_get(xprt);
		dprintk("svcrdma: Connection completed on DTO xprt=%p, "
			"cm_id=%p\n", xprt, cma_id);
		clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
		svc_xprt_enqueue(xprt);
		break;
	case RDMA_CM_EVENT_DISCONNECTED:
		dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
			xprt, cma_id);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
			"event=%d\n", cma_id, xprt, event->event);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_enqueue(xprt);
		}
		break;
	default:
		dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
			"event=%d\n", cma_id, event->event);
		break;
	}
	return 0;
}

/*
 * Create a listening RDMA service endpoint.
 */
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags)
{
	struct rdma_cm_id *listen_id;
	struct svcxprt_rdma *cma_xprt;
	struct svc_xprt *xprt;
	int ret;

	dprintk("svcrdma: Creating RDMA socket\n");
	if (sa->sa_family != AF_INET) {
		dprintk("svcrdma: Address family %d is not supported.\n",
			sa->sa_family);
		return ERR_PTR(-EAFNOSUPPORT);
	}
	cma_xprt = rdma_create_xprt(serv, 1);
	if (!cma_xprt)
		return ERR_PTR(-ENOMEM);
	xprt = &cma_xprt->sc_xprt;

	listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP,
				   IB_QPT_RC);
	if (IS_ERR(listen_id)) {
		ret = PTR_ERR(listen_id);
		dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
		goto err0;
	}

	ret = rdma_bind_addr(listen_id, sa);
	if (ret) {
		dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
		goto err1;
	}
	cma_xprt->sc_cm_id = listen_id;

	ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
	if (ret) {
		dprintk("svcrdma: rdma_listen failed = %d\n", ret);
		goto err1;
	}

	/*
	 * We need to use the address from the cm_id in case the
	 * caller specified 0 for the port number.
	 */
	sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);

	return &cma_xprt->sc_xprt;

 err1:
	rdma_destroy_id(listen_id);
 err0:
	kfree(cma_xprt);
	return ERR_PTR(ret);
}

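/*
 * Allocate a fast-register MR and its page list, sized for the
 * largest request the server handles (RPCSVC_MAXPAGES).
 */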
static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
{
	struct ib_mr *mr;
	struct ib_fast_reg_page_list *pl;
	struct svc_rdma_fastreg_mr *frmr;

	frmr = kmalloc(sizeof(*frmr), GFP_KERNEL);
	if (!frmr)
		goto err;

	mr = ib_alloc_fast_reg_mr(xprt->sc_pd, RPCSVC_MAXPAGES);
	if (IS_ERR(mr))
		goto err_free_frmr;

	pl = ib_alloc_fast_reg_page_list(xprt->sc_cm_id->device,
					 RPCSVC_MAXPAGES);
	if (IS_ERR(pl))
		goto err_free_mr;

	frmr->mr = mr;
	frmr->page_list = pl;
	INIT_LIST_HEAD(&frmr->frmr_list);
	return frmr;

 err_free_mr:
	ib_dereg_mr(mr);
 err_free_frmr:
	kfree(frmr);
 err:
	return ERR_PTR(-ENOMEM);
}

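/* Release every FRMR still sitting on the transport's free list. */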
static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_fastreg_mr *frmr;

	while (!list_empty(&xprt->sc_frmr_q)) {
		frmr = list_entry(xprt->sc_frmr_q.next,
				  struct svc_rdma_fastreg_mr, frmr_list);
		list_del_init(&frmr->frmr_list);
		ib_dereg_mr(frmr->mr);
		ib_free_fast_reg_page_list(frmr->page_list);
		kfree(frmr);
	}
}

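/*
 * Get an FRMR from the transport's free list, or allocate a new one
 * if the list is empty.
 */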
struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_fastreg_mr *frmr = NULL;

	spin_lock_bh(&rdma->sc_frmr_q_lock);
	if (!list_empty(&rdma->sc_frmr_q)) {
		frmr = list_entry(rdma->sc_frmr_q.next,
				  struct svc_rdma_fastreg_mr, frmr_list);
		list_del_init(&frmr->frmr_list);
		frmr->map_len = 0;
		frmr->page_list_len = 0;
	}
	spin_unlock_bh(&rdma->sc_frmr_q_lock);
	if (frmr)
		return frmr;

	return rdma_alloc_frmr(rdma);
}

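/* DMA-unmap every page currently mapped through this FRMR. */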
static void frmr_unmap_dma(struct svcxprt_rdma *xprt,
			   struct svc_rdma_fastreg_mr *frmr)
{
	int page_no;
	for (page_no = 0; page_no < frmr->page_list_len; page_no++) {
		dma_addr_t addr = frmr->page_list->page_list[page_no];
		if (ib_dma_mapping_error(frmr->mr->device, addr))
			continue;
		atomic_dec(&xprt->sc_dma_used);
		ib_dma_unmap_page(frmr->mr->device, addr, PAGE_SIZE,
				  frmr->direction);
	}
}

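/* Unmap the FRMR's pages and return it to the transport's free list. */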
void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
		       struct svc_rdma_fastreg_mr *frmr)
{
	if (frmr) {
		frmr_unmap_dma(rdma, frmr);
		spin_lock_bh(&rdma->sc_frmr_q_lock);
		BUG_ON(!list_empty(&frmr->frmr_list));
		list_add(&frmr->frmr_list, &rdma->sc_frmr_q);
		spin_unlock_bh(&rdma->sc_frmr_q_lock);
	}
}

/*
 * This is the xpo_recvfrom function for listening endpoints. Its
 * purpose is to accept incoming connections. The CMA callback handler
 * has already created a new transport and attached it to the new CMA
 * ID.
 *
 * There is a queue of pending connections hung on the listening
 * transport. This queue contains the new svc_xprt structure. This
 * function takes svc_xprt structures off the accept_q and completes
 * the connection.
 */
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *listen_rdma;
	struct svcxprt_rdma *newxprt = NULL;
	struct rdma_conn_param conn_param;
	struct ib_qp_init_attr qp_attr;
	struct ib_device_attr devattr;
	int uninitialized_var(dma_mr_acc);
	int need_dma_mr;
	int ret;
	int i;

	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
	clear_bit(XPT_CONN, &xprt->xpt_flags);
	/* Get the next entry off the accept list */
	spin_lock_bh(&listen_rdma->sc_lock);
	if (!list_empty(&listen_rdma->sc_accept_q)) {
		newxprt = list_entry(listen_rdma->sc_accept_q.next,
				     struct svcxprt_rdma, sc_accept_q);
		list_del_init(&newxprt->sc_accept_q);
	}
	if (!list_empty(&listen_rdma->sc_accept_q))
		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
	spin_unlock_bh(&listen_rdma->sc_lock);
	if (!newxprt)
		return NULL;

	dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
		newxprt, newxprt->sc_cm_id);

	ret = ib_query_device(newxprt->sc_cm_id->device, &devattr);
	if (ret) {
		dprintk("svcrdma: could not query device attributes on "
			"device %p, rc=%d\n", newxprt->sc_cm_id->device, ret);
		goto errout;
	}

	/* Qualify the transport resource defaults with the
	 * capabilities of this particular device */
	newxprt->sc_max_sge = min((size_t)devattr.max_sge,
				  (size_t)RPCSVC_MAXPAGES);
	newxprt->sc_max_requests = min((size_t)devattr.max_qp_wr,
				   (size_t)svcrdma_max_requests);
	newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests;

	/*
	 * Limit ORD based on client limit, local device limit, and
	 * configured svcrdma limit.
	 */
	newxprt->sc_ord = min_t(size_t, devattr.max_qp_rd_atom, newxprt->sc_ord);
	newxprt->sc_ord = min_t(size_t, svcrdma_ord, newxprt->sc_ord);

	newxprt->sc_pd = ib_alloc_pd(newxprt->sc_cm_id->device);
	if (IS_ERR(newxprt->sc_pd)) {
		dprintk("svcrdma: error creating PD for connect request\n");
		goto errout;
	}
	newxprt->sc_sq_cq = ib_create_cq(newxprt->sc_cm_id->device,
					 sq_comp_handler,
					 cq_event_handler,
					 newxprt,
					 newxprt->sc_sq_depth,
					 0);
	if (IS_ERR(newxprt->sc_sq_cq)) {
		dprintk("svcrdma: error creating SQ CQ for connect request\n");
		goto errout;
	}
	newxprt->sc_rq_cq = ib_create_cq(newxprt->sc_cm_id->device,
					 rq_comp_handler,
					 cq_event_handler,
					 newxprt,
					 newxprt->sc_max_requests,
					 0);
	if (IS_ERR(newxprt->sc_rq_cq)) {
		dprintk("svcrdma: error creating RQ CQ for connect request\n");
		goto errout;
	}

	memset(&qp_attr, 0, sizeof qp_attr);
	qp_attr.event_handler = qp_event_handler;
	qp_attr.qp_context = &newxprt->sc_xprt;
	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
	qp_attr.cap.max_recv_wr = newxprt->sc_max_requests;
	qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
	qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	qp_attr.qp_type = IB_QPT_RC;
	qp_attr.send_cq = newxprt->sc_sq_cq;
	qp_attr.recv_cq = newxprt->sc_rq_cq;
	dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n"
		"    cm_id->device=%p, sc_pd->device=%p\n"
		"    cap.max_send_wr = %d\n"
		"    cap.max_recv_wr = %d\n"
		"    cap.max_send_sge = %d\n"
		"    cap.max_recv_sge = %d\n",
		newxprt->sc_cm_id, newxprt->sc_pd,
		newxprt->sc_cm_id->device, newxprt->sc_pd->device,
		qp_attr.cap.max_send_wr,
		qp_attr.cap.max_recv_wr,
		qp_attr.cap.max_send_sge,
		qp_attr.cap.max_recv_sge);

	ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
	if (ret) {
		/*
		 * XXX: This is a hack. We need a xx_request_qp interface
		 * that will adjust the qp_attr's with a best-effort
		 * number
		 */
		qp_attr.cap.max_send_sge -= 2;
		qp_attr.cap.max_recv_sge -= 2;
		ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd,
				     &qp_attr);
		if (ret) {
			dprintk("svcrdma: failed to create QP, ret=%d\n", ret);
			goto errout;
		}
		newxprt->sc_max_sge = qp_attr.cap.max_send_sge;
		newxprt->sc_max_sge = qp_attr.cap.max_recv_sge;
		newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
		newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
	}
	newxprt->sc_qp = newxprt->sc_cm_id->qp;

	/*
	 * Use the most secure set of MR resources based on the
	 * transport type and available memory management features in
	 * the device. Here's the table implemented below:
	 *
	 *		Fast	Global	DMA	Remote WR
	 *		Reg	LKEY	MR	Access
	 *		Sup'd	Sup'd	Needed	Needed
	 *
	 * IWARP	N	N	Y	Y
	 *		N	Y	Y	Y
	 *		Y	N	Y	N
	 *		Y	Y	N	-
	 *
	 * IB		N	N	Y	N
	 *		N	Y	N	-
	 *		Y	N	Y	N
	 *		Y	Y	N	-
	 *
	 * NB:	iWARP requires remote write access for the data sink
	 *	of an RDMA_READ. IB does not.
	 */
	if (devattr.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
		newxprt->sc_frmr_pg_list_len =
			devattr.max_fast_reg_page_list_len;
		newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
	}

	/*
	 * Determine if a DMA MR is required and if so, what privs are required
	 */
	switch (rdma_node_get_transport(newxprt->sc_cm_id->device->node_type)) {
	case RDMA_TRANSPORT_IWARP:
		newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_READ_W_INV;
		if (!(newxprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)) {
			need_dma_mr = 1;
			dma_mr_acc =
				(IB_ACCESS_LOCAL_WRITE |
				 IB_ACCESS_REMOTE_WRITE);
		} else if (!(devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)) {
			need_dma_mr = 1;
			dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
		} else
			need_dma_mr = 0;
		break;
	case RDMA_TRANSPORT_IB:
		if (!(devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)) {
			need_dma_mr = 1;
			dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
		} else
			need_dma_mr = 0;
		break;
	default:
		goto errout;
	}

	/* Create the DMA MR if needed, otherwise, use the DMA LKEY */
	if (need_dma_mr) {
		/* Register all of physical memory */
		newxprt->sc_phys_mr =
			ib_get_dma_mr(newxprt->sc_pd, dma_mr_acc);
		if (IS_ERR(newxprt->sc_phys_mr)) {
			dprintk("svcrdma: Failed to create DMA MR ret=%d\n",
				ret);
			goto errout;
		}
		newxprt->sc_dma_lkey = newxprt->sc_phys_mr->lkey;
	} else
		newxprt->sc_dma_lkey =
			newxprt->sc_cm_id->device->local_dma_lkey;

	/* Post receive buffers */
	for (i = 0; i < newxprt->sc_max_requests; i++) {
		ret = svc_rdma_post_recv(newxprt);
		if (ret) {
			dprintk("svcrdma: failure posting receive buffers\n");
			goto errout;
		}
	}

	/* Swap out the handler */
	newxprt->sc_cm_id->event_handler = rdma_cma_handler;

	/*
	 * Arm the CQs for the SQ and RQ before accepting so we can't
	 * miss the first message
	 */
	ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
	ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);

	/* Accept Connection */
	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
	memset(&conn_param, 0, sizeof conn_param);
	conn_param.responder_resources = 0;
	conn_param.initiator_depth = newxprt->sc_ord;
	ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
	if (ret) {
		dprintk("svcrdma: failed to accept new connection, ret=%d\n",
			ret);
		goto errout;
	}

	dprintk("svcrdma: new connection %p accepted with the following "
		"attributes:\n"
		"    local_ip        : %pI4\n"
		"    local_port      : %d\n"
		"    remote_ip       : %pI4\n"
		"    remote_port     : %d\n"
		"    max_sge         : %d\n"
		"    sq_depth        : %d\n"
		"    max_requests    : %d\n"
		"    ord             : %d\n",
		newxprt,
		&((struct sockaddr_in *)&newxprt->sc_cm_id->
			 route.addr.src_addr)->sin_addr.s_addr,
		ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
		       route.addr.src_addr)->sin_port),
		&((struct sockaddr_in *)&newxprt->sc_cm_id->
			 route.addr.dst_addr)->sin_addr.s_addr,
		ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
		       route.addr.dst_addr)->sin_port),
		newxprt->sc_max_sge,
		newxprt->sc_sq_depth,
		newxprt->sc_max_requests,
		newxprt->sc_ord);

	return &newxprt->sc_xprt;

 errout:
	dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
	/* Take a reference in case the DTO handler runs */
	svc_xprt_get(&newxprt->sc_xprt);
	if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
		ib_destroy_qp(newxprt->sc_qp);
	rdma_destroy_id(newxprt->sc_cm_id);
	/* This call to put will destroy the transport */
	svc_xprt_put(&newxprt->sc_xprt);
	return NULL;
}

static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
{
}

/*
 * When connected, an svc_xprt has at least two references:
 *
 * - A reference held by the cm_id between the ESTABLISHED and
 *   DISCONNECTED events. If the remote peer disconnected first, this
 *   reference could be gone.
 *
 * - A reference held by the svc_recv code that called this function
 *   as part of close processing.
 *
 * At a minimum, one reference should still be held.
 */
static void svc_rdma_detach(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	dprintk("svc: svc_rdma_detach(%p)\n", xprt);

	/* Disconnect and flush posted WQE */
	rdma_disconnect(rdma->sc_cm_id);
}

static void __svc_rdma_free(struct work_struct *work)
{
	struct svcxprt_rdma *rdma =
		container_of(work, struct svcxprt_rdma, sc_work);
	dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);

	/* We should only be called from kref_put */
	BUG_ON(atomic_read(&rdma->sc_xprt.xpt_ref.refcount) != 0);

	/*
	 * Destroy queued, but not processed read completions. Note
	 * that this cleanup has to be done before destroying the
	 * cm_id because the device ptr is needed to unmap the dma in
	 * svc_rdma_put_context.
	 */
	while (!list_empty(&rdma->sc_read_complete_q)) {
		struct svc_rdma_op_ctxt *ctxt;
		ctxt = list_entry(rdma->sc_read_complete_q.next,
				  struct svc_rdma_op_ctxt,
				  dto_q);
		list_del_init(&ctxt->dto_q);
		svc_rdma_put_context(ctxt, 1);
	}

	/* Destroy queued, but not processed recv completions */
	while (!list_empty(&rdma->sc_rq_dto_q)) {
		struct svc_rdma_op_ctxt *ctxt;
		ctxt = list_entry(rdma->sc_rq_dto_q.next,
				  struct svc_rdma_op_ctxt,
				  dto_q);
		list_del_init(&ctxt->dto_q);
		svc_rdma_put_context(ctxt, 1);
	}

	/* Warn if we leaked a resource or under-referenced */
	WARN_ON(atomic_read(&rdma->sc_ctxt_used) != 0);
	WARN_ON(atomic_read(&rdma->sc_dma_used) != 0);

	/* De-allocate fastreg mr */
	rdma_dealloc_frmr_q(rdma);

	/* Destroy the QP if present (not a listener) */
	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
		ib_destroy_qp(rdma->sc_qp);

	if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
		ib_destroy_cq(rdma->sc_sq_cq);

	if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
		ib_destroy_cq(rdma->sc_rq_cq);

	if (rdma->sc_phys_mr && !IS_ERR(rdma->sc_phys_mr))
		ib_dereg_mr(rdma->sc_phys_mr);

	if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
		ib_dealloc_pd(rdma->sc_pd);

	/* Destroy the CM ID */
	rdma_destroy_id(rdma->sc_cm_id);

	kfree(rdma);
}

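/*
 * The xpo_free method. Teardown of the RDMA resources can sleep, so
 * the actual destruction is deferred to a work item that runs
 * __svc_rdma_free above.
 */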
static void svc_rdma_free(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	INIT_WORK(&rdma->sc_work, __svc_rdma_free);
	queue_work(svc_rdma_wq, &rdma->sc_work);
}

static int svc_rdma_has_wspace(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	/*
	 * If there are fewer SQ WR available than required to send a
	 * simple response, return false.
	 */
	if (rdma->sc_sq_depth - atomic_read(&rdma->sc_sq_count) < 3)
		return 0;

	/*
	 * ...or there are already waiters on the SQ,
	 * return false.
	 */
	if (waitqueue_active(&rdma->sc_send_wait))
		return 0;

	/* Otherwise return true. */
	return 1;
}

/*
 * Attempt to register the kvec representing the RPC memory with the
 * device.
 *
 * Returns:
 *   0 : The FAST_REG WR was posted successfully.
 *  <0 : An error was encountered attempting to post the WR.
 */
int svc_rdma_fastreg(struct svcxprt_rdma *xprt,
		     struct svc_rdma_fastreg_mr *frmr)
{
	struct ib_send_wr fastreg_wr;
	u8 key;

	/* Bump the key */
	key = (u8)(frmr->mr->lkey & 0x000000FF);
	ib_update_fast_reg_key(frmr->mr, ++key);

	/* Prepare FASTREG WR */
	memset(&fastreg_wr, 0, sizeof fastreg_wr);
	fastreg_wr.opcode = IB_WR_FAST_REG_MR;
	fastreg_wr.send_flags = IB_SEND_SIGNALED;
	fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva;
	fastreg_wr.wr.fast_reg.page_list = frmr->page_list;
	fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len;
	fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
	fastreg_wr.wr.fast_reg.length = frmr->map_len;
	fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags;
	fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey;
	return svc_rdma_send(xprt, &fastreg_wr);
}

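/*
 * Post a chain of send WRs on the transport's QP. If the SQ is full,
 * try to reap completed SQ entries and then wait for space before
 * posting. A transport reference is taken for every WR posted.
 */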
int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
{
	struct ib_send_wr *bad_wr, *n_wr;
	int wr_count;
	int i;
	int ret;

	if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
		return -ENOTCONN;

	BUG_ON(wr->send_flags != IB_SEND_SIGNALED);
	wr_count = 1;
	for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
		wr_count++;

	/* If the SQ is full, wait until an SQ entry is available */
	while (1) {
		spin_lock_bh(&xprt->sc_lock);
		if (xprt->sc_sq_depth < atomic_read(&xprt->sc_sq_count) + wr_count) {
			spin_unlock_bh(&xprt->sc_lock);
			atomic_inc(&rdma_stat_sq_starve);

			/* See if we can opportunistically reap SQ WR to make room */
			sq_cq_reap(xprt);

			/* Wait until SQ WR available if SQ still full */
			wait_event(xprt->sc_send_wait,
				   atomic_read(&xprt->sc_sq_count) <
				   xprt->sc_sq_depth);
			if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
				return -ENOTCONN;
			continue;
		}
		/* Take a transport ref for each WR posted */
		for (i = 0; i < wr_count; i++)
			svc_xprt_get(&xprt->sc_xprt);

		/* Bump used SQ WR count and post */
		atomic_add(wr_count, &xprt->sc_sq_count);
		ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
		if (ret) {
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
			atomic_sub(wr_count, &xprt->sc_sq_count);
			for (i = 0; i < wr_count; i++)
				svc_xprt_put(&xprt->sc_xprt);
			dprintk("svcrdma: failed to post SQ WR rc=%d, "
			       "sc_sq_count=%d, sc_sq_depth=%d\n",
			       ret, atomic_read(&xprt->sc_sq_count),
			       xprt->sc_sq_depth);
		}
		spin_unlock_bh(&xprt->sc_lock);
		if (ret)
			wake_up(&xprt->sc_send_wait);
		break;
	}
	return ret;
}

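/*
 * XDR-encode an RPC/RDMA error message into a freshly allocated page
 * and post it as a SEND WR so the client is notified of the protocol
 * error.
 */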
void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
			 enum rpcrdma_errcode err)
{
	struct ib_send_wr err_wr;
	struct page *p;
	struct svc_rdma_op_ctxt *ctxt;
	u32 *va;
	int length;
	int ret;

	p = svc_rdma_get_page();
	va = page_address(p);

	/* XDR encode error */
	length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);

	ctxt = svc_rdma_get_context(xprt);
	ctxt->direction = DMA_FROM_DEVICE;
	ctxt->count = 1;
	ctxt->pages[0] = p;

	/* Prepare SGE for local address */
	ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
					    p, 0, length, DMA_FROM_DEVICE);
	if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
		put_page(p);
		svc_rdma_put_context(ctxt, 1);
		return;
	}
	atomic_inc(&xprt->sc_dma_used);
	ctxt->sge[0].lkey = xprt->sc_dma_lkey;
	ctxt->sge[0].length = length;

	/* Prepare SEND WR */
	memset(&err_wr, 0, sizeof err_wr);
	ctxt->wr_op = IB_WR_SEND;
	err_wr.wr_id = (unsigned long)ctxt;
	err_wr.sg_list = ctxt->sge;
	err_wr.num_sge = 1;
	err_wr.opcode = IB_WR_SEND;
	err_wr.send_flags = IB_SEND_SIGNALED;

	/* Post It */
	ret = svc_rdma_send(xprt, &err_wr);
	if (ret) {
		dprintk("svcrdma: Error %d posting send for protocol error\n",
			ret);
		svc_rdma_unmap_dma(ctxt);
		svc_rdma_put_context(ctxt, 1);
	}
}