v4.6
 
   1/*
 
   2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
   3 *
   4 * This software is available to you under a choice of one of two
   5 * licenses.  You may choose to be licensed under the terms of the GNU
   6 * General Public License (GPL) Version 2, available from the file
   7 * COPYING in the main directory of this source tree, or the BSD-type
   8 * license below:
   9 *
  10 * Redistribution and use in source and binary forms, with or without
  11 * modification, are permitted provided that the following conditions
  12 * are met:
  13 *
  14 *      Redistributions of source code must retain the above copyright
  15 *      notice, this list of conditions and the following disclaimer.
  16 *
  17 *      Redistributions in binary form must reproduce the above
  18 *      copyright notice, this list of conditions and the following
  19 *      disclaimer in the documentation and/or other materials provided
  20 *      with the distribution.
  21 *
  22 *      Neither the name of the Network Appliance, Inc. nor the names of
  23 *      its contributors may be used to endorse or promote products
  24 *      derived from this software without specific prior written
  25 *      permission.
  26 *
  27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  38 */
  39
  40/*
  41 * verbs.c
  42 *
  43 * Encapsulates the major functions managing:
  44 *  o adapters
  45 *  o endpoints
  46 *  o connections
  47 *  o buffer memory
  48 */
  49
  50#include <linux/interrupt.h>
  51#include <linux/slab.h>
  52#include <linux/prefetch.h>
  53#include <linux/sunrpc/addr.h>
  54#include <asm/bitops.h>
  55#include <linux/module.h> /* try_module_get()/module_put() */
 
  56
  57#include "xprt_rdma.h"
 
  58
  59/*
  60 * Globals/Macros
  61 */
  62
  63#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  64# define RPCDBG_FACILITY	RPCDBG_TRANS
  65#endif
  66
  67/*
  68 * internal functions
  69 */
  70
  71static struct workqueue_struct *rpcrdma_receive_wq;
  72
  73int
  74rpcrdma_alloc_wq(void)
  75{
  76	struct workqueue_struct *recv_wq;
  77
  78	recv_wq = alloc_workqueue("xprtrdma_receive",
  79				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
  80				  0);
  81	if (!recv_wq)
  82		return -ENOMEM;
  83
  84	rpcrdma_receive_wq = recv_wq;
  85	return 0;
  86}
  87
  88void
  89rpcrdma_destroy_wq(void)
  90{
  91	struct workqueue_struct *wq;
  92
  93	if (rpcrdma_receive_wq) {
  94		wq = rpcrdma_receive_wq;
  95		rpcrdma_receive_wq = NULL;
  96		destroy_workqueue(wq);
  97	}
  98}
  99
 100static void
 101rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
 102{
 103	struct rpcrdma_ep *ep = context;
 
 
 104
 105	pr_err("RPC:       %s: %s on device %s ep %p\n",
 106	       __func__, ib_event_msg(event->event),
 107		event->device->name, context);
 108	if (ep->rep_connected == 1) {
 109		ep->rep_connected = -EIO;
 110		rpcrdma_conn_func(ep);
 111		wake_up_all(&ep->rep_connect_wait);
 112	}
 113}
 114
 115/**
 116 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 117 * @cq:	completion queue (ignored)
 118 * @wc:	completed WR
 119 *
 120 */
 121static void
 122rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 123{
 124	/* WARNING: Only wr_cqe and status are reliable at this point */
 125	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
 126		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
 127		       ib_wc_status_msg(wc->status),
 128		       wc->status, wc->vendor_err);
 129}
 130
 131static void
 132rpcrdma_receive_worker(struct work_struct *work)
 133{
 134	struct rpcrdma_rep *rep =
 135			container_of(work, struct rpcrdma_rep, rr_work);
 136
 137	rpcrdma_reply_handler(rep);
 138}
 139
 140/* Perform basic sanity checking to avoid using garbage
 141 * to update the credit grant value.
 142 */
 143static void
 144rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
 145{
 146	struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
 147	struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
 148	u32 credits;
 149
 150	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
 151		return;
 152
 153	credits = be32_to_cpu(rmsgp->rm_credit);
 154	if (credits == 0)
 155		credits = 1;	/* don't deadlock */
 156	else if (credits > buffer->rb_max_requests)
 157		credits = buffer->rb_max_requests;
 158
 159	atomic_set(&buffer->rb_credits, credits);
 
 
 160}
 161
 162/**
 163 * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
 164 * @cq:	completion queue (ignored)
 165 * @wc:	completed WR
 166 *
 167 */
 168static void
 169rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
 170{
 171	struct ib_cqe *cqe = wc->wr_cqe;
 172	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
 173					       rr_cqe);
 
 174
 175	/* WARNING: Only wr_id and status are reliable at this point */
 
 
 176	if (wc->status != IB_WC_SUCCESS)
 177		goto out_fail;
 178
 179	/* status == SUCCESS means all fields in wc are trustworthy */
 180	if (wc->opcode != IB_WC_RECV)
 181		return;
 182
 183	dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
 184		__func__, rep, wc->byte_len);
 185
 186	rep->rr_len = wc->byte_len;
 187	ib_dma_sync_single_for_cpu(rep->rr_device,
 188				   rdmab_addr(rep->rr_rdmabuf),
 189				   rep->rr_len, DMA_FROM_DEVICE);
 190
 191	rpcrdma_update_granted_credits(rep);
 192
 193out_schedule:
 194	queue_work(rpcrdma_receive_wq, &rep->rr_work);
 195	return;
 196
 197out_fail:
 198	if (wc->status != IB_WC_WR_FLUSH_ERR)
 199		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
 200		       ib_wc_status_msg(wc->status),
 201		       wc->status, wc->vendor_err);
 202	rep->rr_len = RPCRDMA_BAD_LEN;
 203	goto out_schedule;
 204}
 205
 206static void
 207rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
 
 208{
 209	struct ib_wc wc;
 
 210
 211	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
 212		rpcrdma_receive_wc(NULL, &wc);
 213}
 214
 215static int
 216rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 217{
 218	struct rpcrdma_xprt *xprt = id->context;
 219	struct rpcrdma_ia *ia = &xprt->rx_ia;
 220	struct rpcrdma_ep *ep = &xprt->rx_ep;
 221#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 222	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
 223#endif
 224	struct ib_qp_attr *attr = &ia->ri_qp_attr;
 225	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
 226	int connstate = 0;
 227
 
 228	switch (event->event) {
 229	case RDMA_CM_EVENT_ADDR_RESOLVED:
 230	case RDMA_CM_EVENT_ROUTE_RESOLVED:
 231		ia->ri_async_rc = 0;
 232		complete(&ia->ri_done);
 233		break;
 234	case RDMA_CM_EVENT_ADDR_ERROR:
 235		ia->ri_async_rc = -EHOSTUNREACH;
 236		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
 237			__func__, ep);
 238		complete(&ia->ri_done);
 239		break;
 240	case RDMA_CM_EVENT_ROUTE_ERROR:
 241		ia->ri_async_rc = -ENETUNREACH;
 242		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
 243			__func__, ep);
 244		complete(&ia->ri_done);
 245		break;
 246	case RDMA_CM_EVENT_ESTABLISHED:
 247		connstate = 1;
 248		ib_query_qp(ia->ri_id->qp, attr,
 249			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
 250			    iattr);
 251		dprintk("RPC:       %s: %d responder resources"
 252			" (%d initiator)\n",
 253			__func__, attr->max_dest_rd_atomic,
 254			attr->max_rd_atomic);
 255		goto connected;
 256	case RDMA_CM_EVENT_CONNECT_ERROR:
 257		connstate = -ENOTCONN;
 258		goto connected;
 259	case RDMA_CM_EVENT_UNREACHABLE:
 260		connstate = -ENETDOWN;
 261		goto connected;
 262	case RDMA_CM_EVENT_REJECTED:
 263		connstate = -ECONNREFUSED;
 264		goto connected;
 265	case RDMA_CM_EVENT_DISCONNECTED:
 266		connstate = -ECONNABORTED;
 267		goto connected;
 268	case RDMA_CM_EVENT_DEVICE_REMOVAL:
 269		connstate = -ENODEV;
 270connected:
 271		dprintk("RPC:       %s: %sconnected\n",
 272					__func__, connstate > 0 ? "" : "dis");
 273		atomic_set(&xprt->rx_buf.rb_credits, 1);
 274		ep->rep_connected = connstate;
 275		rpcrdma_conn_func(ep);
 276		wake_up_all(&ep->rep_connect_wait);
 277		/*FALLTHROUGH*/
 278	default:
 279		dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
 280			__func__, sap, rpc_get_port(sap), ep,
 281			rdma_event_msg(event->event));
 282		break;
 283	}
 284
 285#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 286	if (connstate == 1) {
 287		int ird = attr->max_dest_rd_atomic;
 288		int tird = ep->rep_remote_cma.responder_resources;
 289
 290		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
 291			sap, rpc_get_port(sap),
 292			ia->ri_device->name,
 293			ia->ri_ops->ro_displayname,
 294			xprt->rx_buf.rb_max_requests,
 295			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
 296	} else if (connstate < 0) {
 297		pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
 298			sap, rpc_get_port(sap), connstate);
 299	}
 300#endif
 301
 302	return 0;
 303}
 304
 305static void rpcrdma_destroy_id(struct rdma_cm_id *id)
 306{
 307	if (id) {
 308		module_put(id->device->owner);
 309		rdma_destroy_id(id);
 310	}
 311}
 312
 313static struct rdma_cm_id *
 314rpcrdma_create_id(struct rpcrdma_xprt *xprt,
 315			struct rpcrdma_ia *ia, struct sockaddr *addr)
 316{
 
 317	struct rdma_cm_id *id;
 318	int rc;
 319
 
 
 320	init_completion(&ia->ri_done);
 
 321
 322	id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
 323			    IB_QPT_RC);
 324	if (IS_ERR(id)) {
 325		rc = PTR_ERR(id);
 326		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
 327			__func__, rc);
 328		return id;
 329	}
 330
 331	ia->ri_async_rc = -ETIMEDOUT;
 332	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
 333	if (rc) {
 334		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
 335			__func__, rc);
 336		goto out;
 337	}
 338	wait_for_completion_interruptible_timeout(&ia->ri_done,
 339				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
 340
 341	/* FIXME:
 342	 * Until xprtrdma supports DEVICE_REMOVAL, the provider must
 343	 * be pinned while there are active NFS/RDMA mounts to prevent
 344	 * hangs and crashes at umount time.
 345	 */
 346	if (!ia->ri_async_rc && !try_module_get(id->device->owner)) {
 347		dprintk("RPC:       %s: Failed to get device module\n",
 348			__func__);
 349		ia->ri_async_rc = -ENODEV;
 350	}
 351	rc = ia->ri_async_rc;
 352	if (rc)
 353		goto out;
 354
 355	ia->ri_async_rc = -ETIMEDOUT;
 356	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
 357	if (rc) {
 358		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
 359			__func__, rc);
 360		goto put;
 
 
 361	}
 362	wait_for_completion_interruptible_timeout(&ia->ri_done,
 363				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
 364	rc = ia->ri_async_rc;
 365	if (rc)
 366		goto put;
 367
 368	return id;
 369put:
 370	module_put(id->device->owner);
 371out:
 372	rdma_destroy_id(id);
 373	return ERR_PTR(rc);
 374}
 375
 376/*
 377 * Drain any cq, prior to teardown.
 378 */
 379static void
 380rpcrdma_clean_cq(struct ib_cq *cq)
 381{
 382	struct ib_wc wc;
 383	int count = 0;
 384
 385	while (1 == ib_poll_cq(cq, 1, &wc))
 386		++count;
 387
 388	if (count)
 389		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
 390			__func__, count, wc.opcode);
 391}
 392
 393/*
 394 * Exported functions.
 395 */
 396
 397/*
 398 * Open and initialize an Interface Adapter.
 399 *  o initializes fields of struct rpcrdma_ia, including
 400 *    interface and provider attributes and protection zone.
 
 
 401 */
 402int
 403rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 404{
 405	struct rpcrdma_ia *ia = &xprt->rx_ia;
 406	int rc;
 407
 408	ia->ri_dma_mr = NULL;
 409
 410	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
 411	if (IS_ERR(ia->ri_id)) {
 412		rc = PTR_ERR(ia->ri_id);
 413		goto out1;
 414	}
 415	ia->ri_device = ia->ri_id->device;
 416
 417	ia->ri_pd = ib_alloc_pd(ia->ri_device);
 418	if (IS_ERR(ia->ri_pd)) {
 419		rc = PTR_ERR(ia->ri_pd);
 420		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
 421			__func__, rc);
 422		goto out2;
 423	}
 424
 425	if (memreg == RPCRDMA_FRMR) {
 426		if (!(ia->ri_device->attrs.device_cap_flags &
 427				IB_DEVICE_MEM_MGT_EXTENSIONS) ||
 428		    (ia->ri_device->attrs.max_fast_reg_page_list_len == 0)) {
 429			dprintk("RPC:       %s: FRMR registration "
 430				"not supported by HCA\n", __func__);
 431			memreg = RPCRDMA_MTHCAFMR;
 432		}
 433	}
 434	if (memreg == RPCRDMA_MTHCAFMR) {
 435		if (!ia->ri_device->alloc_fmr) {
 436			dprintk("RPC:       %s: MTHCAFMR registration "
 437				"not supported by HCA\n", __func__);
 438			rc = -EINVAL;
 439			goto out3;
 440		}
 441	}
 442
 443	switch (memreg) {
 444	case RPCRDMA_FRMR:
 445		ia->ri_ops = &rpcrdma_frwr_memreg_ops;
 446		break;
 447	case RPCRDMA_ALLPHYSICAL:
 448		ia->ri_ops = &rpcrdma_physical_memreg_ops;
 449		break;
 450	case RPCRDMA_MTHCAFMR:
 451		ia->ri_ops = &rpcrdma_fmr_memreg_ops;
 452		break;
 453	default:
 454		printk(KERN_ERR "RPC: Unsupported memory "
 455				"registration mode: %d\n", memreg);
 456		rc = -ENOMEM;
 457		goto out3;
 458	}
 459	dprintk("RPC:       %s: memory registration strategy is '%s'\n",
 460		__func__, ia->ri_ops->ro_displayname);
 461
 462	rwlock_init(&ia->ri_qplock);
 463	return 0;
 464
 465out3:
 466	ib_dealloc_pd(ia->ri_pd);
 467	ia->ri_pd = NULL;
 468out2:
 469	rpcrdma_destroy_id(ia->ri_id);
 470	ia->ri_id = NULL;
 471out1:
 472	return rc;
 473}
 474
 475/*
 476 * Clean up/close an IA.
 477 *   o if event handles and PD have been initialized, free them.
 478 *   o close the IA
 479 */
 480void
 481rpcrdma_ia_close(struct rpcrdma_ia *ia)
 482{
 483	dprintk("RPC:       %s: entering\n", __func__);
 484	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
 485		if (ia->ri_id->qp)
 486			rdma_destroy_qp(ia->ri_id);
 487		rpcrdma_destroy_id(ia->ri_id);
 488		ia->ri_id = NULL;
 489	}
 
 490
 491	/* If the pd is still busy, xprtrdma missed freeing a resource */
 492	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
 493		ib_dealloc_pd(ia->ri_pd);
 
 494}
 495
 496/*
 497 * Create unconnected endpoint.
 
 
 
 498 */
 499int
 500rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 501				struct rpcrdma_create_data_internal *cdata)
 502{
 
 
 
 503	struct ib_cq *sendcq, *recvcq;
 504	unsigned int max_qp_wr;
 505	int rc;
 506
 507	if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
 508		dprintk("RPC:       %s: insufficient sge's available\n",
 509			__func__);
 510		return -ENOMEM;
 511	}
 
 512
 513	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
 514		dprintk("RPC:       %s: insufficient wqe's available\n",
 515			__func__);
 516		return -ENOMEM;
 517	}
 518	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS;
 519
 520	/* check provider's send/recv wr limits */
 521	if (cdata->max_requests > max_qp_wr)
 522		cdata->max_requests = max_qp_wr;
 523
 524	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
 525	ep->rep_attr.qp_context = ep;
 526	ep->rep_attr.srq = NULL;
 527	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
 528	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
 529	rc = ia->ri_ops->ro_open(ia, ep, cdata);
 530	if (rc)
 531		return rc;
 532	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
 533	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
 534	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
 535	ep->rep_attr.cap.max_recv_sge = 1;
 536	ep->rep_attr.cap.max_inline_data = 0;
 537	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
 538	ep->rep_attr.qp_type = IB_QPT_RC;
 539	ep->rep_attr.port_num = ~0;
 540
 541	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
 542		"iovs: send %d recv %d\n",
 543		__func__,
 544		ep->rep_attr.cap.max_send_wr,
 545		ep->rep_attr.cap.max_recv_wr,
 546		ep->rep_attr.cap.max_send_sge,
 547		ep->rep_attr.cap.max_recv_sge);
 548
 549	/* set trigger for requesting send completion */
 550	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
 551	if (ep->rep_cqinit <= 2)
 552		ep->rep_cqinit = 0;	/* always signal? */
 553	INIT_CQCOUNT(ep);
 554	init_waitqueue_head(&ep->rep_connect_wait);
 555	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
 556
 557	sendcq = ib_alloc_cq(ia->ri_device, NULL,
 558			     ep->rep_attr.cap.max_send_wr + 1,
 559			     0, IB_POLL_SOFTIRQ);
 560	if (IS_ERR(sendcq)) {
 561		rc = PTR_ERR(sendcq);
 562		dprintk("RPC:       %s: failed to create send CQ: %i\n",
 563			__func__, rc);
 564		goto out1;
 565	}
 566
 567	recvcq = ib_alloc_cq(ia->ri_device, NULL,
 568			     ep->rep_attr.cap.max_recv_wr + 1,
 569			     0, IB_POLL_SOFTIRQ);
 570	if (IS_ERR(recvcq)) {
 571		rc = PTR_ERR(recvcq);
 572		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
 573			__func__, rc);
 574		goto out2;
 575	}
 576
 577	ep->rep_attr.send_cq = sendcq;
 578	ep->rep_attr.recv_cq = recvcq;
 579
 580	/* Initialize cma parameters */
 
 581
 582	/* RPC/RDMA does not use private data */
 583	ep->rep_remote_cma.private_data = NULL;
 584	ep->rep_remote_cma.private_data_len = 0;
 585
 586	/* Client offers RDMA Read but does not initiate */
 587	ep->rep_remote_cma.initiator_depth = 0;
 588	if (ia->ri_device->attrs.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
 589		ep->rep_remote_cma.responder_resources = 32;
 590	else
 591		ep->rep_remote_cma.responder_resources =
 592						ia->ri_device->attrs.max_qp_rd_atom;
 
 
 
 593
 594	ep->rep_remote_cma.retry_count = 7;
 
 
 
 595	ep->rep_remote_cma.flow_control = 0;
 596	ep->rep_remote_cma.rnr_retry_count = 0;
 597
 598	return 0;
 599
 600out2:
 601	ib_free_cq(sendcq);
 602out1:
 603	if (ia->ri_dma_mr)
 604		ib_dereg_mr(ia->ri_dma_mr);
 605	return rc;
 606}
 607
 608/*
 609 * rpcrdma_ep_destroy
 
 610 *
 611 * Disconnect and destroy endpoint. After this, the only
 612 * valid operations on the ep are to free it (if dynamically
 613 * allocated) or re-create it.
 614 */
 615void
 616rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 617{
 618	int rc;
 619
 620	dprintk("RPC:       %s: entering, connected is %d\n",
 621		__func__, ep->rep_connected);
 622
 623	cancel_delayed_work_sync(&ep->rep_connect_worker);
 624
 625	if (ia->ri_id->qp)
 626		rpcrdma_ep_disconnect(ep, ia);
 627
 628	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
 629	rpcrdma_clean_cq(ep->rep_attr.send_cq);
 630
 631	if (ia->ri_id->qp) {
 632		rdma_destroy_qp(ia->ri_id);
 633		ia->ri_id->qp = NULL;
 634	}
 635
 636	ib_free_cq(ep->rep_attr.recv_cq);
 637	ib_free_cq(ep->rep_attr.send_cq);
 638
 639	if (ia->ri_dma_mr) {
 640		rc = ib_dereg_mr(ia->ri_dma_mr);
 641		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
 642			__func__, rc);
 
 643	}
 644}
 645
 646/*
 647 * Connect unconnected endpoint.
 648 */
 649int
 650rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 651{
 652	struct rdma_cm_id *id, *old;
 653	int rc = 0;
 654	int retry_count = 0;
 
 
 655
 656	if (ep->rep_connected != 0) {
 657		struct rpcrdma_xprt *xprt;
 658retry:
 659		dprintk("RPC:       %s: reconnecting...\n", __func__);
 660
 661		rpcrdma_ep_disconnect(ep, ia);
 662		rpcrdma_flush_cqs(ep);
 663
 664		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
 665		id = rpcrdma_create_id(xprt, ia,
 666				(struct sockaddr *)&xprt->rx_data.addr);
 667		if (IS_ERR(id)) {
 668			rc = -EHOSTUNREACH;
 669			goto out;
 670		}
 671		/* TEMP TEMP TEMP - fail if new device:
 672		 * Deregister/remarshal *all* requests!
 673		 * Close and recreate adapter, pd, etc!
 674		 * Re-determine all attributes still sane!
 675		 * More stuff I haven't thought of!
 676		 * Rrrgh!
 677		 */
 678		if (ia->ri_device != id->device) {
 679			printk("RPC:       %s: can't reconnect on "
 680				"different device!\n", __func__);
 681			rpcrdma_destroy_id(id);
 682			rc = -ENETUNREACH;
 683			goto out;
 684		}
 685		/* END TEMP */
 686		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
 687		if (rc) {
 688			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
 689				__func__, rc);
 690			rpcrdma_destroy_id(id);
 691			rc = -ENETUNREACH;
 692			goto out;
 693		}
 694
 695		write_lock(&ia->ri_qplock);
 696		old = ia->ri_id;
 697		ia->ri_id = id;
 698		write_unlock(&ia->ri_qplock);
 699
 700		rdma_destroy_qp(old);
 701		rpcrdma_destroy_id(old);
 702	} else {
 703		dprintk("RPC:       %s: connecting...\n", __func__);
 704		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
 705		if (rc) {
 706			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
 707				__func__, rc);
 708			/* do not update ep->rep_connected */
 709			return -ENETUNREACH;
 710		}
 711	}
 712
 713	ep->rep_connected = 0;
 
 
 
 714
 715	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
 716	if (rc) {
 717		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
 718				__func__, rc);
 719		goto out;
 720	}
 721
 
 
 722	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
 723
 724	/*
 725	 * Check state. A non-peer reject indicates no listener
 726	 * (ECONNREFUSED), which may be a transient state. All
 727	 * others indicate a transport condition which has already
 728	 * undergone a best-effort.
 729	 */
 730	if (ep->rep_connected == -ECONNREFUSED &&
 731	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
 732		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
 733		goto retry;
 734	}
 735	if (ep->rep_connected <= 0) {
 736		/* Sometimes, the only way to reliably connect to remote
 737		 * CMs is to use same nonzero values for ORD and IRD. */
 738		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
 739		    (ep->rep_remote_cma.responder_resources == 0 ||
 740		     ep->rep_remote_cma.initiator_depth !=
 741				ep->rep_remote_cma.responder_resources)) {
 742			if (ep->rep_remote_cma.responder_resources == 0)
 743				ep->rep_remote_cma.responder_resources = 1;
 744			ep->rep_remote_cma.initiator_depth =
 745				ep->rep_remote_cma.responder_resources;
 746			goto retry;
 747		}
 748		rc = ep->rep_connected;
 749	} else {
 750		struct rpcrdma_xprt *r_xprt;
 751		unsigned int extras;
 752
 753		dprintk("RPC:       %s: connected\n", __func__);
 754
 755		r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
 756		extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
 757
 758		if (extras) {
 759			rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
 760			if (rc) {
 761				pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
 762					__func__, rc);
 763				rc = 0;
 764			}
 765		}
 766	}
 767
 
 
 768out:
 769	if (rc)
 770		ep->rep_connected = rc;
 
 
 771	return rc;
 772}
 773
 774/*
 775 * rpcrdma_ep_disconnect
 
 
 776 *
 777 * This is separate from destroy to facilitate the ability
 778 * to reconnect without recreating the endpoint.
 779 *
 780 * This call is not reentrant, and must not be made in parallel
 781 * on the same endpoint.
 782 */
 783void
 784rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 785{
 
 
 786	int rc;
 787
 788	rpcrdma_flush_cqs(ep);
 789	rc = rdma_disconnect(ia->ri_id);
 790	if (!rc) {
 791		/* returns without wait if not connected */
 792		wait_event_interruptible(ep->rep_connect_wait,
 793							ep->rep_connected != 1);
 794		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
 795			(ep->rep_connected == 1) ? "still " : "dis");
 796	} else {
 797		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
 798		ep->rep_connected = rc;
 799	}
 
 
 800}
 801
 802struct rpcrdma_req *
 803rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 804{
 805	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
 
 806	struct rpcrdma_req *req;
 
 807
 808	req = kzalloc(sizeof(*req), GFP_KERNEL);
 809	if (req == NULL)
 810		return ERR_PTR(-ENOMEM);
 811
 812	INIT_LIST_HEAD(&req->rl_free);
 813	spin_lock(&buffer->rb_reqslock);
 814	list_add(&req->rl_all, &buffer->rb_allreqs);
 815	spin_unlock(&buffer->rb_reqslock);
 816	req->rl_cqe.done = rpcrdma_wc_send;
 817	req->rl_buffer = &r_xprt->rx_buf;
 818	return req;
 819}
 820
 821struct rpcrdma_rep *
 822rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 823{
 824	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
 825	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 826	struct rpcrdma_rep *rep;
 827	int rc;
 828
 829	rc = -ENOMEM;
 830	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
 831	if (rep == NULL)
 832		goto out;
 833
 834	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
 835					       GFP_KERNEL);
 836	if (IS_ERR(rep->rr_rdmabuf)) {
 837		rc = PTR_ERR(rep->rr_rdmabuf);
 838		goto out_free;
 839	}
 840
 841	rep->rr_device = ia->ri_device;
 842	rep->rr_cqe.done = rpcrdma_receive_wc;
 
 843	rep->rr_rxprt = r_xprt;
 844	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
 845	return rep;
 846
 847out_free:
 848	kfree(rep);
 849out:
 850	return ERR_PTR(rc);
 851}
 852
 853int
 854rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 855{
 856	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 857	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 858	int i, rc;
 859
 860	buf->rb_max_requests = r_xprt->rx_data.max_requests;
 861	buf->rb_bc_srv_max_requests = 0;
 862	spin_lock_init(&buf->rb_lock);
 863	atomic_set(&buf->rb_credits, 1);
 
 
 864
 865	rc = ia->ri_ops->ro_init(r_xprt);
 866	if (rc)
 867		goto out;
 868
 869	INIT_LIST_HEAD(&buf->rb_send_bufs);
 870	INIT_LIST_HEAD(&buf->rb_allreqs);
 871	spin_lock_init(&buf->rb_reqslock);
 
 872	for (i = 0; i < buf->rb_max_requests; i++) {
 873		struct rpcrdma_req *req;
 874
 875		req = rpcrdma_create_req(r_xprt);
 876		if (IS_ERR(req)) {
 877			dprintk("RPC:       %s: request buffer %d alloc"
 878				" failed\n", __func__, i);
 879			rc = PTR_ERR(req);
 880			goto out;
 881		}
 882		req->rl_backchannel = false;
 883		list_add(&req->rl_free, &buf->rb_send_bufs);
 884	}
 885
 886	INIT_LIST_HEAD(&buf->rb_recv_bufs);
 887	for (i = 0; i < buf->rb_max_requests + 2; i++) {
 888		struct rpcrdma_rep *rep;
 889
 890		rep = rpcrdma_create_rep(r_xprt);
 891		if (IS_ERR(rep)) {
 892			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
 893				__func__, i);
 894			rc = PTR_ERR(rep);
 895			goto out;
 896		}
 897		list_add(&rep->rr_list, &buf->rb_recv_bufs);
 898	}
 899
 900	return 0;
 901out:
 902	rpcrdma_buffer_destroy(buf);
 903	return rc;
 904}
 905
 906static struct rpcrdma_req *
 907rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
 908{
 909	struct rpcrdma_req *req;
 910
 911	req = list_first_entry(&buf->rb_send_bufs,
 912			       struct rpcrdma_req, rl_free);
 913	list_del(&req->rl_free);
 914	return req;
 915}
 916
 917static struct rpcrdma_rep *
 918rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
 919{
 920	struct rpcrdma_rep *rep;
 921
 922	rep = list_first_entry(&buf->rb_recv_bufs,
 923			       struct rpcrdma_rep, rr_list);
 924	list_del(&rep->rr_list);
 925	return rep;
 926}
 927
 928static void
 929rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
 930{
 931	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
 932	kfree(rep);
 933}
 934
 935void
 936rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
 937{
 938	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
 939	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
 940	kfree(req);
 941}
 942
 943void
 944rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 945{
 946	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
 947
 948	while (!list_empty(&buf->rb_recv_bufs)) {
 949		struct rpcrdma_rep *rep;
 950
 951		rep = rpcrdma_buffer_get_rep_locked(buf);
 952		rpcrdma_destroy_rep(ia, rep);
 953	}
 954
 955	spin_lock(&buf->rb_reqslock);
 956	while (!list_empty(&buf->rb_allreqs)) {
 957		struct rpcrdma_req *req;
 958
 959		req = list_first_entry(&buf->rb_allreqs,
 960				       struct rpcrdma_req, rl_all);
 961		list_del(&req->rl_all);
 962
 963		spin_unlock(&buf->rb_reqslock);
 964		rpcrdma_destroy_req(ia, req);
 965		spin_lock(&buf->rb_reqslock);
 966	}
 967	spin_unlock(&buf->rb_reqslock);
 968
 969	ia->ri_ops->ro_destroy(buf);
 970}
 971
 972struct rpcrdma_mw *
 973rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
 974{
 975	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 976	struct rpcrdma_mw *mw = NULL;
 977
 978	spin_lock(&buf->rb_mwlock);
 979	if (!list_empty(&buf->rb_mws)) {
 980		mw = list_first_entry(&buf->rb_mws,
 981				      struct rpcrdma_mw, mw_list);
 982		list_del_init(&mw->mw_list);
 983	}
 984	spin_unlock(&buf->rb_mwlock);
 985
 986	if (!mw)
 987		pr_err("RPC:       %s: no MWs available\n", __func__);
 988	return mw;
 989}
 990
 991void
 992rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
 993{
 994	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 995
 996	spin_lock(&buf->rb_mwlock);
 997	list_add_tail(&mw->mw_list, &buf->rb_mws);
 998	spin_unlock(&buf->rb_mwlock);
 
 999}
1000
1001/*
1002 * Get a set of request/reply buffers.
 
1003 *
1004 * Reply buffer (if available) is attached to send buffer upon return.
1005 */
1006struct rpcrdma_req *
1007rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1008{
1009	struct rpcrdma_req *req;
1010
1011	spin_lock(&buffers->rb_lock);
1012	if (list_empty(&buffers->rb_send_bufs))
1013		goto out_reqbuf;
1014	req = rpcrdma_buffer_get_req_locked(buffers);
1015	if (list_empty(&buffers->rb_recv_bufs))
1016		goto out_repbuf;
1017	req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
1018	spin_unlock(&buffers->rb_lock);
1019	return req;
1020
1021out_reqbuf:
1022	spin_unlock(&buffers->rb_lock);
1023	pr_warn("RPC:       %s: out of request buffers\n", __func__);
1024	return NULL;
1025out_repbuf:
1026	spin_unlock(&buffers->rb_lock);
1027	pr_warn("RPC:       %s: out of reply buffers\n", __func__);
1028	req->rl_reply = NULL;
1029	return req;
1030}
1031
1032/*
1033 * Put request/reply buffers back into pool.
1034 * Pre-decrement counter/array index.
 
 
1035 */
1036void
1037rpcrdma_buffer_put(struct rpcrdma_req *req)
1038{
1039	struct rpcrdma_buffer *buffers = req->rl_buffer;
1040	struct rpcrdma_rep *rep = req->rl_reply;
1041
1042	req->rl_niovs = 0;
1043	req->rl_reply = NULL;
1044
1045	spin_lock(&buffers->rb_lock);
1046	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
1047	if (rep)
1048		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
1049	spin_unlock(&buffers->rb_lock);
1050}
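/* Editorial sketch, not part of verbs.c: how a caller might pair
 * rpcrdma_buffer_get() and rpcrdma_buffer_put(), based only on the two
 * functions above. The helper name and the elided marshalling/post step
 * are placeholders, not kernel code.
 */
static void example_buffer_cycle(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_req *req;

	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
	if (!req)
		return;	/* send buffer pool exhausted; caller backs off */

	/* ... marshal the RPC into req and post it via rpcrdma_ep_post() ... */

	rpcrdma_buffer_put(req);	/* returns req, and any attached reply, to the pool */
}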
1051
1052/*
1053 * Recover reply buffers from pool.
1054 * This happens when recovering from disconnect.
 
 
1055 */
1056void
1057rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1058{
1059	struct rpcrdma_buffer *buffers = req->rl_buffer;
1060
1061	spin_lock(&buffers->rb_lock);
1062	if (!list_empty(&buffers->rb_recv_bufs))
1063		req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
1064	spin_unlock(&buffers->rb_lock);
1065}
1066
1067/*
1068 * Put reply buffers back into pool when not attached to
1069 * request. This happens in error conditions.
 
 
1070 */
1071void
1072rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
 
1073{
1074	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
1075
1076	spin_lock(&buffers->rb_lock);
1077	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
1078	spin_unlock(&buffers->rb_lock);
1079}
1080
1081/*
1082 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1083 */
1084
1085void
1086rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
1087{
1088	dprintk("RPC:       map_one: offset %p iova %llx len %zu\n",
1089		seg->mr_offset,
1090		(unsigned long long)seg->mr_dma, seg->mr_dmalen);
1091}
1092
1093/**
1094 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
1095 * @ia: controlling rpcrdma_ia
1096 * @size: size of buffer to be allocated, in bytes
1097 * @flags: GFP flags
1098 *
1099 * Returns pointer to private header of an area of internally
1100 * registered memory, or an ERR_PTR. The registered buffer follows
1101 * the end of the private header.
1102 *
1103 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1104 * receiving the payload of RDMA RECV operations. regbufs are not
1105 * used for RDMA READ/WRITE operations, thus are registered only for
1106 * LOCAL access.
1107 */
1108struct rpcrdma_regbuf *
1109rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
1110{
1111	struct rpcrdma_regbuf *rb;
1112	struct ib_sge *iov;
1113
1114	rb = kmalloc(sizeof(*rb) + size, flags);
1115	if (rb == NULL)
1116		goto out;
1117
1118	iov = &rb->rg_iov;
1119	iov->addr = ib_dma_map_single(ia->ri_device,
1120				      (void *)rb->rg_base, size,
1121				      DMA_BIDIRECTIONAL);
1122	if (ib_dma_mapping_error(ia->ri_device, iov->addr))
1123		goto out_free;
1124
1125	iov->length = size;
1126	iov->lkey = ia->ri_pd->local_dma_lkey;
1127	rb->rg_size = size;
1128	rb->rg_owner = NULL;
1129	return rb;
1130
1131out_free:
1132	kfree(rb);
1133out:
1134	return ERR_PTR(-ENOMEM);
1135}
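/* Editorial sketch, not part of verbs.c: minimal regbuf lifecycle, modelled
 * on rpcrdma_create_rep() above. The function name and the "size" parameter
 * are placeholders; real callers pass transport-specific sizes such as
 * cdata->inline_rsize.
 */
static int example_regbuf_cycle(struct rpcrdma_ia *ia, size_t size)
{
	struct rpcrdma_regbuf *rb;

	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
	if (IS_ERR(rb))
		return PTR_ERR(rb);

	/* rb is now DMA-mapped for LOCAL access; rdmab_addr() and
	 * rdmab_length() supply the sge fields used when posting a
	 * SEND or RECV that is backed by this buffer.
	 */

	rpcrdma_free_regbuf(ia, rb);	/* unmap and free when done */
	return 0;
}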
1136
1137/**
1138 * rpcrdma_free_regbuf - deregister and free registered buffer
1139 * @ia: controlling rpcrdma_ia
1140 * @rb: regbuf to be deregistered and freed
 
 
1141 */
1142void
1143rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
1144{
1145	struct ib_sge *iov;
1146
1147	if (!rb)
1148		return;
1149
1150	iov = &rb->rg_iov;
1151	ib_dma_unmap_single(ia->ri_device,
1152			    iov->addr, iov->length, DMA_BIDIRECTIONAL);
1153	kfree(rb);
1154}
1155
1156/*
1157 * Prepost any receive buffer, then post send.
 
 
 
1158 *
1159 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 
1160 */
1161int
1162rpcrdma_ep_post(struct rpcrdma_ia *ia,
1163		struct rpcrdma_ep *ep,
1164		struct rpcrdma_req *req)
1165{
1166	struct ib_device *device = ia->ri_device;
1167	struct ib_send_wr send_wr, *send_wr_fail;
1168	struct rpcrdma_rep *rep = req->rl_reply;
1169	struct ib_sge *iov = req->rl_send_iov;
1170	int i, rc;
1171
1172	if (rep) {
1173		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1174		if (rc)
1175			goto out;
1176		req->rl_reply = NULL;
1177	}
1178
1179	send_wr.next = NULL;
1180	send_wr.wr_cqe = &req->rl_cqe;
1181	send_wr.sg_list = iov;
1182	send_wr.num_sge = req->rl_niovs;
1183	send_wr.opcode = IB_WR_SEND;
1184
1185	for (i = 0; i < send_wr.num_sge; i++)
1186		ib_dma_sync_single_for_device(device, iov[i].addr,
1187					      iov[i].length, DMA_TO_DEVICE);
1188	dprintk("RPC:       %s: posting %d s/g entries\n",
1189		__func__, send_wr.num_sge);
1190
1191	if (DECR_CQCOUNT(ep) > 0)
1192		send_wr.send_flags = 0;
1193	else { /* Provider must take a send completion every now and then */
1194		INIT_CQCOUNT(ep);
1195		send_wr.send_flags = IB_SEND_SIGNALED;
1196	}
1197
1198	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1199	if (rc)
1200		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1201			rc);
1202out:
1203	return rc;
1204}
1205
1206/*
1207 * (Re)post a receive buffer.
1208 */
1209int
1210rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1211		     struct rpcrdma_ep *ep,
1212		     struct rpcrdma_rep *rep)
1213{
1214	struct ib_recv_wr recv_wr, *recv_wr_fail;
1215	int rc;
1216
1217	recv_wr.next = NULL;
1218	recv_wr.wr_cqe = &rep->rr_cqe;
1219	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
1220	recv_wr.num_sge = 1;
1221
1222	ib_dma_sync_single_for_cpu(ia->ri_device,
1223				   rdmab_addr(rep->rr_rdmabuf),
1224				   rdmab_length(rep->rr_rdmabuf),
1225				   DMA_BIDIRECTIONAL);
1226
1227	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1228
 
 
1229	if (rc)
1230		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1231			rc);
1232	return rc;
1233}
1234
1235/**
1236 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
1237 * @r_xprt: transport associated with these backchannel resources
1238 * @min_reqs: minimum number of incoming requests expected
1239 *
1240 * Returns zero if all requested buffers were posted, or a negative errno.
1241 */
1242int
1243rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
1244{
1245	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
1246	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1247	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 
1248	struct rpcrdma_rep *rep;
1249	int rc;
1250
1251	while (count--) {
1252		spin_lock(&buffers->rb_lock);
1253		if (list_empty(&buffers->rb_recv_bufs))
1254			goto out_reqbuf;
1255		rep = rpcrdma_buffer_get_rep_locked(buffers);
1256		spin_unlock(&buffers->rb_lock);
1257
1258		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1259		if (rc)
1260			goto out_rc;
1261	}
 
 
1262
1263	return 0;
 
1264
1265out_reqbuf:
1266	spin_unlock(&buffers->rb_lock);
1267	pr_warn("%s: no extra receive buffers\n", __func__);
1268	return -ENOMEM;
1269
1270out_rc:
1271	rpcrdma_recv_buffer_put(rep);
1272	return rc;
1273}
1274
1275/* How many chunk list items fit within our inline buffers?
1276 */
1277unsigned int
1278rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
1279{
1280	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1281	int bytes, segments;
1282
1283	bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
1284	bytes -= RPCRDMA_HDRLEN_MIN;
1285	if (bytes < sizeof(struct rpcrdma_segment) * 2) {
1286		pr_warn("RPC:       %s: inline threshold too small\n",
1287			__func__);
1288		return 0;
1289	}
 
 
1290
1291	segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
1292	dprintk("RPC:       %s: max chunk list size = %d segments\n",
1293		__func__, segments);
1294	return segments;
 
 
1295}
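/* Editorial note, not part of verbs.c: a worked example for
 * rpcrdma_max_segments() above, using assumed values -- a 1024-byte
 * inline threshold, a 28-byte RPCRDMA_HDRLEN_MIN, and a 16-byte
 * struct rpcrdma_segment (handle + length + 64-bit offset):
 *
 *	bytes    = 1024 - 28 = 996
 *	996 / 16 = 62 segments fit; fls(62) = 6
 *	segments = 1 << (6 - 1) = 32   (rounded down to a power of two)
 */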
v5.4
   1// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
   2/*
   3 * Copyright (c) 2014-2017 Oracle.  All rights reserved.
   4 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
   5 *
   6 * This software is available to you under a choice of one of two
   7 * licenses.  You may choose to be licensed under the terms of the GNU
   8 * General Public License (GPL) Version 2, available from the file
   9 * COPYING in the main directory of this source tree, or the BSD-type
  10 * license below:
  11 *
  12 * Redistribution and use in source and binary forms, with or without
  13 * modification, are permitted provided that the following conditions
  14 * are met:
  15 *
  16 *      Redistributions of source code must retain the above copyright
  17 *      notice, this list of conditions and the following disclaimer.
  18 *
  19 *      Redistributions in binary form must reproduce the above
  20 *      copyright notice, this list of conditions and the following
  21 *      disclaimer in the documentation and/or other materials provided
  22 *      with the distribution.
  23 *
  24 *      Neither the name of the Network Appliance, Inc. nor the names of
  25 *      its contributors may be used to endorse or promote products
  26 *      derived from this software without specific prior written
  27 *      permission.
  28 *
  29 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  30 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  31 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  32 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  33 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  34 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  36 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  37 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  38 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  39 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  40 */
  41
  42/*
  43 * verbs.c
  44 *
  45 * Encapsulates the major functions managing:
  46 *  o adapters
  47 *  o endpoints
  48 *  o connections
  49 *  o buffer memory
  50 */
  51
  52#include <linux/interrupt.h>
  53#include <linux/slab.h>
 
  54#include <linux/sunrpc/addr.h>
  55#include <linux/sunrpc/svc_rdma.h>
  56#include <linux/log2.h>
  57
  58#include <asm-generic/barrier.h>
  59#include <asm/bitops.h>
  60
  61#include <rdma/ib_cm.h>
  62
  63#include "xprt_rdma.h"
  64#include <trace/events/rpcrdma.h>
  65
  66/*
  67 * Globals/Macros
  68 */
  69
  70#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  71# define RPCDBG_FACILITY	RPCDBG_TRANS
  72#endif
  73
  74/*
  75 * internal functions
  76 */
  77static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
  78static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf);
  79static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
  80static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
  81static void rpcrdma_mr_free(struct rpcrdma_mr *mr);
  82static struct rpcrdma_regbuf *
  83rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
  84		     gfp_t flags);
  85static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
  86static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);
  87static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
  88
  89/* Wait for outstanding transport work to finish. ib_drain_qp
  90 * handles the drains in the wrong order for us, so open code
  91 * them here.
  92 */
  93static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
  94{
  95	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
  96
  97	/* Flush Receives, then wait for deferred Reply work
  98	 * to complete.
  99	 */
 100	ib_drain_rq(ia->ri_id->qp);
 101
 102	/* Deferred Reply processing might have scheduled
 103	 * local invalidations.
 104	 */
 105	ib_drain_sq(ia->ri_id->qp);
 
 106}
 107
 108/**
 109 * rpcrdma_qp_event_handler - Handle one QP event (error notification)
 110 * @event: details of the event
 111 * @context: ep that owns QP where event occurred
 112 *
 113 * Called from the RDMA provider (device driver) possibly in an interrupt
 114 * context.
 115 */
 116static void
 117rpcrdma_qp_event_handler(struct ib_event *event, void *context)
 118{
 119	struct rpcrdma_ep *ep = context;
 120	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
 121						   rx_ep);
 122
 123	trace_xprtrdma_qp_event(r_xprt, event);
 124}
 125
 126/**
 127 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 128 * @cq:	completion queue (ignored)
 129 * @wc:	completed WR
 130 *
 131 */
 132static void
 133rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 134{
 135	struct ib_cqe *cqe = wc->wr_cqe;
 136	struct rpcrdma_sendctx *sc =
 137		container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
 138
 139	/* WARNING: Only wr_cqe and status are reliable at this point */
 140	trace_xprtrdma_wc_send(sc, wc);
 141	rpcrdma_sendctx_put_locked(sc);
 142}
 143
 144/**
 145 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 146 * @cq:	completion queue (ignored)
 147 * @wc:	completed WR
 148 *
 149 */
 150static void
 151rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 152{
 153	struct ib_cqe *cqe = wc->wr_cqe;
 154	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
 155					       rr_cqe);
 156	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
 157
 158	/* WARNING: Only wr_cqe and status are reliable at this point */
 159	trace_xprtrdma_wc_receive(wc);
 160	--r_xprt->rx_ep.rep_receive_count;
 161	if (wc->status != IB_WC_SUCCESS)
 162		goto out_flushed;
 163
 164	/* status == SUCCESS means all fields in wc are trustworthy */
 165	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
 166	rep->rr_wc_flags = wc->wc_flags;
 167	rep->rr_inv_rkey = wc->ex.invalidate_rkey;
 
 
 168
 169	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
 
 170				   rdmab_addr(rep->rr_rdmabuf),
 171				   wc->byte_len, DMA_FROM_DEVICE);
 172
 173	rpcrdma_post_recvs(r_xprt, false);
 174	rpcrdma_reply_handler(rep);
 
 
 175	return;
 176
 177out_flushed:
 178	rpcrdma_recv_buffer_put(rep);
 179}
 180
 181static void
 182rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
 183			       struct rdma_conn_param *param)
 184{
 185	const struct rpcrdma_connect_private *pmsg = param->private_data;
 186	unsigned int rsize, wsize;
 187
 188	/* Default settings for RPC-over-RDMA Version One */
 189	r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
 190	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
 191	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;
 192
 193	if (pmsg &&
 194	    pmsg->cp_magic == rpcrdma_cmp_magic &&
 195	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
 196		r_xprt->rx_ia.ri_implicit_roundup = true;
 197		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
 198		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
 199	}
 200
 201	if (rsize < r_xprt->rx_ep.rep_inline_recv)
 202		r_xprt->rx_ep.rep_inline_recv = rsize;
 203	if (wsize < r_xprt->rx_ep.rep_inline_send)
 204		r_xprt->rx_ep.rep_inline_send = wsize;
 205	dprintk("RPC:       %s: max send %u, max recv %u\n", __func__,
 206		r_xprt->rx_ep.rep_inline_send,
 207		r_xprt->rx_ep.rep_inline_recv);
 208	rpcrdma_set_max_header_sizes(r_xprt);
 209}
 210
 211/**
 212 * rpcrdma_cm_event_handler - Handle RDMA CM events
 213 * @id: rdma_cm_id on which an event has occurred
 214 * @event: details of the event
 215 *
 216 * Called with @id's mutex held. Returns 1 if caller should
 217 * destroy @id, otherwise 0.
 218 */
 219static int
 220rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
 221{
 222	struct rpcrdma_xprt *r_xprt = id->context;
 223	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 224	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 225	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 226
 227	might_sleep();
 
 
 
 228
 229	trace_xprtrdma_cm_event(r_xprt, event);
 230	switch (event->event) {
 231	case RDMA_CM_EVENT_ADDR_RESOLVED:
 232	case RDMA_CM_EVENT_ROUTE_RESOLVED:
 233		ia->ri_async_rc = 0;
 234		complete(&ia->ri_done);
 235		return 0;
 236	case RDMA_CM_EVENT_ADDR_ERROR:
 237		ia->ri_async_rc = -EPROTO;
 
 
 238		complete(&ia->ri_done);
 239		return 0;
 240	case RDMA_CM_EVENT_ROUTE_ERROR:
 241		ia->ri_async_rc = -ENETUNREACH;
 
 
 242		complete(&ia->ri_done);
 243		return 0;
 244	case RDMA_CM_EVENT_DEVICE_REMOVAL:
 245#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 246		pr_info("rpcrdma: removing device %s for %s:%s\n",
 247			ia->ri_id->device->name,
 248			rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt));
 249#endif
 250		set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
 251		ep->rep_connected = -ENODEV;
 252		xprt_force_disconnect(xprt);
 253		wait_for_completion(&ia->ri_remove_done);
 254
 255		ia->ri_id = NULL;
 256		/* Return 1 to ensure the core destroys the id. */
 257		return 1;
 258	case RDMA_CM_EVENT_ESTABLISHED:
 259		++xprt->connect_cookie;
 260		ep->rep_connected = 1;
 261		rpcrdma_update_connect_private(r_xprt, &event->param.conn);
 262		wake_up_all(&ep->rep_connect_wait);
 263		break;
 264	case RDMA_CM_EVENT_CONNECT_ERROR:
 265		ep->rep_connected = -ENOTCONN;
 266		goto disconnected;
 267	case RDMA_CM_EVENT_UNREACHABLE:
 268		ep->rep_connected = -ENETUNREACH;
 269		goto disconnected;
 270	case RDMA_CM_EVENT_REJECTED:
 271		dprintk("rpcrdma: connection to %s:%s rejected: %s\n",
 272			rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
 273			rdma_reject_msg(id, event->status));
 274		ep->rep_connected = -ECONNREFUSED;
 275		if (event->status == IB_CM_REJ_STALE_CONN)
 276			ep->rep_connected = -EAGAIN;
 277		goto disconnected;
 278	case RDMA_CM_EVENT_DISCONNECTED:
 279		ep->rep_connected = -ECONNABORTED;
 280disconnected:
 281		xprt_force_disconnect(xprt);
 282		wake_up_all(&ep->rep_connect_wait);
 283		break;
 284	default:
 
 
 
 285		break;
 286	}
 287
 288	dprintk("RPC:       %s: %s:%s on %s/frwr: %s\n", __func__,
 289		rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
 290		ia->ri_id->device->name, rdma_event_msg(event->event));
 291	return 0;
 292}
 293
 294static struct rdma_cm_id *
 295rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
 
 296{
 297	unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
 298	struct rdma_cm_id *id;
 299	int rc;
 300
 301	trace_xprtrdma_conn_start(xprt);
 302
 303	init_completion(&ia->ri_done);
 304	init_completion(&ia->ri_remove_done);
 305
 306	id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_cm_event_handler,
 307			    xprt, RDMA_PS_TCP, IB_QPT_RC);
 308	if (IS_ERR(id))
 
 
 
 309		return id;
 
 310
 311	ia->ri_async_rc = -ETIMEDOUT;
 312	rc = rdma_resolve_addr(id, NULL,
 313			       (struct sockaddr *)&xprt->rx_xprt.addr,
 314			       RDMA_RESOLVE_TIMEOUT);
 315	if (rc)
 316		goto out;
 317	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
 318	if (rc < 0) {
 319		trace_xprtrdma_conn_tout(xprt);
 320		goto out;
 321	}
 
 
 322
 323	rc = ia->ri_async_rc;
 324	if (rc)
 325		goto out;
 326
 327	ia->ri_async_rc = -ETIMEDOUT;
 328	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
 329	if (rc)
 330		goto out;
 331	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
 332	if (rc < 0) {
 333		trace_xprtrdma_conn_tout(xprt);
 334		goto out;
 335	}
 
 
 336	rc = ia->ri_async_rc;
 337	if (rc)
 338		goto out;
 339
 340	return id;
 341
 
 342out:
 343	rdma_destroy_id(id);
 344	return ERR_PTR(rc);
 345}
 346
 347/*
 348 * Exported functions.
 349 */
 350
 351/**
 352 * rpcrdma_ia_open - Open and initialize an Interface Adapter.
 353 * @xprt: transport with IA to (re)initialize
 354 *
 355 * Returns 0 on success, negative errno if an appropriate
 356 * Interface Adapter could not be found and opened.
 357 */
 358int
 359rpcrdma_ia_open(struct rpcrdma_xprt *xprt)
 360{
 361	struct rpcrdma_ia *ia = &xprt->rx_ia;
 362	int rc;
 363
 364	ia->ri_id = rpcrdma_create_id(xprt, ia);
 
 
 365	if (IS_ERR(ia->ri_id)) {
 366		rc = PTR_ERR(ia->ri_id);
 367		goto out_err;
 368	}
 
 369
 370	ia->ri_pd = ib_alloc_pd(ia->ri_id->device, 0);
 371	if (IS_ERR(ia->ri_pd)) {
 372		rc = PTR_ERR(ia->ri_pd);
 373		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
 374		goto out_err;
 
 375	}
 376
 377	switch (xprt_rdma_memreg_strategy) {
 378	case RPCRDMA_FRWR:
 379		if (frwr_is_supported(ia->ri_id->device))
 380			break;
 381		/*FALLTHROUGH*/
 382	default:
 383		pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
 384		       ia->ri_id->device->name, xprt_rdma_memreg_strategy);
 385		rc = -EINVAL;
 386		goto out_err;
 387	}
 
 
 388
 
 389	return 0;
 390
 391out_err:
 392	rpcrdma_ia_close(ia);
 393	return rc;
 394}
 395
 396/**
 397 * rpcrdma_ia_remove - Handle device driver unload
 398 * @ia: interface adapter being removed
 399 *
 400 * Divest transport H/W resources associated with this adapter,
 401 * but allow it to be restored later.
 402 */
 403void
 404rpcrdma_ia_remove(struct rpcrdma_ia *ia)
 405{
 406	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
 407						   rx_ia);
 408	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 409	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 410	struct rpcrdma_req *req;
 411
 412	cancel_work_sync(&buf->rb_refresh_worker);
 413
 414	/* This is similar to rpcrdma_ep_destroy, but:
 415	 * - Don't cancel the connect worker.
 416	 * - Don't call rpcrdma_ep_disconnect, which waits
 417	 *   for another conn upcall, which will deadlock.
 418	 * - rdma_disconnect is unneeded, the underlying
 419	 *   connection is already gone.
 420	 */
 421	if (ia->ri_id->qp) {
 422		rpcrdma_xprt_drain(r_xprt);
 423		rdma_destroy_qp(ia->ri_id);
 424		ia->ri_id->qp = NULL;
 425	}
 426	ib_free_cq(ep->rep_attr.recv_cq);
 427	ep->rep_attr.recv_cq = NULL;
 428	ib_free_cq(ep->rep_attr.send_cq);
 429	ep->rep_attr.send_cq = NULL;
 430
 431	/* The ULP is responsible for ensuring all DMA
 432	 * mappings and MRs are gone.
 433	 */
 434	rpcrdma_reps_destroy(buf);
 435	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
 436		rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf);
 437		rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
 438		rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
 439	}
 440	rpcrdma_mrs_destroy(buf);
 441	ib_dealloc_pd(ia->ri_pd);
 442	ia->ri_pd = NULL;
 443
 444	/* Allow waiters to continue */
 445	complete(&ia->ri_remove_done);
 446
 447	trace_xprtrdma_remove(r_xprt);
 448}
 449
 450/**
 451 * rpcrdma_ia_close - Clean up/close an IA.
 452 * @ia: interface adapter to close
 453 *
 454 */
 455void
 456rpcrdma_ia_close(struct rpcrdma_ia *ia)
 457{
 
 458	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
 459		if (ia->ri_id->qp)
 460			rdma_destroy_qp(ia->ri_id);
 461		rdma_destroy_id(ia->ri_id);
 
 462	}
 463	ia->ri_id = NULL;
 464
 465	/* If the pd is still busy, xprtrdma missed freeing a resource */
 466	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
 467		ib_dealloc_pd(ia->ri_pd);
 468	ia->ri_pd = NULL;
 469}
 470
 471/**
 472 * rpcrdma_ep_create - Create unconnected endpoint
 473 * @r_xprt: transport to instantiate
 474 *
 475 * Returns zero on success, or a negative errno.
 476 */
 477int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
 
 
 478{
 479	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 480	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 481	struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
 482	struct ib_cq *sendcq, *recvcq;
 483	unsigned int max_sge;
 484	int rc;
 485
 486	ep->rep_max_requests = xprt_rdma_slot_table_entries;
 487	ep->rep_inline_send = xprt_rdma_max_inline_write;
 488	ep->rep_inline_recv = xprt_rdma_max_inline_read;
 489
 490	max_sge = min_t(unsigned int, ia->ri_id->device->attrs.max_send_sge,
 491			RPCRDMA_MAX_SEND_SGES);
 492	if (max_sge < RPCRDMA_MIN_SEND_SGES) {
 493		pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
 494		return -ENOMEM;
 495	}
 496	ia->ri_max_send_sges = max_sge;
 497
 498	rc = frwr_open(ia, ep);
 499	if (rc)
 500		return rc;
 501
 502	ep->rep_attr.event_handler = rpcrdma_qp_event_handler;
 503	ep->rep_attr.qp_context = ep;
 504	ep->rep_attr.srq = NULL;
 505	ep->rep_attr.cap.max_send_sge = max_sge;
 506	ep->rep_attr.cap.max_recv_sge = 1;
 507	ep->rep_attr.cap.max_inline_data = 0;
 508	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
 509	ep->rep_attr.qp_type = IB_QPT_RC;
 510	ep->rep_attr.port_num = ~0;
 511
 512	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
 513		"iovs: send %d recv %d\n",
 514		__func__,
 515		ep->rep_attr.cap.max_send_wr,
 516		ep->rep_attr.cap.max_recv_wr,
 517		ep->rep_attr.cap.max_send_sge,
 518		ep->rep_attr.cap.max_recv_sge);
 519
 520	ep->rep_send_batch = ep->rep_max_requests >> 3;
 521	ep->rep_send_count = ep->rep_send_batch;
 
 
 
 522	init_waitqueue_head(&ep->rep_connect_wait);
 523	ep->rep_receive_count = 0;
 524
 525	sendcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
 526				 ep->rep_attr.cap.max_send_wr + 1,
 527				 IB_POLL_WORKQUEUE);
 528	if (IS_ERR(sendcq)) {
 529		rc = PTR_ERR(sendcq);
 
 
 530		goto out1;
 531	}
 532
 533	recvcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
 534				 ep->rep_attr.cap.max_recv_wr + 1,
 535				 IB_POLL_WORKQUEUE);
 536	if (IS_ERR(recvcq)) {
 537		rc = PTR_ERR(recvcq);
 
 
 538		goto out2;
 539	}
 540
 541	ep->rep_attr.send_cq = sendcq;
 542	ep->rep_attr.recv_cq = recvcq;
 543
 544	/* Initialize cma parameters */
 545	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));
 546
 547	/* Prepare RDMA-CM private message */
 548	pmsg->cp_magic = rpcrdma_cmp_magic;
 549	pmsg->cp_version = RPCRDMA_CMP_VERSION;
 550	pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
 551	pmsg->cp_send_size = rpcrdma_encode_buffer_size(ep->rep_inline_send);
 552	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(ep->rep_inline_recv);
 553	ep->rep_remote_cma.private_data = pmsg;
 554	ep->rep_remote_cma.private_data_len = sizeof(*pmsg);
 555
 556	/* Client offers RDMA Read but does not initiate */
 557	ep->rep_remote_cma.initiator_depth = 0;
 558	ep->rep_remote_cma.responder_resources =
 559		min_t(int, U8_MAX, ia->ri_id->device->attrs.max_qp_rd_atom);
 560
 561	/* Limit transport retries so client can detect server
 562	 * GID changes quickly. RPC layer handles re-establishing
 563	 * transport connection and retransmission.
 564	 */
 565	ep->rep_remote_cma.retry_count = 6;
 566
 567	/* RPC-over-RDMA handles its own flow control. In addition,
 568	 * make all RNR NAKs visible so we know that RPC-over-RDMA
 569	 * flow control is working correctly (no NAKs should be seen).
 570	 */
 571	ep->rep_remote_cma.flow_control = 0;
 572	ep->rep_remote_cma.rnr_retry_count = 0;
 573
 574	return 0;
 575
 576out2:
 577	ib_free_cq(sendcq);
 578out1:
 579	return rc;
 580}
 581
 582/**
 583 * rpcrdma_ep_destroy - Disconnect and destroy endpoint.
 584 * @r_xprt: transport instance to shut down
 585 *
 586 */
 587void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt)
 588{
 589	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 590	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 591
 592	if (ia->ri_id && ia->ri_id->qp) {
 593		rpcrdma_ep_disconnect(ep, ia);
 594		rdma_destroy_qp(ia->ri_id);
 595		ia->ri_id->qp = NULL;
 596	}
 597
 598	if (ep->rep_attr.recv_cq)
 599		ib_free_cq(ep->rep_attr.recv_cq);
 600	if (ep->rep_attr.send_cq)
 601		ib_free_cq(ep->rep_attr.send_cq);
 602}
 603
 604/* Re-establish a connection after a device removal event.
 605 * Unlike a normal reconnection, a fresh PD and a new set
 606 * of MRs and buffers are needed.
 607 */
 608static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
 609				    struct ib_qp_init_attr *qp_init_attr)
 610{
 611	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 612	int rc, err;
 613
 614	trace_xprtrdma_reinsert(r_xprt);
 615
 616	rc = -EHOSTUNREACH;
 617	if (rpcrdma_ia_open(r_xprt))
 618		goto out1;
 619
 620	rc = -ENOMEM;
 621	err = rpcrdma_ep_create(r_xprt);
 622	if (err) {
 623		pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
 624		goto out2;
 625	}
 626
 627	rc = -ENETUNREACH;
 628	err = rdma_create_qp(ia->ri_id, ia->ri_pd, qp_init_attr);
 629	if (err) {
 630		pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
 631		goto out3;
 632	}
 633
 634	rpcrdma_mrs_create(r_xprt);
 635	return 0;
 636
 637out3:
 638	rpcrdma_ep_destroy(r_xprt);
 639out2:
 640	rpcrdma_ia_close(ia);
 641out1:
 642	return rc;
 643}
 644
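/* Re-establish a connection after a normal disconnect or connection
 * loss. A fresh cm_id is created; as long as it resolves to the same
 * device as the old one, the transport's existing PD, MRs, and DMA
 * mappings can be reused.
 */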
 645static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt,
 646				struct ib_qp_init_attr *qp_init_attr)
 647{
 648	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 649	struct rdma_cm_id *id, *old;
 650	int err, rc;
 651
 652	trace_xprtrdma_reconnect(r_xprt);
 653
 654	rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia);
 655
 656	rc = -EHOSTUNREACH;
 657	id = rpcrdma_create_id(r_xprt, ia);
 658	if (IS_ERR(id))
 659		goto out;
 660
 661	/* As long as the new ID points to the same device as the
 662	 * old ID, we can reuse the transport's existing PD and all
 663	 * previously allocated MRs. Also, the same device means
 664	 * the transport's previous DMA mappings are still valid.
 665	 *
 666	 * This is a sanity check only. There should be no way these
 667	 * point to two different devices here.
 668	 */
 669	old = id;
 670	rc = -ENETUNREACH;
 671	if (ia->ri_id->device != id->device) {
 672		pr_err("rpcrdma: can't reconnect on different device!\n");
 673		goto out_destroy;
 674	}
 675
 676	err = rdma_create_qp(id, ia->ri_pd, qp_init_attr);
 677	if (err)
 678		goto out_destroy;
 679
 680	/* Atomically replace the transport's ID and QP. */
 681	rc = 0;
 682	old = ia->ri_id;
 683	ia->ri_id = id;
 684	rdma_destroy_qp(old);
 685
 686out_destroy:
 687	rdma_destroy_id(old);
 688out:
 689	return rc;
 690}
 691
 692/*
 693 * Connect unconnected endpoint.
 694 */
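/* The value of ep->rep_connected selects the strategy below: zero
 * means this is the first connect on a fresh cm_id, -ENODEV means the
 * underlying device was removed and the transport must be rebuilt,
 * and any other value means a reconnect on the existing device.
 */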
 695int
 696rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 697{
 698	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
 699						   rx_ia);
 700	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 701	struct ib_qp_init_attr qp_init_attr;
 702	int rc;
 703
 704retry:
 705	memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr));
 706	switch (ep->rep_connected) {
 707	case 0:
 708		dprintk("RPC:       %s: connecting...\n", __func__);
 709		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr);
 710		if (rc) {
 711			rc = -ENETUNREACH;
 712			goto out_noupdate;
 713		}
 714		break;
 715	case -ENODEV:
 716		rc = rpcrdma_ep_recreate_xprt(r_xprt, &qp_init_attr);
 717		if (rc)
 718			goto out_noupdate;
 719		break;
 720	default:
 721		rc = rpcrdma_ep_reconnect(r_xprt, &qp_init_attr);
 722		if (rc)
 723			goto out;
 724	}
 725
 726	ep->rep_connected = 0;
 727	xprt_clear_connected(xprt);
 728
 729	rpcrdma_post_recvs(r_xprt, true);
 730
 731	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
 732	if (rc)
 733		goto out;
 734
 735	if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
 736		xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
 737	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
 738	if (ep->rep_connected <= 0) {
 739		if (ep->rep_connected == -EAGAIN)
 740			goto retry;
 741		rc = ep->rep_connected;
 742		goto out;
 743	}
 744
 745	dprintk("RPC:       %s: connected\n", __func__);
 746
 747out:
 748	if (rc)
 749		ep->rep_connected = rc;
 750
 751out_noupdate:
 752	return rc;
 753}
 754
 755/**
 756 * rpcrdma_ep_disconnect - Disconnect underlying transport
 757 * @ep: endpoint to disconnect
 758 * @ia: associated interface adapter
 759 *
 760 * This is separate from destroy so that the transport can
 761 * reconnect without recreating the endpoint.
 762 *
 763 * This call is not reentrant, and must not be made in parallel
 764 * on the same endpoint.
 765 */
 766void
 767rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 768{
 769	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
 770						   rx_ep);
 771	int rc;
 772
 773	/* returns without wait if ID is not connected */
 774	rc = rdma_disconnect(ia->ri_id);
 775	if (!rc)
 776		wait_event_interruptible(ep->rep_connect_wait,
 777							ep->rep_connected != 1);
 778	else
 779		ep->rep_connected = rc;
 780	trace_xprtrdma_disconnect(r_xprt, rc);
 781
 782	rpcrdma_xprt_drain(r_xprt);
 783}
 784
 785/* Fixed-size circular FIFO queue. This implementation is wait-free and
 786 * lock-free.
 787 *
 788 * Consumer is the code path that posts Sends. This path dequeues a
 789 * sendctx for use by a Send operation. Multiple consumer threads
 790 * are serialized by the RPC transport lock, which allows only one
 791 * ->send_request call at a time.
 792 *
 793 * Producer is the code path that handles Send completions. This path
 794 * enqueues a sendctx that has been completed. Multiple producer
 795 * threads are serialized by the ib_poll_cq() function.
 796 */
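/* For illustration, a hypothetical queue of four sendctx slots
 * (rb_sc_last == 3):
 *
 *        tail           head
 *         |              |
 *   [0]  [1]  [2]  [3]  ... wraps back to [0]
 *
 * rpcrdma_sendctx_next() advances an index by one, wrapping to zero
 * past rb_sc_last. The queue is treated as empty when advancing the
 * head would make it equal to the tail.
 */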
 797
 798/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
 799 * queue activity, and rpcrdma_xprt_drain has flushed all remaining
 800 * Send requests.
 801 */
 802static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
 803{
 804	unsigned long i;
 805
 806	for (i = 0; i <= buf->rb_sc_last; i++)
 807		kfree(buf->rb_sc_ctxs[i]);
 808	kfree(buf->rb_sc_ctxs);
 809}
 810
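/* Allocate one send context, sized to carry the device's maximum
 * number of send SGEs, and initialize its Send WR and completion
 * handler.
 */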
 811static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
 812{
 813	struct rpcrdma_sendctx *sc;
 814
 815	sc = kzalloc(struct_size(sc, sc_sges, ia->ri_max_send_sges),
 816		     GFP_KERNEL);
 817	if (!sc)
 818		return NULL;
 819
 820	sc->sc_wr.wr_cqe = &sc->sc_cqe;
 821	sc->sc_wr.sg_list = sc->sc_sges;
 822	sc->sc_wr.opcode = IB_WR_SEND;
 823	sc->sc_cqe.done = rpcrdma_wc_send;
 824	return sc;
 825}
 826
 827static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
 828{
 829	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 830	struct rpcrdma_sendctx *sc;
 831	unsigned long i;
 832
 833	/* Maximum number of concurrent outstanding Send WRs. Capping
 834	 * the circular queue size stops Send Queue overflow by causing
 835	 * the ->send_request call to fail temporarily before too many
 836	 * Sends are posted.
 837	 */
 838	i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS;
 839	dprintk("RPC:       %s: allocating %lu send_ctxs\n", __func__, i);
 840	buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
 841	if (!buf->rb_sc_ctxs)
 842		return -ENOMEM;
 843
 844	buf->rb_sc_last = i - 1;
 845	for (i = 0; i <= buf->rb_sc_last; i++) {
 846		sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
 847		if (!sc)
 848			return -ENOMEM;
 849
 850		sc->sc_xprt = r_xprt;
 851		buf->rb_sc_ctxs[i] = sc;
 852	}
 853
 854	return 0;
 855}
 856
 857/* The sendctx queue is not guaranteed to have a size that is a
 858 * power of two, thus the helpers in circ_buf.h cannot be used.
 859 * The other option is to use modulus (%), which can be expensive.
 860 */
 861static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
 862					  unsigned long item)
 863{
 864	return likely(item < buf->rb_sc_last) ? item + 1 : 0;
 865}
 866
 867/**
 868 * rpcrdma_sendctx_get_locked - Acquire a send context
 869 * @r_xprt: controlling transport instance
 870 *
 871 * Returns a pointer to a free send completion context, or NULL if
 872 * the queue is empty.
 873 *
 874 * Usage: Called to acquire an SGE array before preparing a Send WR.
 875 *
 876 * The caller serializes calls to this function (per transport), and
 877 * provides an effective memory barrier that flushes the new value
 878 * of rb_sc_head.
 879 */
 880struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
 881{
 882	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 883	struct rpcrdma_sendctx *sc;
 884	unsigned long next_head;
 885
 886	next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);
 887
 888	if (next_head == READ_ONCE(buf->rb_sc_tail))
 889		goto out_emptyq;
 890
 891	/* ORDER: item must be accessed _before_ head is updated */
 892	sc = buf->rb_sc_ctxs[next_head];
 893
 894	/* Releasing the lock in the caller acts as a memory
 895	 * barrier that flushes rb_sc_head.
 896	 */
 897	buf->rb_sc_head = next_head;
 898
 899	return sc;
 900
 901out_emptyq:
 902	/* The queue is "empty" if there have not been enough Send
 903	 * completions recently. This is a sign the Send Queue is
 904	 * backing up. Cause the caller to pause and try again.
 905	 */
 906	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
 907	r_xprt->rx_stats.empty_sendctx_q++;
 908	return NULL;
 909}
 910
 911/**
 912 * rpcrdma_sendctx_put_locked - Release a send context
 913 * @sc: send context to release
 914 *
 915 * Usage: Called from Send completion to return a sendctx
 916 * to the queue.
 917 *
 918 * The caller serializes calls to this function (per transport).
 919 */
 920static void
 921rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
 922{
 923	struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
 924	unsigned long next_tail;
 925
 926	/* Unmap SGEs of previously completed but unsignaled
 927	 * Sends by walking up the queue until @sc is found.
 928	 */
 929	next_tail = buf->rb_sc_tail;
 930	do {
 931		next_tail = rpcrdma_sendctx_next(buf, next_tail);
 932
 933		/* ORDER: item must be accessed _before_ tail is updated */
 934		rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);
 935
 936	} while (buf->rb_sc_ctxs[next_tail] != sc);
 937
 938	/* Paired with READ_ONCE */
 939	smp_store_release(&buf->rb_sc_tail, next_tail);
 940
 941	xprt_write_space(&sc->sc_xprt->rx_xprt);
 942}
 943
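/* Allocate and initialize up to ri_max_segs MRs, adding each to the
 * buffer's free MR list and to the list of all MRs owned by the
 * transport.
 */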
 944static void
 945rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
 946{
 947	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 948	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 949	unsigned int count;
 950
 951	for (count = 0; count < ia->ri_max_segs; count++) {
 952		struct rpcrdma_mr *mr;
 953		int rc;
 954
 955		mr = kzalloc(sizeof(*mr), GFP_NOFS);
 956		if (!mr)
 957			break;
 958
 959		rc = frwr_init_mr(ia, mr);
 960		if (rc) {
 961			kfree(mr);
 962			break;
 963		}
 964
 965		mr->mr_xprt = r_xprt;
 966
 967		spin_lock(&buf->rb_lock);
 968		list_add(&mr->mr_list, &buf->rb_mrs);
 969		list_add(&mr->mr_all, &buf->rb_all_mrs);
 970		spin_unlock(&buf->rb_lock);
 971	}
 972
 973	r_xprt->rx_stats.mrs_allocated += count;
 974	trace_xprtrdma_createmrs(r_xprt, count);
 975}
 976
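/* Work item that allocates a fresh batch of MRs and then wakes any
 * RPCs waiting for write space.
 */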
 977static void
 978rpcrdma_mr_refresh_worker(struct work_struct *work)
 979{
 980	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
 981						  rb_refresh_worker);
 982	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
 983						   rx_buf);
 984
 985	rpcrdma_mrs_create(r_xprt);
 986	xprt_write_space(&r_xprt->rx_xprt);
 987}
 988
 989/**
 990 * rpcrdma_req_create - Allocate an rpcrdma_req object
 991 * @r_xprt: controlling r_xprt
 992 * @size: initial size, in bytes, of send and receive buffers
 993 * @flags: GFP flags passed to memory allocators
 994 *
 995 * Returns an allocated and fully initialized rpcrdma_req or NULL.
 996 */
 997struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
 998				       gfp_t flags)
 999{
1000	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
1001	struct rpcrdma_regbuf *rb;
1002	struct rpcrdma_req *req;
1003	size_t maxhdrsize;
1004
1005	req = kzalloc(sizeof(*req), flags);
1006	if (req == NULL)
1007		goto out1;
1008
1009	/* Compute maximum header buffer size in bytes */
1010	maxhdrsize = rpcrdma_fixed_maxsz + 3 +
1011		     r_xprt->rx_ia.ri_max_segs * rpcrdma_readchunk_maxsz;
1012	maxhdrsize *= sizeof(__be32);
1013	rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
1014				  DMA_TO_DEVICE, flags);
1015	if (!rb)
1016		goto out2;
1017	req->rl_rdmabuf = rb;
1018	xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
1019
1020	req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags);
1021	if (!req->rl_sendbuf)
1022		goto out3;
1023
1024	req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags);
1025	if (!req->rl_recvbuf)
1026		goto out4;
1027
1028	INIT_LIST_HEAD(&req->rl_free_mrs);
1029	INIT_LIST_HEAD(&req->rl_registered);
1030	spin_lock(&buffer->rb_lock);
1031	list_add(&req->rl_all, &buffer->rb_allreqs);
1032	spin_unlock(&buffer->rb_lock);
1033	return req;
1034
1035out4:
1036	kfree(req->rl_sendbuf);
1037out3:
1038	kfree(req->rl_rdmabuf);
1039out2:
1040	kfree(req);
1041out1:
1042	return NULL;
1043}
1044
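/* Allocate a Receive buffer (rpcrdma_rep) sized to the connection's
 * inline receive threshold and initialize its Receive WR. @temp marks
 * a rep that is destroyed, rather than recycled, when it is released.
 */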
1045static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
1046					      bool temp)
1047{
1048	struct rpcrdma_rep *rep;
1049
1050	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
1051	if (rep == NULL)
1052		goto out;
1053
1054	rep->rr_rdmabuf = rpcrdma_regbuf_alloc(r_xprt->rx_ep.rep_inline_recv,
1055					       DMA_FROM_DEVICE, GFP_KERNEL);
1056	if (!rep->rr_rdmabuf)
1057		goto out_free;
1058
1059	xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
1060		     rdmab_length(rep->rr_rdmabuf));
1061	rep->rr_cqe.done = rpcrdma_wc_receive;
1062	rep->rr_rxprt = r_xprt;
1063	rep->rr_recv_wr.next = NULL;
1064	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
1065	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
1066	rep->rr_recv_wr.num_sge = 1;
1067	rep->rr_temp = temp;
1068	return rep;
1069
1070out_free:
1071	kfree(rep);
1072out:
1073	return NULL;
1074}
1075
1076static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
1077{
1078	rpcrdma_regbuf_free(rep->rr_rdmabuf);
1079	kfree(rep);
1080}
1081
1082static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
1083{
1084	struct llist_node *node;
1085
1086	/* Calls to llist_del_first are required to be serialized */
1087	node = llist_del_first(&buf->rb_free_reps);
1088	if (!node)
1089		return NULL;
1090	return llist_entry(node, struct rpcrdma_rep, rr_node);
1091}
1092
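/* Return a rep to the free list; temporary reps are destroyed instead
 * of being recycled.
 */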
1093static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
1094			    struct rpcrdma_rep *rep)
1095{
1096	if (!rep->rr_temp)
1097		llist_add(&rep->rr_node, &buf->rb_free_reps);
1098	else
1099		rpcrdma_rep_destroy(rep);
1100}
1101
1102static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
1103{
1104	struct rpcrdma_rep *rep;
1105
1106	while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
1107		rpcrdma_rep_destroy(rep);
1108}
1109
1110/**
1111 * rpcrdma_buffer_create - Create initial set of req/rep objects
1112 * @r_xprt: transport instance to (re)initialize
1113 *
1114 * Returns zero on success, otherwise a negative errno.
1115 */
1116int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1117{
1118	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1119	int i, rc;
1120
1121	buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests;
1122	buf->rb_bc_srv_max_requests = 0;
1123	spin_lock_init(&buf->rb_lock);
1124	INIT_LIST_HEAD(&buf->rb_mrs);
1125	INIT_LIST_HEAD(&buf->rb_all_mrs);
1126	INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
1127
1128	rpcrdma_mrs_create(r_xprt);
1129
1130	INIT_LIST_HEAD(&buf->rb_send_bufs);
1131	INIT_LIST_HEAD(&buf->rb_allreqs);
1132
1133	rc = -ENOMEM;
1134	for (i = 0; i < buf->rb_max_requests; i++) {
1135		struct rpcrdma_req *req;
1136
1137		req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE,
1138					 GFP_KERNEL);
1139		if (!req)
1140			goto out;
1141		list_add(&req->rl_list, &buf->rb_send_bufs);
1142	}
1143
1144	buf->rb_credits = 1;
1145	init_llist_head(&buf->rb_free_reps);
1146
1147	rc = rpcrdma_sendctxs_create(r_xprt);
1148	if (rc)
1149		goto out;
1150
1151	return 0;
1152out:
1153	rpcrdma_buffer_destroy(buf);
1154	return rc;
1155}
1156
1157/**
1158 * rpcrdma_req_destroy - Destroy an rpcrdma_req object
1159 * @req: unused object to be destroyed
1160 *
1161 * This function assumes that the caller prevents concurrent device
1162 * unload and transport tear-down.
1163 */
1164void rpcrdma_req_destroy(struct rpcrdma_req *req)
1165{
1166	list_del(&req->rl_all);
1167
1168	while (!list_empty(&req->rl_free_mrs))
1169		rpcrdma_mr_free(rpcrdma_mr_pop(&req->rl_free_mrs));
1170
1171	rpcrdma_regbuf_free(req->rl_recvbuf);
1172	rpcrdma_regbuf_free(req->rl_sendbuf);
1173	rpcrdma_regbuf_free(req->rl_rdmabuf);
1174	kfree(req);
1175}
1176
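/* Release every MR on the all-MRs list. rb_lock is dropped around
 * each frwr_release_mr() call, since releasing an MR may block.
 */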
1177static void
1178rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
1179{
1180	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
1181						   rx_buf);
1182	struct rpcrdma_mr *mr;
1183	unsigned int count;
1184
1185	count = 0;
1186	spin_lock(&buf->rb_lock);
1187	while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
1188					      struct rpcrdma_mr,
1189					      mr_all)) != NULL) {
1190		list_del(&mr->mr_all);
1191		spin_unlock(&buf->rb_lock);
1192
1193		frwr_release_mr(mr);
1194		count++;
1195		spin_lock(&buf->rb_lock);
1196	}
1197	spin_unlock(&buf->rb_lock);
1198	r_xprt->rx_stats.mrs_allocated = 0;
1199}
1200
1201/**
1202 * rpcrdma_buffer_destroy - Release all hw resources
1203 * @buf: root control block for resources
1204 *
1205 * ORDERING: relies on a prior rpcrdma_xprt_drain:
1206 * - No more Send or Receive completions can occur
1207 * - All MRs, reps, and reqs are returned to their free lists
1208 */
1209void
1210rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1211{
1212	cancel_work_sync(&buf->rb_refresh_worker);
1213
1214	rpcrdma_sendctxs_destroy(buf);
1215	rpcrdma_reps_destroy(buf);
1216
1217	while (!list_empty(&buf->rb_send_bufs)) {
1218		struct rpcrdma_req *req;
1219
1220		req = list_first_entry(&buf->rb_send_bufs,
1221				       struct rpcrdma_req, rl_list);
1222		list_del(&req->rl_list);
1223		rpcrdma_req_destroy(req);
1224	}
1225
1226	rpcrdma_mrs_destroy(buf);
1227}
1228
1229/**
1230 * rpcrdma_mr_get - Allocate an rpcrdma_mr object
1231 * @r_xprt: controlling transport
1232 *
1233 * Returns an initialized rpcrdma_mr or NULL if no free
1234 * rpcrdma_mr objects are available.
1235 */
1236struct rpcrdma_mr *
1237rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
1238{
1239	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1240	struct rpcrdma_mr *mr;
1241
1242	spin_lock(&buf->rb_lock);
1243	mr = rpcrdma_mr_pop(&buf->rb_mrs);
1244	spin_unlock(&buf->rb_lock);
1245	return mr;
1246}
1247
1248/**
1249 * rpcrdma_mr_put - DMA unmap an MR and release it
1250 * @mr: MR to release
1251 *
1252 */
1253void rpcrdma_mr_put(struct rpcrdma_mr *mr)
1254{
1255	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
1256
1257	if (mr->mr_dir != DMA_NONE) {
1258		trace_xprtrdma_mr_unmap(mr);
1259		ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
1260				mr->mr_sg, mr->mr_nents, mr->mr_dir);
1261		mr->mr_dir = DMA_NONE;
1262	}
1263
1264	rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
1265}
1266
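/* Detach an MR from its owning rpcrdma_req and return it to the
 * transport-wide MR free list.
 */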
1267static void rpcrdma_mr_free(struct rpcrdma_mr *mr)
1268{
1269	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
1270	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1271
1272	mr->mr_req = NULL;
1273	spin_lock(&buf->rb_lock);
1274	rpcrdma_mr_push(mr, &buf->rb_mrs);
1275	spin_unlock(&buf->rb_lock);
1276}
1277
1278/**
1279 * rpcrdma_buffer_get - Get a request buffer
1280 * @buffers: Buffer pool from which to obtain a buffer
1281 *
1282 * Returns a fresh rpcrdma_req, or NULL if none are available.
1283 */
1284struct rpcrdma_req *
1285rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1286{
1287	struct rpcrdma_req *req;
1288
1289	spin_lock(&buffers->rb_lock);
1290	req = list_first_entry_or_null(&buffers->rb_send_bufs,
1291				       struct rpcrdma_req, rl_list);
1292	if (req)
1293		list_del_init(&req->rl_list);
1294	spin_unlock(&buffers->rb_lock);
1295	return req;
1296}
1297
1298/**
1299 * rpcrdma_buffer_put - Put request/reply buffers back into pool
1300 * @buffers: buffer pool
1301 * @req: object to return
1302 *
1303 */
1304void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
1305{
1306	if (req->rl_reply)
1307		rpcrdma_rep_put(buffers, req->rl_reply);
1308	req->rl_reply = NULL;
1309
1310	spin_lock(&buffers->rb_lock);
1311	list_add(&req->rl_list, &buffers->rb_send_bufs);
1312	spin_unlock(&buffers->rb_lock);
1313}
1314
1315/**
1316 * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
1317 * @rep: rep to release
1318 *
1319 * Used after error conditions.
1320 */
1321void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1322{
1323	rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
1324}
1325
1326/* Returns a pointer to an rpcrdma_regbuf object, or NULL.
1327 *
1328 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1329 * receiving the payload of RDMA RECV operations. During Long Calls
1330 * or Replies they may be registered externally via frwr_map.
1331 */
1332static struct rpcrdma_regbuf *
1333rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
1334		     gfp_t flags)
1335{
1336	struct rpcrdma_regbuf *rb;
1337
1338	rb = kmalloc(sizeof(*rb), flags);
1339	if (!rb)
1340		return NULL;
1341	rb->rg_data = kmalloc(size, flags);
1342	if (!rb->rg_data) {
1343		kfree(rb);
1344		return NULL;
1345	}
1346
1347	rb->rg_device = NULL;
1348	rb->rg_direction = direction;
1349	rb->rg_iov.length = size;
1350	return rb;
1351}
1352
1353/**
1354 * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
1355 * @rb: regbuf to reallocate
1356 * @size: size of buffer to be allocated, in bytes
1357 * @flags: GFP flags
1358 *
1359 * Returns true if reallocation was successful. If false is
1360 * returned, @rb is left untouched.
1361 */
1362bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
1363{
1364	void *buf;
1365
1366	buf = kmalloc(size, flags);
1367	if (!buf)
1368		return false;
1369
1370	rpcrdma_regbuf_dma_unmap(rb);
1371	kfree(rb->rg_data);
1372
1373	rb->rg_data = buf;
1374	rb->rg_iov.length = size;
1375	return true;
1376}
1377
1378/**
1379 * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
1380 * @r_xprt: controlling transport instance
1381 * @rb: regbuf to be mapped
1382 *
1383 * Returns true if the buffer is now DMA mapped to @r_xprt's device
1384 */
1385bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
1386			      struct rpcrdma_regbuf *rb)
1387{
1388	struct ib_device *device = r_xprt->rx_ia.ri_id->device;
1389
1390	if (rb->rg_direction == DMA_NONE)
1391		return false;
1392
1393	rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
1394					    rdmab_length(rb), rb->rg_direction);
1395	if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
1396		trace_xprtrdma_dma_maperr(rdmab_addr(rb));
1397		return false;
1398	}
1399
1400	rb->rg_device = device;
1401	rb->rg_iov.lkey = r_xprt->rx_ia.ri_pd->local_dma_lkey;
1402	return true;
1403}
1404
1405static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
1406{
1407	if (!rb)
1408		return;
1409
1410	if (!rpcrdma_regbuf_is_mapped(rb))
1411		return;
1412
1413	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
1414			    rb->rg_direction);
1415	rb->rg_device = NULL;
1416}
1417
1418static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
1419{
1420	rpcrdma_regbuf_dma_unmap(rb);
1421	if (rb)
1422		kfree(rb->rg_data);
1423	kfree(rb);
1424}
1425
1426/**
1427 * rpcrdma_ep_post - Post WRs to a transport's Send Queue
1428 * @ia: transport's device information
1429 * @ep: transport's RDMA endpoint information
1430 * @req: rpcrdma_req containing the Send WR to post
1431 *
1432 * Returns 0 if the post was successful, otherwise -ENOTCONN
1433 * is returned.
1434 */
1435int
1436rpcrdma_ep_post(struct rpcrdma_ia *ia,
1437		struct rpcrdma_ep *ep,
1438		struct rpcrdma_req *req)
1439{
1440	struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
1441	int rc;
1442
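	/* Signal a Send completion only once per rep_send_batch Sends, or
	 * when the request is still referenced; unsignaled Sends have their
	 * SGEs unmapped later, when a subsequent signaled Send completes
	 * (see rpcrdma_sendctx_put_locked).
	 */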
1443	if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {
1444		send_wr->send_flags |= IB_SEND_SIGNALED;
1445		ep->rep_send_count = ep->rep_send_batch;
1446	} else {
1447		send_wr->send_flags &= ~IB_SEND_SIGNALED;
1448		--ep->rep_send_count;
1449	}
1450
1451	rc = frwr_send(ia, req);
1452	trace_xprtrdma_post_send(req, rc);
1453	if (rc)
1454		return -ENOTCONN;
1455	return 0;
1456}
1457
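/* Refill the Receive Queue. Enough Receive WRs are posted to cover
 * the current credit grant plus twice the backchannel maximum; when
 * @temp is false, extra WRs are posted so that Receives can be
 * replenished in batches rather than one at a time.
 */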
1458static void
1459rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
1460{
1461	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1462	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
1463	struct ib_recv_wr *i, *wr, *bad_wr;
1464	struct rpcrdma_rep *rep;
1465	int needed, count, rc;
1466
1467	rc = 0;
1468	count = 0;
1469
1470	needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
1471	if (likely(ep->rep_receive_count > needed))
1472		goto out;
1473	needed -= ep->rep_receive_count;
1474	if (!temp)
1475		needed += RPCRDMA_MAX_RECV_BATCH;
1476
1477	/* fast path: all needed reps can be found on the free list */
1478	wr = NULL;
1479	while (needed) {
1480		rep = rpcrdma_rep_get_locked(buf);
1481		if (!rep)
1482			rep = rpcrdma_rep_create(r_xprt, temp);
1483		if (!rep)
1484			break;
1485
1486		rep->rr_recv_wr.next = wr;
1487		wr = &rep->rr_recv_wr;
1488		--needed;
1489	}
1490	if (!wr)
1491		goto out;
1492
1493	for (i = wr; i; i = i->next) {
1494		rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
1495
1496		if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
1497			goto release_wrs;
1498
1499		trace_xprtrdma_post_recv(rep);
1500		++count;
1501	}
1502
1503	rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
1504			  (const struct ib_recv_wr **)&bad_wr);
1505out:
1506	trace_xprtrdma_post_recvs(r_xprt, count, rc);
1507	if (rc) {
1508		for (wr = bad_wr; wr;) {
1509			struct rpcrdma_rep *rep;
1510
1511			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
1512			wr = wr->next;
1513			rpcrdma_recv_buffer_put(rep);
1514			--count;
1515		}
1516	}
1517	ep->rep_receive_count += count;
1518	return;
1519
1520release_wrs:
1521	for (i = wr; i;) {
1522		rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
1523		i = i->next;
1524		rpcrdma_recv_buffer_put(rep);
1525	}
1526}