v4.6 (net/sunrpc/xprtrdma/verbs.c)
   1/*
   2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
   3 *
   4 * This software is available to you under a choice of one of two
   5 * licenses.  You may choose to be licensed under the terms of the GNU
   6 * General Public License (GPL) Version 2, available from the file
   7 * COPYING in the main directory of this source tree, or the BSD-type
   8 * license below:
   9 *
  10 * Redistribution and use in source and binary forms, with or without
  11 * modification, are permitted provided that the following conditions
  12 * are met:
  13 *
  14 *      Redistributions of source code must retain the above copyright
  15 *      notice, this list of conditions and the following disclaimer.
  16 *
  17 *      Redistributions in binary form must reproduce the above
  18 *      copyright notice, this list of conditions and the following
  19 *      disclaimer in the documentation and/or other materials provided
  20 *      with the distribution.
  21 *
  22 *      Neither the name of the Network Appliance, Inc. nor the names of
  23 *      its contributors may be used to endorse or promote products
  24 *      derived from this software without specific prior written
  25 *      permission.
  26 *
  27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  38 */
  39
  40/*
  41 * verbs.c
  42 *
  43 * Encapsulates the major functions managing:
  44 *  o adapters
  45 *  o endpoints
  46 *  o connections
  47 *  o buffer memory
  48 */
  49
  50#include <linux/interrupt.h>
  51#include <linux/slab.h>
  52#include <linux/prefetch.h>
  53#include <linux/sunrpc/addr.h>
  54#include <asm/bitops.h>
  55#include <linux/module.h> /* try_module_get()/module_put() */
  56
  57#include "xprt_rdma.h"
  58
  59/*
  60 * Globals/Macros
  61 */
  62
  63#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  64# define RPCDBG_FACILITY	RPCDBG_TRANS
  65#endif
  66
  67/*
  68 * internal functions
  69 */
  70
  71static struct workqueue_struct *rpcrdma_receive_wq;
  72
  73int
  74rpcrdma_alloc_wq(void)
  75{
  76	struct workqueue_struct *recv_wq;
  77
  78	recv_wq = alloc_workqueue("xprtrdma_receive",
  79				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
  80				  0);
  81	if (!recv_wq)
  82		return -ENOMEM;
  83
  84	rpcrdma_receive_wq = recv_wq;
  85	return 0;
  86}
  87
  88void
  89rpcrdma_destroy_wq(void)
  90{
  91	struct workqueue_struct *wq;
  92
  93	if (rpcrdma_receive_wq) {
  94		wq = rpcrdma_receive_wq;
  95		rpcrdma_receive_wq = NULL;
  96		destroy_workqueue(wq);
  97	}
  98}
  99
 100static void
 101rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
 102{
 103	struct rpcrdma_ep *ep = context;
 104
 105	pr_err("RPC:       %s: %s on device %s ep %p\n",
 106	       __func__, ib_event_msg(event->event),
 107		event->device->name, context);
 108	if (ep->rep_connected == 1) {
 109		ep->rep_connected = -EIO;
 110		rpcrdma_conn_func(ep);
 111		wake_up_all(&ep->rep_connect_wait);
 112	}
 113}
 114
 115/**
 116 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 117 * @cq:	completion queue (ignored)
 118 * @wc:	completed WR
 119 *
 120 */
 121static void
 122rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 123{
 124	/* WARNING: Only wr_cqe and status are reliable at this point */
 125	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
 126		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
 127		       ib_wc_status_msg(wc->status),
 128		       wc->status, wc->vendor_err);
 129}
 130
 131static void
 132rpcrdma_receive_worker(struct work_struct *work)
 133{
 134	struct rpcrdma_rep *rep =
 135			container_of(work, struct rpcrdma_rep, rr_work);
 136
 137	rpcrdma_reply_handler(rep);
 138}
 139
 140/* Perform basic sanity checking to avoid using garbage
 141 * to update the credit grant value.
 142 */
 143static void
 144rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
 145{
 146	struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
 147	struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
 148	u32 credits;
 149
 150	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
 151		return;
 152
 153	credits = be32_to_cpu(rmsgp->rm_credit);
 154	if (credits == 0)
 155		credits = 1;	/* don't deadlock */
 156	else if (credits > buffer->rb_max_requests)
 157		credits = buffer->rb_max_requests;
 158
 159	atomic_set(&buffer->rb_credits, credits);
 160}
 161
 162/**
 163 * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
 164 * @cq:	completion queue (ignored)
 165 * @wc:	completed WR
 166 *
 167 */
 168static void
 169rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
 170{
 171	struct ib_cqe *cqe = wc->wr_cqe;
 172	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
 173					       rr_cqe);
 174
 175	/* WARNING: Only wr_id and status are reliable at this point */
 176	if (wc->status != IB_WC_SUCCESS)
 177		goto out_fail;
 178
 179	/* status == SUCCESS means all fields in wc are trustworthy */
 180	if (wc->opcode != IB_WC_RECV)
 181		return;
 182
 183	dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
 184		__func__, rep, wc->byte_len);
 185
 186	rep->rr_len = wc->byte_len;
 187	ib_dma_sync_single_for_cpu(rep->rr_device,
 188				   rdmab_addr(rep->rr_rdmabuf),
 189				   rep->rr_len, DMA_FROM_DEVICE);
 190
 191	rpcrdma_update_granted_credits(rep);
 192
 193out_schedule:
 194	queue_work(rpcrdma_receive_wq, &rep->rr_work);
 195	return;
 196
 197out_fail:
 198	if (wc->status != IB_WC_WR_FLUSH_ERR)
 199		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
 200		       ib_wc_status_msg(wc->status),
 201		       wc->status, wc->vendor_err);
 202	rep->rr_len = RPCRDMA_BAD_LEN;
 203	goto out_schedule;
 204}
 205
 206static void
 207rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
 208{
 209	struct ib_wc wc;
 210
 211	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
 212		rpcrdma_receive_wc(NULL, &wc);
 213}
 214
 215static int
 216rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 217{
 218	struct rpcrdma_xprt *xprt = id->context;
 219	struct rpcrdma_ia *ia = &xprt->rx_ia;
 220	struct rpcrdma_ep *ep = &xprt->rx_ep;
 221#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 222	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
 223#endif
 224	struct ib_qp_attr *attr = &ia->ri_qp_attr;
 225	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
 226	int connstate = 0;
 227
 228	switch (event->event) {
 229	case RDMA_CM_EVENT_ADDR_RESOLVED:
 230	case RDMA_CM_EVENT_ROUTE_RESOLVED:
 231		ia->ri_async_rc = 0;
 232		complete(&ia->ri_done);
 233		break;
 234	case RDMA_CM_EVENT_ADDR_ERROR:
 235		ia->ri_async_rc = -EHOSTUNREACH;
 236		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
 237			__func__, ep);
 238		complete(&ia->ri_done);
 239		break;
 240	case RDMA_CM_EVENT_ROUTE_ERROR:
 241		ia->ri_async_rc = -ENETUNREACH;
 242		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
 243			__func__, ep);
 244		complete(&ia->ri_done);
 245		break;
 246	case RDMA_CM_EVENT_ESTABLISHED:
 247		connstate = 1;
 248		ib_query_qp(ia->ri_id->qp, attr,
 249			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
 250			    iattr);
 251		dprintk("RPC:       %s: %d responder resources"
 252			" (%d initiator)\n",
 253			__func__, attr->max_dest_rd_atomic,
 254			attr->max_rd_atomic);
 255		goto connected;
 256	case RDMA_CM_EVENT_CONNECT_ERROR:
 257		connstate = -ENOTCONN;
 258		goto connected;
 259	case RDMA_CM_EVENT_UNREACHABLE:
 260		connstate = -ENETDOWN;
 261		goto connected;
 262	case RDMA_CM_EVENT_REJECTED:
 263		connstate = -ECONNREFUSED;
 264		goto connected;
 265	case RDMA_CM_EVENT_DISCONNECTED:
 266		connstate = -ECONNABORTED;
 267		goto connected;
 268	case RDMA_CM_EVENT_DEVICE_REMOVAL:
 269		connstate = -ENODEV;
 270connected:
 271		dprintk("RPC:       %s: %sconnected\n",
 272					__func__, connstate > 0 ? "" : "dis");
 273		atomic_set(&xprt->rx_buf.rb_credits, 1);
 274		ep->rep_connected = connstate;
 275		rpcrdma_conn_func(ep);
 276		wake_up_all(&ep->rep_connect_wait);
 277		/*FALLTHROUGH*/
 278	default:
 279		dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
 280			__func__, sap, rpc_get_port(sap), ep,
 281			rdma_event_msg(event->event));
 282		break;
 283	}
 284
 285#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 286	if (connstate == 1) {
 287		int ird = attr->max_dest_rd_atomic;
 288		int tird = ep->rep_remote_cma.responder_resources;
 289
 290		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
 291			sap, rpc_get_port(sap),
 292			ia->ri_device->name,
 293			ia->ri_ops->ro_displayname,
 294			xprt->rx_buf.rb_max_requests,
 295			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
 296	} else if (connstate < 0) {
 297		pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
 298			sap, rpc_get_port(sap), connstate);
 299	}
 300#endif
 301
 302	return 0;
 303}
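For quick reference, a summary (derived from the switch above, not present in the original source) of the values this handler leaves in ep->rep_connected:

	/*
	 *   1              ESTABLISHED
	 *  -ENOTCONN       CONNECT_ERROR
	 *  -ENETDOWN       UNREACHABLE
	 *  -ECONNREFUSED   REJECTED (no listener; possibly transient)
	 *  -ECONNABORTED   DISCONNECTED
	 *  -ENODEV         DEVICE_REMOVAL
	 *
	 * rpcrdma_ep_connect() sleeps on ep->rep_connect_wait until
	 * rep_connected becomes non-zero.
	 */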
 304
 305static void rpcrdma_destroy_id(struct rdma_cm_id *id)
 306{
 307	if (id) {
 308		module_put(id->device->owner);
 309		rdma_destroy_id(id);
 310	}
 311}
 312
 313static struct rdma_cm_id *
 314rpcrdma_create_id(struct rpcrdma_xprt *xprt,
 315			struct rpcrdma_ia *ia, struct sockaddr *addr)
 316{
 317	struct rdma_cm_id *id;
 318	int rc;
 319
 320	init_completion(&ia->ri_done);
 321
 322	id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
 323			    IB_QPT_RC);
 324	if (IS_ERR(id)) {
 325		rc = PTR_ERR(id);
 326		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
 327			__func__, rc);
 328		return id;
 329	}
 330
 331	ia->ri_async_rc = -ETIMEDOUT;
 332	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
 333	if (rc) {
 334		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
 335			__func__, rc);
 336		goto out;
 337	}
 338	wait_for_completion_interruptible_timeout(&ia->ri_done,
 339				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
 340
 341	/* FIXME:
 342	 * Until xprtrdma supports DEVICE_REMOVAL, the provider must
 343	 * be pinned while there are active NFS/RDMA mounts to prevent
 344	 * hangs and crashes at umount time.
 345	 */
 346	if (!ia->ri_async_rc && !try_module_get(id->device->owner)) {
 347		dprintk("RPC:       %s: Failed to get device module\n",
 348			__func__);
 349		ia->ri_async_rc = -ENODEV;
 350	}
 351	rc = ia->ri_async_rc;
 352	if (rc)
 353		goto out;
 354
 355	ia->ri_async_rc = -ETIMEDOUT;
 356	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
 357	if (rc) {
 358		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
 359			__func__, rc);
 360		goto put;
 361	}
 362	wait_for_completion_interruptible_timeout(&ia->ri_done,
 363				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
 364	rc = ia->ri_async_rc;
 365	if (rc)
 366		goto put;
 367
 368	return id;
 369put:
 370	module_put(id->device->owner);
 371out:
 372	rdma_destroy_id(id);
 373	return ERR_PTR(rc);
 374}
 375
 376/*
 377 * Drain any cq, prior to teardown.
 378 */
 379static void
 380rpcrdma_clean_cq(struct ib_cq *cq)
 381{
 382	struct ib_wc wc;
 383	int count = 0;
 384
 385	while (1 == ib_poll_cq(cq, 1, &wc))
 386		++count;
 387
 388	if (count)
 389		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
 390			__func__, count, wc.opcode);
 391}
 392
 393/*
 394 * Exported functions.
 395 */
 396
 397/*
 398 * Open and initialize an Interface Adapter.
 399 *  o initializes fields of struct rpcrdma_ia, including
 400 *    interface and provider attributes and protection zone.
 401 */
 402int
 403rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 404{
 405	struct rpcrdma_ia *ia = &xprt->rx_ia;
 406	int rc;
 407
 408	ia->ri_dma_mr = NULL;
 409
 410	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
 411	if (IS_ERR(ia->ri_id)) {
 412		rc = PTR_ERR(ia->ri_id);
 413		goto out1;
 414	}
 415	ia->ri_device = ia->ri_id->device;
 416
 417	ia->ri_pd = ib_alloc_pd(ia->ri_device);
 418	if (IS_ERR(ia->ri_pd)) {
 419		rc = PTR_ERR(ia->ri_pd);
 420		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
 421			__func__, rc);
 422		goto out2;
 423	}
 424
 425	if (memreg == RPCRDMA_FRMR) {
 426		if (!(ia->ri_device->attrs.device_cap_flags &
 427				IB_DEVICE_MEM_MGT_EXTENSIONS) ||
 428		    (ia->ri_device->attrs.max_fast_reg_page_list_len == 0)) {
 429			dprintk("RPC:       %s: FRMR registration "
 430				"not supported by HCA\n", __func__);
 431			memreg = RPCRDMA_MTHCAFMR;
 432		}
 433	}
 434	if (memreg == RPCRDMA_MTHCAFMR) {
 435		if (!ia->ri_device->alloc_fmr) {
 436			dprintk("RPC:       %s: MTHCAFMR registration "
 437				"not supported by HCA\n", __func__);
 438			rc = -EINVAL;
 439			goto out3;
 440		}
 441	}
 442
 443	switch (memreg) {
 444	case RPCRDMA_FRMR:
 445		ia->ri_ops = &rpcrdma_frwr_memreg_ops;
 446		break;
 447	case RPCRDMA_ALLPHYSICAL:
 448		ia->ri_ops = &rpcrdma_physical_memreg_ops;
 449		break;
 450	case RPCRDMA_MTHCAFMR:
 451		ia->ri_ops = &rpcrdma_fmr_memreg_ops;
 452		break;
 453	default:
 454		printk(KERN_ERR "RPC: Unsupported memory "
 455				"registration mode: %d\n", memreg);
 456		rc = -ENOMEM;
 457		goto out3;
 458	}
 459	dprintk("RPC:       %s: memory registration strategy is '%s'\n",
 460		__func__, ia->ri_ops->ro_displayname);
 461
 462	rwlock_init(&ia->ri_qplock);
 463	return 0;
 464
 465out3:
 466	ib_dealloc_pd(ia->ri_pd);
 467	ia->ri_pd = NULL;
 468out2:
 469	rpcrdma_destroy_id(ia->ri_id);
 470	ia->ri_id = NULL;
 471out1:
 472	return rc;
 473}
 474
 475/*
 476 * Clean up/close an IA.
 477 *   o if event handles and PD have been initialized, free them.
 478 *   o close the IA
 479 */
 480void
 481rpcrdma_ia_close(struct rpcrdma_ia *ia)
 482{
 483	dprintk("RPC:       %s: entering\n", __func__);
 484	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
 485		if (ia->ri_id->qp)
 486			rdma_destroy_qp(ia->ri_id);
 487		rpcrdma_destroy_id(ia->ri_id);
 488		ia->ri_id = NULL;
 489	}
 490
 491	/* If the pd is still busy, xprtrdma missed freeing a resource */
 492	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
 493		ib_dealloc_pd(ia->ri_pd);
 494}
 495
 496/*
 497 * Create unconnected endpoint.
 498 */
 499int
 500rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 501				struct rpcrdma_create_data_internal *cdata)
 502{
 503	struct ib_cq *sendcq, *recvcq;
 504	unsigned int max_qp_wr;
 505	int rc;
 506
 507	if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
 508		dprintk("RPC:       %s: insufficient sge's available\n",
 509			__func__);
 510		return -ENOMEM;
 511	}
 512
 513	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
 514		dprintk("RPC:       %s: insufficient wqe's available\n",
 515			__func__);
 516		return -ENOMEM;
 517	}
 518	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS;
 519
 520	/* check provider's send/recv wr limits */
 521	if (cdata->max_requests > max_qp_wr)
 522		cdata->max_requests = max_qp_wr;
 523
 524	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
 525	ep->rep_attr.qp_context = ep;
 526	ep->rep_attr.srq = NULL;
 527	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
 528	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
 529	rc = ia->ri_ops->ro_open(ia, ep, cdata);
 530	if (rc)
 531		return rc;
 532	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
 533	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
 534	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
 535	ep->rep_attr.cap.max_recv_sge = 1;
 536	ep->rep_attr.cap.max_inline_data = 0;
 537	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
 538	ep->rep_attr.qp_type = IB_QPT_RC;
 539	ep->rep_attr.port_num = ~0;
 540
 541	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
 542		"iovs: send %d recv %d\n",
 543		__func__,
 544		ep->rep_attr.cap.max_send_wr,
 545		ep->rep_attr.cap.max_recv_wr,
 546		ep->rep_attr.cap.max_send_sge,
 547		ep->rep_attr.cap.max_recv_sge);
 548
 549	/* set trigger for requesting send completion */
 550	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
 551	if (ep->rep_cqinit <= 2)
 552		ep->rep_cqinit = 0;	/* always signal? */
 553	INIT_CQCOUNT(ep);
 554	init_waitqueue_head(&ep->rep_connect_wait);
 555	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
 556
 557	sendcq = ib_alloc_cq(ia->ri_device, NULL,
 558			     ep->rep_attr.cap.max_send_wr + 1,
 559			     0, IB_POLL_SOFTIRQ);
 560	if (IS_ERR(sendcq)) {
 561		rc = PTR_ERR(sendcq);
 562		dprintk("RPC:       %s: failed to create send CQ: %i\n",
 563			__func__, rc);
 564		goto out1;
 565	}
 566
 567	recvcq = ib_alloc_cq(ia->ri_device, NULL,
 568			     ep->rep_attr.cap.max_recv_wr + 1,
 569			     0, IB_POLL_SOFTIRQ);
 570	if (IS_ERR(recvcq)) {
 571		rc = PTR_ERR(recvcq);
 572		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
 573			__func__, rc);
 574		goto out2;
 575	}
 576
 577	ep->rep_attr.send_cq = sendcq;
 578	ep->rep_attr.recv_cq = recvcq;
 579
 580	/* Initialize cma parameters */
 581
 582	/* RPC/RDMA does not use private data */
 583	ep->rep_remote_cma.private_data = NULL;
 584	ep->rep_remote_cma.private_data_len = 0;
 585
 586	/* Client offers RDMA Read but does not initiate */
 587	ep->rep_remote_cma.initiator_depth = 0;
 588	if (ia->ri_device->attrs.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
 589		ep->rep_remote_cma.responder_resources = 32;
 590	else
 591		ep->rep_remote_cma.responder_resources =
 592						ia->ri_device->attrs.max_qp_rd_atom;
 593
 594	ep->rep_remote_cma.retry_count = 7;
 595	ep->rep_remote_cma.flow_control = 0;
 596	ep->rep_remote_cma.rnr_retry_count = 0;
 597
 598	return 0;
 599
 600out2:
 601	ib_free_cq(sendcq);
 602out1:
 603	if (ia->ri_dma_mr)
 604		ib_dereg_mr(ia->ri_dma_mr);
 605	return rc;
 606}
 607
 608/*
 609 * rpcrdma_ep_destroy
 610 *
 611 * Disconnect and destroy endpoint. After this, the only
 612 * valid operations on the ep are to free it (if dynamically
 613 * allocated) or re-create it.
 614 */
 615void
 616rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 617{
 618	int rc;
 619
 620	dprintk("RPC:       %s: entering, connected is %d\n",
 621		__func__, ep->rep_connected);
 622
 623	cancel_delayed_work_sync(&ep->rep_connect_worker);
 624
 625	if (ia->ri_id->qp)
 626		rpcrdma_ep_disconnect(ep, ia);
 627
 628	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
 629	rpcrdma_clean_cq(ep->rep_attr.send_cq);
 630
 631	if (ia->ri_id->qp) {
 632		rdma_destroy_qp(ia->ri_id);
 633		ia->ri_id->qp = NULL;
 634	}
 635
 636	ib_free_cq(ep->rep_attr.recv_cq);
 637	ib_free_cq(ep->rep_attr.send_cq);
 638
 639	if (ia->ri_dma_mr) {
 640		rc = ib_dereg_mr(ia->ri_dma_mr);
 641		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
 642			__func__, rc);
 643	}
 644}
 645
 646/*
 647 * Connect unconnected endpoint.
 648 */
 649int
 650rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 651{
 652	struct rdma_cm_id *id, *old;
 653	int rc = 0;
 654	int retry_count = 0;
 655
 656	if (ep->rep_connected != 0) {
 657		struct rpcrdma_xprt *xprt;
 658retry:
 659		dprintk("RPC:       %s: reconnecting...\n", __func__);
 660
 661		rpcrdma_ep_disconnect(ep, ia);
 662		rpcrdma_flush_cqs(ep);
 663
 664		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
 665		id = rpcrdma_create_id(xprt, ia,
 666				(struct sockaddr *)&xprt->rx_data.addr);
 667		if (IS_ERR(id)) {
 668			rc = -EHOSTUNREACH;
 669			goto out;
 670		}
 671		/* TEMP TEMP TEMP - fail if new device:
 672		 * Deregister/remarshal *all* requests!
 673		 * Close and recreate adapter, pd, etc!
 674		 * Re-determine all attributes still sane!
 675		 * More stuff I haven't thought of!
 676		 * Rrrgh!
 677		 */
 678		if (ia->ri_device != id->device) {
 679			printk("RPC:       %s: can't reconnect on "
 680				"different device!\n", __func__);
 681			rpcrdma_destroy_id(id);
 682			rc = -ENETUNREACH;
 683			goto out;
 684		}
 685		/* END TEMP */
 686		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
 687		if (rc) {
 688			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
 689				__func__, rc);
 690			rpcrdma_destroy_id(id);
 691			rc = -ENETUNREACH;
 692			goto out;
 693		}
 694
 695		write_lock(&ia->ri_qplock);
 696		old = ia->ri_id;
 697		ia->ri_id = id;
 698		write_unlock(&ia->ri_qplock);
 699
 700		rdma_destroy_qp(old);
 701		rpcrdma_destroy_id(old);
 702	} else {
 703		dprintk("RPC:       %s: connecting...\n", __func__);
 704		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
 705		if (rc) {
 706			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
 707				__func__, rc);
 708			/* do not update ep->rep_connected */
 709			return -ENETUNREACH;
 710		}
 711	}
 712
 713	ep->rep_connected = 0;
 714
 715	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
 716	if (rc) {
 717		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
 718				__func__, rc);
 719		goto out;
 720	}
 721
 722	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
 723
 724	/*
 725	 * Check state. A non-peer reject indicates no listener
 726	 * (ECONNREFUSED), which may be a transient state. All
 727	 * others indicate a transport condition which has already
 728	 * undergone a best-effort.
 729	 */
 730	if (ep->rep_connected == -ECONNREFUSED &&
 731	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
 732		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
 733		goto retry;
 734	}
 735	if (ep->rep_connected <= 0) {
 736		/* Sometimes, the only way to reliably connect to remote
 737		 * CMs is to use same nonzero values for ORD and IRD. */
 738		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
 739		    (ep->rep_remote_cma.responder_resources == 0 ||
 740		     ep->rep_remote_cma.initiator_depth !=
 741				ep->rep_remote_cma.responder_resources)) {
 742			if (ep->rep_remote_cma.responder_resources == 0)
 743				ep->rep_remote_cma.responder_resources = 1;
 744			ep->rep_remote_cma.initiator_depth =
 745				ep->rep_remote_cma.responder_resources;
 746			goto retry;
 747		}
 748		rc = ep->rep_connected;
 749	} else {
 750		struct rpcrdma_xprt *r_xprt;
 751		unsigned int extras;
 752
 753		dprintk("RPC:       %s: connected\n", __func__);
 754
 755		r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
 756		extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
 757
 758		if (extras) {
 759			rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
 760			if (rc) {
 761				pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
 762					__func__, rc);
 763				rc = 0;
 764			}
 765		}
 766	}
 767
 768out:
 769	if (rc)
 770		ep->rep_connected = rc;
 771	return rc;
 772}
 773
 774/*
 775 * rpcrdma_ep_disconnect
 776 *
 777 * This is separate from destroy to facilitate the ability
 778 * to reconnect without recreating the endpoint.
 779 *
 780 * This call is not reentrant, and must not be made in parallel
 781 * on the same endpoint.
 782 */
 783void
 784rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 785{
 786	int rc;
 787
 788	rpcrdma_flush_cqs(ep);
 789	rc = rdma_disconnect(ia->ri_id);
 790	if (!rc) {
 791		/* returns without wait if not connected */
 792		wait_event_interruptible(ep->rep_connect_wait,
 793							ep->rep_connected != 1);
 794		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
 795			(ep->rep_connected == 1) ? "still " : "dis");
 796	} else {
 797		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
 798		ep->rep_connected = rc;
 799	}
 800}
 801
 802struct rpcrdma_req *
 803rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 804{
 805	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
 806	struct rpcrdma_req *req;
 807
 808	req = kzalloc(sizeof(*req), GFP_KERNEL);
 809	if (req == NULL)
 810		return ERR_PTR(-ENOMEM);
 811
 812	INIT_LIST_HEAD(&req->rl_free);
 813	spin_lock(&buffer->rb_reqslock);
 814	list_add(&req->rl_all, &buffer->rb_allreqs);
 815	spin_unlock(&buffer->rb_reqslock);
 816	req->rl_cqe.done = rpcrdma_wc_send;
 817	req->rl_buffer = &r_xprt->rx_buf;
 818	return req;
 819}
 820
 821struct rpcrdma_rep *
 822rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 823{
 824	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
 825	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 826	struct rpcrdma_rep *rep;
 827	int rc;
 828
 829	rc = -ENOMEM;
 830	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
 831	if (rep == NULL)
 832		goto out;
 833
 834	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
 835					       GFP_KERNEL);
 836	if (IS_ERR(rep->rr_rdmabuf)) {
 837		rc = PTR_ERR(rep->rr_rdmabuf);
 838		goto out_free;
 839	}
 840
 841	rep->rr_device = ia->ri_device;
 842	rep->rr_cqe.done = rpcrdma_receive_wc;
 843	rep->rr_rxprt = r_xprt;
 844	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
 845	return rep;
 846
 847out_free:
 848	kfree(rep);
 849out:
 850	return ERR_PTR(rc);
 851}
 852
 853int
 854rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 855{
 856	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 857	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 858	int i, rc;
 859
 860	buf->rb_max_requests = r_xprt->rx_data.max_requests;
 861	buf->rb_bc_srv_max_requests = 0;
 862	spin_lock_init(&buf->rb_lock);
 863	atomic_set(&buf->rb_credits, 1);
 864
 865	rc = ia->ri_ops->ro_init(r_xprt);
 866	if (rc)
 867		goto out;
 868
 869	INIT_LIST_HEAD(&buf->rb_send_bufs);
 870	INIT_LIST_HEAD(&buf->rb_allreqs);
 871	spin_lock_init(&buf->rb_reqslock);
 872	for (i = 0; i < buf->rb_max_requests; i++) {
 873		struct rpcrdma_req *req;
 874
 875		req = rpcrdma_create_req(r_xprt);
 876		if (IS_ERR(req)) {
 877			dprintk("RPC:       %s: request buffer %d alloc"
 878				" failed\n", __func__, i);
 879			rc = PTR_ERR(req);
 880			goto out;
 881		}
 882		req->rl_backchannel = false;
 883		list_add(&req->rl_free, &buf->rb_send_bufs);
 884	}
 885
 886	INIT_LIST_HEAD(&buf->rb_recv_bufs);
 887	for (i = 0; i < buf->rb_max_requests + 2; i++) {
 888		struct rpcrdma_rep *rep;
 889
 890		rep = rpcrdma_create_rep(r_xprt);
 891		if (IS_ERR(rep)) {
 892			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
 893				__func__, i);
 894			rc = PTR_ERR(rep);
 895			goto out;
 896		}
 897		list_add(&rep->rr_list, &buf->rb_recv_bufs);
 898	}
 899
 900	return 0;
 901out:
 902	rpcrdma_buffer_destroy(buf);
 903	return rc;
 904}
 905
 906static struct rpcrdma_req *
 907rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
 908{
 909	struct rpcrdma_req *req;
 910
 911	req = list_first_entry(&buf->rb_send_bufs,
 912			       struct rpcrdma_req, rl_free);
 913	list_del(&req->rl_free);
 914	return req;
 915}
 916
 917static struct rpcrdma_rep *
 918rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
 919{
 920	struct rpcrdma_rep *rep;
 921
 922	rep = list_first_entry(&buf->rb_recv_bufs,
 923			       struct rpcrdma_rep, rr_list);
 924	list_del(&rep->rr_list);
 925	return rep;
 926}
 927
 928static void
 929rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
 930{
 931	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
 932	kfree(rep);
 933}
 934
 935void
 936rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
 937{
 938	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
 939	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
 940	kfree(req);
 941}
 942
 943void
 944rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 945{
 946	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
 947
 948	while (!list_empty(&buf->rb_recv_bufs)) {
 949		struct rpcrdma_rep *rep;
 950
 951		rep = rpcrdma_buffer_get_rep_locked(buf);
 952		rpcrdma_destroy_rep(ia, rep);
 953	}
 954
 955	spin_lock(&buf->rb_reqslock);
 956	while (!list_empty(&buf->rb_allreqs)) {
 957		struct rpcrdma_req *req;
 958
 959		req = list_first_entry(&buf->rb_allreqs,
 960				       struct rpcrdma_req, rl_all);
 961		list_del(&req->rl_all);
 962
 963		spin_unlock(&buf->rb_reqslock);
 964		rpcrdma_destroy_req(ia, req);
 965		spin_lock(&buf->rb_reqslock);
 966	}
 967	spin_unlock(&buf->rb_reqslock);
 968
 969	ia->ri_ops->ro_destroy(buf);
 970}
 971
 972struct rpcrdma_mw *
 973rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
 974{
 975	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 976	struct rpcrdma_mw *mw = NULL;
 977
 978	spin_lock(&buf->rb_mwlock);
 979	if (!list_empty(&buf->rb_mws)) {
 980		mw = list_first_entry(&buf->rb_mws,
 981				      struct rpcrdma_mw, mw_list);
 982		list_del_init(&mw->mw_list);
 983	}
 984	spin_unlock(&buf->rb_mwlock);
 985
 986	if (!mw)
 987		pr_err("RPC:       %s: no MWs available\n", __func__);
 988	return mw;
 989}
 990
 991void
 992rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
 993{
 994	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 995
 996	spin_lock(&buf->rb_mwlock);
 997	list_add_tail(&mw->mw_list, &buf->rb_mws);
 998	spin_unlock(&buf->rb_mwlock);
 999}
1000
1001/*
1002 * Get a set of request/reply buffers.
1003 *
1004 * Reply buffer (if available) is attached to send buffer upon return.
1005 */
1006struct rpcrdma_req *
1007rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1008{
1009	struct rpcrdma_req *req;
1010
1011	spin_lock(&buffers->rb_lock);
1012	if (list_empty(&buffers->rb_send_bufs))
1013		goto out_reqbuf;
1014	req = rpcrdma_buffer_get_req_locked(buffers);
1015	if (list_empty(&buffers->rb_recv_bufs))
1016		goto out_repbuf;
1017	req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
1018	spin_unlock(&buffers->rb_lock);
1019	return req;
1020
1021out_reqbuf:
1022	spin_unlock(&buffers->rb_lock);
1023	pr_warn("RPC:       %s: out of request buffers\n", __func__);
1024	return NULL;
1025out_repbuf:
1026	spin_unlock(&buffers->rb_lock);
1027	pr_warn("RPC:       %s: out of reply buffers\n", __func__);
1028	req->rl_reply = NULL;
1029	return req;
1030}
1031
1032/*
1033 * Put request/reply buffers back into pool.
1034 * Pre-decrement counter/array index.
1035 */
1036void
1037rpcrdma_buffer_put(struct rpcrdma_req *req)
1038{
1039	struct rpcrdma_buffer *buffers = req->rl_buffer;
1040	struct rpcrdma_rep *rep = req->rl_reply;
1041
1042	req->rl_niovs = 0;
1043	req->rl_reply = NULL;
1044
1045	spin_lock(&buffers->rb_lock);
1046	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
1047	if (rep)
1048		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
1049	spin_unlock(&buffers->rb_lock);
1050}
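A minimal sketch of how these two entry points pair up in a caller. The function below is hypothetical and only illustrates the get/put discipline; it is not part of this file:

	static int example_buffer_roundtrip(struct rpcrdma_xprt *r_xprt)
	{
		struct rpcrdma_req *req;

		req = rpcrdma_buffer_get(&r_xprt->rx_buf);
		if (!req)
			return -ENOMEM;	/* pool exhausted; a real caller would back off and retry */

		/* ... marshal the RPC into req and post it with rpcrdma_ep_post() ... */

		rpcrdma_buffer_put(req);	/* also returns req->rl_reply, if still attached */
		return 0;
	}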
1051
1052/*
1053 * Recover reply buffers from pool.
1054 * This happens when recovering from disconnect.
1055 */
1056void
1057rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1058{
1059	struct rpcrdma_buffer *buffers = req->rl_buffer;
1060
1061	spin_lock(&buffers->rb_lock);
1062	if (!list_empty(&buffers->rb_recv_bufs))
1063		req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
1064	spin_unlock(&buffers->rb_lock);
1065}
1066
1067/*
1068 * Put reply buffers back into pool when not attached to
1069 * request. This happens in error conditions.
1070 */
1071void
1072rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1073{
1074	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
1075
1076	spin_lock(&buffers->rb_lock);
1077	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
1078	spin_unlock(&buffers->rb_lock);
1079}
1080
1081/*
1082 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1083 */
1084
1085void
1086rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
1087{
1088	dprintk("RPC:       map_one: offset %p iova %llx len %zu\n",
1089		seg->mr_offset,
1090		(unsigned long long)seg->mr_dma, seg->mr_dmalen);
1091}
1092
1093/**
1094 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
1095 * @ia: controlling rpcrdma_ia
1096 * @size: size of buffer to be allocated, in bytes
1097 * @flags: GFP flags
1098 *
1099 * Returns pointer to private header of an area of internally
1100 * registered memory, or an ERR_PTR. The registered buffer follows
1101 * the end of the private header.
1102 *
1103 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1104 * receiving the payload of RDMA RECV operations. regbufs are not
1105 * used for RDMA READ/WRITE operations, thus are registered only for
1106 * LOCAL access.
1107 */
1108struct rpcrdma_regbuf *
1109rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
1110{
1111	struct rpcrdma_regbuf *rb;
1112	struct ib_sge *iov;
1113
1114	rb = kmalloc(sizeof(*rb) + size, flags);
1115	if (rb == NULL)
1116		goto out;
1117
1118	iov = &rb->rg_iov;
1119	iov->addr = ib_dma_map_single(ia->ri_device,
1120				      (void *)rb->rg_base, size,
1121				      DMA_BIDIRECTIONAL);
1122	if (ib_dma_mapping_error(ia->ri_device, iov->addr))
1123		goto out_free;
1124
1125	iov->length = size;
1126	iov->lkey = ia->ri_pd->local_dma_lkey;
1127	rb->rg_size = size;
1128	rb->rg_owner = NULL;
1129	return rb;
1130
1131out_free:
1132	kfree(rb);
1133out:
1134	return ERR_PTR(-ENOMEM);
1135}
1136
1137/**
1138 * rpcrdma_free_regbuf - deregister and free registered buffer
1139 * @ia: controlling rpcrdma_ia
1140 * @rb: regbuf to be deregistered and freed
1141 */
1142void
1143rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
1144{
1145	struct ib_sge *iov;
1146
1147	if (!rb)
1148		return;
1149
1150	iov = &rb->rg_iov;
1151	ib_dma_unmap_single(ia->ri_device,
1152			    iov->addr, iov->length, DMA_BIDIRECTIONAL);
1153	kfree(rb);
1154}
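A minimal usage sketch for the two regbuf helpers above. The wrapper function is hypothetical; a real caller would attach the sge to a Send or Receive WR instead of discarding it:

	static int example_regbuf_roundtrip(struct rpcrdma_ia *ia)
	{
		struct rpcrdma_regbuf *rb;
		struct ib_sge sge;

		rb = rpcrdma_alloc_regbuf(ia, 1024, GFP_KERNEL);
		if (IS_ERR(rb))
			return PTR_ERR(rb);

		/* rg_iov already carries the DMA address, length, and local DMA lkey. */
		sge = rb->rg_iov;
		(void)sge;

		rpcrdma_free_regbuf(ia, rb);
		return 0;
	}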
1155
1156/*
1157 * Prepost any receive buffer, then post send.
1158 *
1159 * Receive buffer is donated to hardware, reclaimed upon recv completion.
1160 */
1161int
1162rpcrdma_ep_post(struct rpcrdma_ia *ia,
1163		struct rpcrdma_ep *ep,
1164		struct rpcrdma_req *req)
1165{
1166	struct ib_device *device = ia->ri_device;
1167	struct ib_send_wr send_wr, *send_wr_fail;
1168	struct rpcrdma_rep *rep = req->rl_reply;
1169	struct ib_sge *iov = req->rl_send_iov;
1170	int i, rc;
1171
1172	if (rep) {
1173		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1174		if (rc)
1175			goto out;
1176		req->rl_reply = NULL;
1177	}
1178
1179	send_wr.next = NULL;
1180	send_wr.wr_cqe = &req->rl_cqe;
1181	send_wr.sg_list = iov;
1182	send_wr.num_sge = req->rl_niovs;
1183	send_wr.opcode = IB_WR_SEND;
1184
1185	for (i = 0; i < send_wr.num_sge; i++)
1186		ib_dma_sync_single_for_device(device, iov[i].addr,
1187					      iov[i].length, DMA_TO_DEVICE);
1188	dprintk("RPC:       %s: posting %d s/g entries\n",
1189		__func__, send_wr.num_sge);
1190
1191	if (DECR_CQCOUNT(ep) > 0)
1192		send_wr.send_flags = 0;
1193	else { /* Provider must take a send completion every now and then */
1194		INIT_CQCOUNT(ep);
1195		send_wr.send_flags = IB_SEND_SIGNALED;
1196	}
1197
1198	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1199	if (rc)
1200		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1201			rc);
1202out:
1203	return rc;
1204}
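The INIT_CQCOUNT() and DECR_CQCOUNT() macros used above to decide when a Send is signalled come from xprt_rdma.h and are not shown in this listing. A plausible sketch of what they do (the rep_cqcount field name is an assumption, not confirmed by this file):

	/* Sketch only -- see xprt_rdma.h for the real definitions. */
	#define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
	#define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)

With rep_cqinit set to roughly half of max_send_wr in rpcrdma_ep_create(), only about one Send in every rep_cqinit is signalled, which keeps Send completion overhead low while still letting the provider retire send WRs.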
1205
1206/*
1207 * (Re)post a receive buffer.
1208 */
1209int
1210rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1211		     struct rpcrdma_ep *ep,
1212		     struct rpcrdma_rep *rep)
1213{
1214	struct ib_recv_wr recv_wr, *recv_wr_fail;
1215	int rc;
1216
1217	recv_wr.next = NULL;
1218	recv_wr.wr_cqe = &rep->rr_cqe;
1219	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
1220	recv_wr.num_sge = 1;
1221
1222	ib_dma_sync_single_for_cpu(ia->ri_device,
1223				   rdmab_addr(rep->rr_rdmabuf),
1224				   rdmab_length(rep->rr_rdmabuf),
1225				   DMA_BIDIRECTIONAL);
1226
1227	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1228
1229	if (rc)
1230		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1231			rc);
1232	return rc;
1233}
1234
1235/**
1236 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
1237 * @r_xprt: transport associated with these backchannel resources
1238 * @min_reqs: minimum number of incoming requests expected
1239 *
1240 * Returns zero if all requested buffers were posted, or a negative errno.
1241 */
1242int
1243rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
1244{
1245	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
1246	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1247	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
1248	struct rpcrdma_rep *rep;
1249	int rc;
1250
1251	while (count--) {
1252		spin_lock(&buffers->rb_lock);
1253		if (list_empty(&buffers->rb_recv_bufs))
1254			goto out_reqbuf;
1255		rep = rpcrdma_buffer_get_rep_locked(buffers);
1256		spin_unlock(&buffers->rb_lock);
1257
1258		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1259		if (rc)
1260			goto out_rc;
1261	}
1262
1263	return 0;
1264
1265out_reqbuf:
1266	spin_unlock(&buffers->rb_lock);
1267	pr_warn("%s: no extra receive buffers\n", __func__);
1268	return -ENOMEM;
1269
1270out_rc:
1271	rpcrdma_recv_buffer_put(rep);
1272	return rc;
1273}
1274
1275/* How many chunk list items fit within our inline buffers?
1276 */
1277unsigned int
1278rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
1279{
1280	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1281	int bytes, segments;
1282
1283	bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
1284	bytes -= RPCRDMA_HDRLEN_MIN;
1285	if (bytes < sizeof(struct rpcrdma_segment) * 2) {
1286		pr_warn("RPC:       %s: inline threshold too small\n",
1287			__func__);
1288		return 0;
1289	}
1290
1291	segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
1292	dprintk("RPC:       %s: max chunk list size = %d segments\n",
1293		__func__, segments);
1294	return segments;
1295}
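A worked example of the calculation above, assuming 1024-byte inline thresholds, a 28-byte RPCRDMA_HDRLEN_MIN, and a 16-byte struct rpcrdma_segment (all three values live in headers outside this file): bytes = 1024 - 28 = 996, so 996 / 16 = 62 segments would fit, and rounding down to a power of two gives 1 << (fls(62) - 1) = 32 segments for the chunk list.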
v6.2 (net/sunrpc/xprtrdma/verbs.c)
   1// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
   2/*
   3 * Copyright (c) 2014-2017 Oracle.  All rights reserved.
   4 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
   5 *
   6 * This software is available to you under a choice of one of two
   7 * licenses.  You may choose to be licensed under the terms of the GNU
   8 * General Public License (GPL) Version 2, available from the file
   9 * COPYING in the main directory of this source tree, or the BSD-type
  10 * license below:
  11 *
  12 * Redistribution and use in source and binary forms, with or without
  13 * modification, are permitted provided that the following conditions
  14 * are met:
  15 *
  16 *      Redistributions of source code must retain the above copyright
  17 *      notice, this list of conditions and the following disclaimer.
  18 *
  19 *      Redistributions in binary form must reproduce the above
  20 *      copyright notice, this list of conditions and the following
  21 *      disclaimer in the documentation and/or other materials provided
  22 *      with the distribution.
  23 *
  24 *      Neither the name of the Network Appliance, Inc. nor the names of
  25 *      its contributors may be used to endorse or promote products
  26 *      derived from this software without specific prior written
  27 *      permission.
  28 *
  29 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  30 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  31 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  32 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  33 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  34 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  36 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  37 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  38 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  39 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  40 */
  41
  42/*
  43 * verbs.c
  44 *
  45 * Encapsulates the major functions managing:
  46 *  o adapters
  47 *  o endpoints
  48 *  o connections
  49 *  o buffer memory
  50 */
  51
  52#include <linux/interrupt.h>
  53#include <linux/slab.h>
  54#include <linux/sunrpc/addr.h>
  55#include <linux/sunrpc/svc_rdma.h>
  56#include <linux/log2.h>
  57
  58#include <asm-generic/barrier.h>
  59#include <asm/bitops.h>
  60
  61#include <rdma/ib_cm.h>
  62
  63#include "xprt_rdma.h"
  64#include <trace/events/rpcrdma.h>
  65
  66static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt);
  67static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt);
  68static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
  69				       struct rpcrdma_sendctx *sc);
  70static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt);
  71static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
  72static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep);
  73static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt);
  74static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
  75static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt);
  76static void rpcrdma_ep_get(struct rpcrdma_ep *ep);
  77static int rpcrdma_ep_put(struct rpcrdma_ep *ep);
  78static struct rpcrdma_regbuf *
  79rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction);
  80static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
  81static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);
  82
  83/* Wait for outstanding transport work to finish. ib_drain_qp
  84 * handles the drains in the wrong order for us, so open code
  85 * them here.
  86 */
  87static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
  88{
  89	struct rpcrdma_ep *ep = r_xprt->rx_ep;
  90	struct rdma_cm_id *id = ep->re_id;
  91
  92	/* Wait for rpcrdma_post_recvs() to leave its critical
  93	 * section.
  94	 */
  95	if (atomic_inc_return(&ep->re_receiving) > 1)
  96		wait_for_completion(&ep->re_done);
  97
  98	/* Flush Receives, then wait for deferred Reply work
  99	 * to complete.
 100	 */
 101	ib_drain_rq(id->qp);
 102
 103	/* Deferred Reply processing might have scheduled
 104	 * local invalidations.
 105	 */
 106	ib_drain_sq(id->qp);
 107
 108	rpcrdma_ep_put(ep);
 109}
 110
 111/* Ensure xprt_force_disconnect() is invoked exactly once when a
 112 * connection is closed or lost. (The important thing is it needs
 113 * to be invoked "at least" once).
 114 */
 115void rpcrdma_force_disconnect(struct rpcrdma_ep *ep)
 116{
 117	if (atomic_add_unless(&ep->re_force_disconnect, 1, 1))
 118		xprt_force_disconnect(ep->re_xprt);
 119}
 120
 121/**
 122 * rpcrdma_flush_disconnect - Disconnect on flushed completion
 123 * @r_xprt: transport to disconnect
 124 * @wc: work completion entry
 125 *
 126 * Must be called in process context.
 127 */
 128void rpcrdma_flush_disconnect(struct rpcrdma_xprt *r_xprt, struct ib_wc *wc)
 129{
 130	if (wc->status != IB_WC_SUCCESS)
 131		rpcrdma_force_disconnect(r_xprt->rx_ep);
 132}
 133
 134/**
 135 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 136 * @cq:	completion queue
 137 * @wc:	WCE for a completed Send WR
 138 *
 139 */
 140static void rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 141{
 142	struct ib_cqe *cqe = wc->wr_cqe;
 143	struct rpcrdma_sendctx *sc =
 144		container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
 145	struct rpcrdma_xprt *r_xprt = cq->cq_context;
 146
 147	/* WARNING: Only wr_cqe and status are reliable at this point */
 148	trace_xprtrdma_wc_send(wc, &sc->sc_cid);
 149	rpcrdma_sendctx_put_locked(r_xprt, sc);
 150	rpcrdma_flush_disconnect(r_xprt, wc);
 151}
 152
 153/**
 154 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 155 * @cq:	completion queue
 156 * @wc:	WCE for a completed Receive WR
 157 *
 158 */
 159static void rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 160{
 161	struct ib_cqe *cqe = wc->wr_cqe;
 162	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
 163					       rr_cqe);
 164	struct rpcrdma_xprt *r_xprt = cq->cq_context;
 165
 166	/* WARNING: Only wr_cqe and status are reliable at this point */
 167	trace_xprtrdma_wc_receive(wc, &rep->rr_cid);
 168	--r_xprt->rx_ep->re_receive_count;
 169	if (wc->status != IB_WC_SUCCESS)
 170		goto out_flushed;
 171
 172	/* status == SUCCESS means all fields in wc are trustworthy */
 173	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
 174	rep->rr_wc_flags = wc->wc_flags;
 175	rep->rr_inv_rkey = wc->ex.invalidate_rkey;
 176
 177	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
 178				   rdmab_addr(rep->rr_rdmabuf),
 179				   wc->byte_len, DMA_FROM_DEVICE);
 180
 181	rpcrdma_reply_handler(rep);
 182	return;
 183
 184out_flushed:
 185	rpcrdma_flush_disconnect(r_xprt, wc);
 186	rpcrdma_rep_put(&r_xprt->rx_buf, rep);
 187}
 188
 189static void rpcrdma_update_cm_private(struct rpcrdma_ep *ep,
 190				      struct rdma_conn_param *param)
 191{
 192	const struct rpcrdma_connect_private *pmsg = param->private_data;
 193	unsigned int rsize, wsize;
 194
 195	/* Default settings for RPC-over-RDMA Version One */
 196	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
 197	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;
 198
 199	if (pmsg &&
 200	    pmsg->cp_magic == rpcrdma_cmp_magic &&
 201	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
 202		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
 203		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
 204	}
 205
 206	if (rsize < ep->re_inline_recv)
 207		ep->re_inline_recv = rsize;
 208	if (wsize < ep->re_inline_send)
 209		ep->re_inline_send = wsize;
 210
 211	rpcrdma_set_max_header_sizes(ep);
 212}
 213
 214/**
 215 * rpcrdma_cm_event_handler - Handle RDMA CM events
 216 * @id: rdma_cm_id on which an event has occurred
 217 * @event: details of the event
 218 *
 219 * Called with @id's mutex held. Returns 1 if caller should
 220 * destroy @id, otherwise 0.
 221 */
 222static int
 223rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
 224{
 225	struct sockaddr *sap = (struct sockaddr *)&id->route.addr.dst_addr;
 226	struct rpcrdma_ep *ep = id->context;
 227
 228	might_sleep();
 229
 230	switch (event->event) {
 231	case RDMA_CM_EVENT_ADDR_RESOLVED:
 232	case RDMA_CM_EVENT_ROUTE_RESOLVED:
 233		ep->re_async_rc = 0;
 234		complete(&ep->re_done);
 235		return 0;
 236	case RDMA_CM_EVENT_ADDR_ERROR:
 237		ep->re_async_rc = -EPROTO;
 238		complete(&ep->re_done);
 239		return 0;
 240	case RDMA_CM_EVENT_ROUTE_ERROR:
 241		ep->re_async_rc = -ENETUNREACH;
 242		complete(&ep->re_done);
 243		return 0;
 244	case RDMA_CM_EVENT_DEVICE_REMOVAL:
 245		pr_info("rpcrdma: removing device %s for %pISpc\n",
 246			ep->re_id->device->name, sap);
 247		fallthrough;
 248	case RDMA_CM_EVENT_ADDR_CHANGE:
 249		ep->re_connect_status = -ENODEV;
 250		goto disconnected;
 251	case RDMA_CM_EVENT_ESTABLISHED:
 252		rpcrdma_ep_get(ep);
 253		ep->re_connect_status = 1;
 254		rpcrdma_update_cm_private(ep, &event->param.conn);
 255		trace_xprtrdma_inline_thresh(ep);
 256		wake_up_all(&ep->re_connect_wait);
 257		break;
 258	case RDMA_CM_EVENT_CONNECT_ERROR:
 259		ep->re_connect_status = -ENOTCONN;
 260		goto wake_connect_worker;
 261	case RDMA_CM_EVENT_UNREACHABLE:
 262		ep->re_connect_status = -ENETUNREACH;
 263		goto wake_connect_worker;
 264	case RDMA_CM_EVENT_REJECTED:
 265		ep->re_connect_status = -ECONNREFUSED;
 266		if (event->status == IB_CM_REJ_STALE_CONN)
 267			ep->re_connect_status = -ENOTCONN;
 268wake_connect_worker:
 269		wake_up_all(&ep->re_connect_wait);
 270		return 0;
 271	case RDMA_CM_EVENT_DISCONNECTED:
 272		ep->re_connect_status = -ECONNABORTED;
 273disconnected:
 274		rpcrdma_force_disconnect(ep);
 275		return rpcrdma_ep_put(ep);
 276	default:
 277		break;
 278	}
 279
 280	return 0;
 281}
 282
 283static struct rdma_cm_id *rpcrdma_create_id(struct rpcrdma_xprt *r_xprt,
 284					    struct rpcrdma_ep *ep)
 285{
 286	unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
 287	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 288	struct rdma_cm_id *id;
 289	int rc;
 290
 291	init_completion(&ep->re_done);
 292
 293	id = rdma_create_id(xprt->xprt_net, rpcrdma_cm_event_handler, ep,
 294			    RDMA_PS_TCP, IB_QPT_RC);
 295	if (IS_ERR(id))
 296		return id;
 297
 298	ep->re_async_rc = -ETIMEDOUT;
 299	rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)&xprt->addr,
 300			       RDMA_RESOLVE_TIMEOUT);
 301	if (rc)
 302		goto out;
 303	rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
 304	if (rc < 0)
 305		goto out;
 306
 307	rc = ep->re_async_rc;
 308	if (rc)
 309		goto out;
 310
 311	ep->re_async_rc = -ETIMEDOUT;
 312	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
 313	if (rc)
 314		goto out;
 315	rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
 316	if (rc < 0)
 317		goto out;
 318	rc = ep->re_async_rc;
 319	if (rc)
 320		goto out;
 321
 322	return id;
 323
 324out:
 325	rdma_destroy_id(id);
 326	return ERR_PTR(rc);
 327}
 328
 329static void rpcrdma_ep_destroy(struct kref *kref)
 330{
 331	struct rpcrdma_ep *ep = container_of(kref, struct rpcrdma_ep, re_kref);
 332
 333	if (ep->re_id->qp) {
 334		rdma_destroy_qp(ep->re_id);
 335		ep->re_id->qp = NULL;
 336	}
 337
 338	if (ep->re_attr.recv_cq)
 339		ib_free_cq(ep->re_attr.recv_cq);
 340	ep->re_attr.recv_cq = NULL;
 341	if (ep->re_attr.send_cq)
 342		ib_free_cq(ep->re_attr.send_cq);
 343	ep->re_attr.send_cq = NULL;
 344
 345	if (ep->re_pd)
 346		ib_dealloc_pd(ep->re_pd);
 347	ep->re_pd = NULL;
 348
 349	kfree(ep);
 350	module_put(THIS_MODULE);
 351}
 352
 353static noinline void rpcrdma_ep_get(struct rpcrdma_ep *ep)
 354{
 355	kref_get(&ep->re_kref);
 356}
 357
 358/* Returns:
 359 *     %0 if @ep still has a positive kref count, or
 360 *     %1 if @ep was destroyed successfully.
 361 */
 362static noinline int rpcrdma_ep_put(struct rpcrdma_ep *ep)
 363{
 364	return kref_put(&ep->re_kref, rpcrdma_ep_destroy);
 365}
 366
 367static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
 368{
 369	struct rpcrdma_connect_private *pmsg;
 370	struct ib_device *device;
 371	struct rdma_cm_id *id;
 372	struct rpcrdma_ep *ep;
 373	int rc;
 374
 375	ep = kzalloc(sizeof(*ep), XPRTRDMA_GFP_FLAGS);
 376	if (!ep)
 377		return -ENOTCONN;
 378	ep->re_xprt = &r_xprt->rx_xprt;
 379	kref_init(&ep->re_kref);
 380
 381	id = rpcrdma_create_id(r_xprt, ep);
 382	if (IS_ERR(id)) {
 383		kfree(ep);
 384		return PTR_ERR(id);
 385	}
 386	__module_get(THIS_MODULE);
 387	device = id->device;
 388	ep->re_id = id;
 389	reinit_completion(&ep->re_done);
 390
 391	ep->re_max_requests = r_xprt->rx_xprt.max_reqs;
 392	ep->re_inline_send = xprt_rdma_max_inline_write;
 393	ep->re_inline_recv = xprt_rdma_max_inline_read;
 394	rc = frwr_query_device(ep, device);
 395	if (rc)
 396		goto out_destroy;
 397
 398	r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->re_max_requests);
 399
 400	ep->re_attr.srq = NULL;
 401	ep->re_attr.cap.max_inline_data = 0;
 402	ep->re_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
 403	ep->re_attr.qp_type = IB_QPT_RC;
 404	ep->re_attr.port_num = ~0;
 405
 406	ep->re_send_batch = ep->re_max_requests >> 3;
 407	ep->re_send_count = ep->re_send_batch;
 408	init_waitqueue_head(&ep->re_connect_wait);
 409
 410	ep->re_attr.send_cq = ib_alloc_cq_any(device, r_xprt,
 411					      ep->re_attr.cap.max_send_wr,
 412					      IB_POLL_WORKQUEUE);
 413	if (IS_ERR(ep->re_attr.send_cq)) {
 414		rc = PTR_ERR(ep->re_attr.send_cq);
 415		ep->re_attr.send_cq = NULL;
 416		goto out_destroy;
 417	}
 418
 419	ep->re_attr.recv_cq = ib_alloc_cq_any(device, r_xprt,
 420					      ep->re_attr.cap.max_recv_wr,
 421					      IB_POLL_WORKQUEUE);
 422	if (IS_ERR(ep->re_attr.recv_cq)) {
 423		rc = PTR_ERR(ep->re_attr.recv_cq);
 424		ep->re_attr.recv_cq = NULL;
 425		goto out_destroy;
 426	}
 427	ep->re_receive_count = 0;
 428
 429	/* Initialize cma parameters */
 430	memset(&ep->re_remote_cma, 0, sizeof(ep->re_remote_cma));
 431
 432	/* Prepare RDMA-CM private message */
 433	pmsg = &ep->re_cm_private;
 434	pmsg->cp_magic = rpcrdma_cmp_magic;
 435	pmsg->cp_version = RPCRDMA_CMP_VERSION;
 436	pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
 437	pmsg->cp_send_size = rpcrdma_encode_buffer_size(ep->re_inline_send);
 438	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(ep->re_inline_recv);
 439	ep->re_remote_cma.private_data = pmsg;
 440	ep->re_remote_cma.private_data_len = sizeof(*pmsg);
 441
 442	/* Client offers RDMA Read but does not initiate */
 443	ep->re_remote_cma.initiator_depth = 0;
 444	ep->re_remote_cma.responder_resources =
 445		min_t(int, U8_MAX, device->attrs.max_qp_rd_atom);
 446
 447	/* Limit transport retries so client can detect server
 448	 * GID changes quickly. RPC layer handles re-establishing
 449	 * transport connection and retransmission.
 450	 */
 451	ep->re_remote_cma.retry_count = 6;
 452
 453	/* RPC-over-RDMA handles its own flow control. In addition,
 454	 * make all RNR NAKs visible so we know that RPC-over-RDMA
 455	 * flow control is working correctly (no NAKs should be seen).
 456	 */
 457	ep->re_remote_cma.flow_control = 0;
 458	ep->re_remote_cma.rnr_retry_count = 0;
 459
 460	ep->re_pd = ib_alloc_pd(device, 0);
 461	if (IS_ERR(ep->re_pd)) {
 462		rc = PTR_ERR(ep->re_pd);
 463		ep->re_pd = NULL;
 464		goto out_destroy;
 465	}
 466
 467	rc = rdma_create_qp(id, ep->re_pd, &ep->re_attr);
 468	if (rc)
 469		goto out_destroy;
 470
 471	r_xprt->rx_ep = ep;
 472	return 0;
 473
 474out_destroy:
 475	rpcrdma_ep_put(ep);
 476	rdma_destroy_id(id);
 477	return rc;
 478}
 479
 480/**
 481 * rpcrdma_xprt_connect - Connect an unconnected transport
 482 * @r_xprt: controlling transport instance
 483 *
 484 * Returns 0 on success or a negative errno.
 485 */
 486int rpcrdma_xprt_connect(struct rpcrdma_xprt *r_xprt)
 487{
 488	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 489	struct rpcrdma_ep *ep;
 490	int rc;
 491
 492	rc = rpcrdma_ep_create(r_xprt);
 493	if (rc)
 494		return rc;
 495	ep = r_xprt->rx_ep;
 496
 497	xprt_clear_connected(xprt);
 498	rpcrdma_reset_cwnd(r_xprt);
 499
 500	/* Bump the ep's reference count while there are
 501	 * outstanding Receives.
 502	 */
 503	rpcrdma_ep_get(ep);
 504	rpcrdma_post_recvs(r_xprt, 1, true);
 505
 506	rc = rdma_connect(ep->re_id, &ep->re_remote_cma);
 507	if (rc)
 508		goto out;
 509
 510	if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
 511		xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
 512	wait_event_interruptible(ep->re_connect_wait,
 513				 ep->re_connect_status != 0);
 514	if (ep->re_connect_status <= 0) {
 515		rc = ep->re_connect_status;
 516		goto out;
 517	}
 518
 519	rc = rpcrdma_sendctxs_create(r_xprt);
 520	if (rc) {
 521		rc = -ENOTCONN;
 522		goto out;
 523	}
 524
 525	rc = rpcrdma_reqs_setup(r_xprt);
 526	if (rc) {
 527		rc = -ENOTCONN;
 528		goto out;
 529	}
 530	rpcrdma_mrs_create(r_xprt);
 531	frwr_wp_create(r_xprt);
 532
 533out:
 534	trace_xprtrdma_connect(r_xprt, rc);
 535	return rc;
 536}
 537
 538/**
 539 * rpcrdma_xprt_disconnect - Disconnect underlying transport
 540 * @r_xprt: controlling transport instance
 541 *
 542 * Caller serializes. Either the transport send lock is held,
 543 * or we're being called to destroy the transport.
 544 *
 545 * On return, @r_xprt is completely divested of all hardware
 546 * resources and prepared for the next ->connect operation.
 547 */
 548void rpcrdma_xprt_disconnect(struct rpcrdma_xprt *r_xprt)
 549{
 550	struct rpcrdma_ep *ep = r_xprt->rx_ep;
 551	struct rdma_cm_id *id;
 552	int rc;
 553
 554	if (!ep)
 555		return;
 556
 557	id = ep->re_id;
 558	rc = rdma_disconnect(id);
 559	trace_xprtrdma_disconnect(r_xprt, rc);
 560
 561	rpcrdma_xprt_drain(r_xprt);
 562	rpcrdma_reps_unmap(r_xprt);
 563	rpcrdma_reqs_reset(r_xprt);
 564	rpcrdma_mrs_destroy(r_xprt);
 565	rpcrdma_sendctxs_destroy(r_xprt);
 566
 567	if (rpcrdma_ep_put(ep))
 568		rdma_destroy_id(id);
 569
 570	r_xprt->rx_ep = NULL;
 571}
 572
 573/* Fixed-size circular FIFO queue. This implementation is wait-free and
 574 * lock-free.
 575 *
 576 * Consumer is the code path that posts Sends. This path dequeues a
 577 * sendctx for use by a Send operation. Multiple consumer threads
 578 * are serialized by the RPC transport lock, which allows only one
 579 * ->send_request call at a time.
 580 *
 581 * Producer is the code path that handles Send completions. This path
 582 * enqueues a sendctx that has been completed. Multiple producer
 583 * threads are serialized by the ib_poll_cq() function.
 584 */
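/* Note: rb_sc_head is advanced only by the consumer and rb_sc_tail
 * only by the producer. The consumer stops before its head index
 * reaches the tail, so the two paths should never operate on the
 * same slot at the same time.
 */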
 585
 586/* rpcrdma_sendctxs_destroy() assumes the caller has already quiesced
 587 * queue activity, and that rpcrdma_xprt_drain has flushed all remaining
 588 * Send requests.
 589 */
 590static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt)
 591{
 592	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 593	unsigned long i;
 594
 595	if (!buf->rb_sc_ctxs)
 596		return;
 597	for (i = 0; i <= buf->rb_sc_last; i++)
 598		kfree(buf->rb_sc_ctxs[i]);
 599	kfree(buf->rb_sc_ctxs);
 600	buf->rb_sc_ctxs = NULL;
 601}
 602
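/* Allocate one send context, sized to hold an SGE array as large as
 * this endpoint's QP allows. Each sendctx carries its own completion
 * ID so its Send completion can be identified in trace output.
 */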
 603static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ep *ep)
 604{
 605	struct rpcrdma_sendctx *sc;
 606
 607	sc = kzalloc(struct_size(sc, sc_sges, ep->re_attr.cap.max_send_sge),
 608		     XPRTRDMA_GFP_FLAGS);
 609	if (!sc)
 610		return NULL;
 611
 612	sc->sc_cqe.done = rpcrdma_wc_send;
 613	sc->sc_cid.ci_queue_id = ep->re_attr.send_cq->res.id;
 614	sc->sc_cid.ci_completion_id =
 615		atomic_inc_return(&ep->re_completion_ids);
 616	return sc;
 617}
 618
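/* Allocate the circular sendctx queue: one slot for each possible
 * outstanding request, plus extra slots for backchannel requests.
 */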
 619static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
 620{
 621	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 622	struct rpcrdma_sendctx *sc;
 623	unsigned long i;
 624
 625	/* Maximum number of concurrent outstanding Send WRs. Capping
 626	 * the circular queue size stops Send Queue overflow by causing
 627	 * the ->send_request call to fail temporarily before too many
 628	 * Sends are posted.
 629	 */
 630	i = r_xprt->rx_ep->re_max_requests + RPCRDMA_MAX_BC_REQUESTS;
 631	buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), XPRTRDMA_GFP_FLAGS);
 632	if (!buf->rb_sc_ctxs)
 633		return -ENOMEM;
 634
 635	buf->rb_sc_last = i - 1;
 636	for (i = 0; i <= buf->rb_sc_last; i++) {
 637		sc = rpcrdma_sendctx_create(r_xprt->rx_ep);
 638		if (!sc)
 639			return -ENOMEM;
 640
 641		buf->rb_sc_ctxs[i] = sc;
 642	}
 643
 644	buf->rb_sc_head = 0;
 645	buf->rb_sc_tail = 0;
 646	return 0;
 647}
 648
 649/* The sendctx queue is not guaranteed to have a size that is a
 650 * power of two, so the helpers in circ_buf.h cannot be used.
 651 * The other option is to use modulus (%), which can be expensive.
 652 */
 653static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
 654					  unsigned long item)
 655{
 656	return likely(item < buf->rb_sc_last) ? item + 1 : 0;
 657}
 658
 659/**
 660 * rpcrdma_sendctx_get_locked - Acquire a send context
 661 * @r_xprt: controlling transport instance
 662 *
 663 * Returns a pointer to a free send completion context, or NULL if
 664 * the queue is empty.
 665 *
 666 * Usage: Called to acquire an SGE array before preparing a Send WR.
 667 *
 668 * The caller serializes calls to this function (per transport), and
 669 * provides an effective memory barrier that flushes the new value
 670 * of rb_sc_head.
 671 */
 672struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
 673{
 674	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 675	struct rpcrdma_sendctx *sc;
 676	unsigned long next_head;
 677
 678	next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);
 679
 680	if (next_head == READ_ONCE(buf->rb_sc_tail))
 681		goto out_emptyq;
 682
 683	/* ORDER: item must be accessed _before_ head is updated */
 684	sc = buf->rb_sc_ctxs[next_head];
 685
 686	/* Releasing the lock in the caller acts as a memory
 687	 * barrier that flushes rb_sc_head.
 688	 */
 689	buf->rb_sc_head = next_head;
 690
 691	return sc;
 692
 693out_emptyq:
 694	/* The queue is "empty" if there have not been enough Send
 695	 * completions recently. This is a sign the Send Queue is
 696	 * backing up. Cause the caller to pause and try again.
 697	 */
 698	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
 699	r_xprt->rx_stats.empty_sendctx_q++;
 700	return NULL;
 701}
 702
 703/**
 704 * rpcrdma_sendctx_put_locked - Release a send context
 705 * @r_xprt: controlling transport instance
 706 * @sc: send context to release
 707 *
 708 * Usage: Called from Send completion to return a sendctx
 709 * to the queue.
 710 *
 711 * The caller serializes calls to this function (per transport).
 712 */
 713static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
 714				       struct rpcrdma_sendctx *sc)
 715{
 716	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 717	unsigned long next_tail;
 718
 719	/* Unmap SGEs of previously completed but unsignaled
 720	 * Sends by walking up the queue until @sc is found.
 721	 */
 722	next_tail = buf->rb_sc_tail;
 723	do {
 724		next_tail = rpcrdma_sendctx_next(buf, next_tail);
 725
 726		/* ORDER: item must be accessed _before_ tail is updated */
 727		rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);
 728
 729	} while (buf->rb_sc_ctxs[next_tail] != sc);
 730
 731	/* Paired with READ_ONCE */
 732	smp_store_release(&buf->rb_sc_tail, next_tail);
 733
 734	xprt_write_space(&r_xprt->rx_xprt);
 735}
 736
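/* Allocate a batch of MRs and add them to the transport's free list.
 * An allocation failure here is not fatal: the loop simply stops, and
 * rpcrdma_mrs_refresh() can replenish the pool later.
 */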
 737static void
 738rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
 739{
 740	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 741	struct rpcrdma_ep *ep = r_xprt->rx_ep;
 742	struct ib_device *device = ep->re_id->device;
 743	unsigned int count;
 744
 745	/* Try to allocate enough to perform one full-sized I/O */
 746	for (count = 0; count < ep->re_max_rdma_segs; count++) {
 747		struct rpcrdma_mr *mr;
 748		int rc;
 749
 750		mr = kzalloc_node(sizeof(*mr), XPRTRDMA_GFP_FLAGS,
 751				  ibdev_to_node(device));
 752		if (!mr)
 753			break;
 754
 755		rc = frwr_mr_init(r_xprt, mr);
 756		if (rc) {
 757			kfree(mr);
 758			break;
 759		}
 760
 761		spin_lock(&buf->rb_lock);
 762		rpcrdma_mr_push(mr, &buf->rb_mrs);
 763		list_add(&mr->mr_all, &buf->rb_all_mrs);
 764		spin_unlock(&buf->rb_lock);
 765	}
 766
 767	r_xprt->rx_stats.mrs_allocated += count;
 768	trace_xprtrdma_createmrs(r_xprt, count);
 769}
 770
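/* Work function that replenishes the MR free list and then signals
 * write space, since requests may be waiting for MRs to become
 * available.
 */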
 771static void
 772rpcrdma_mr_refresh_worker(struct work_struct *work)
 773{
 774	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
 775						  rb_refresh_worker);
 776	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
 777						   rx_buf);
 778
 779	rpcrdma_mrs_create(r_xprt);
 780	xprt_write_space(&r_xprt->rx_xprt);
 781}
 782
 783/**
 784 * rpcrdma_mrs_refresh - Wake the MR refresh worker
 785 * @r_xprt: controlling transport instance
 786 *
 787 */
 788void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt)
 789{
 790	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 791	struct rpcrdma_ep *ep = r_xprt->rx_ep;
 792
 793	/* If there is no underlying connection, there is no point
 794	 * in waking the refresh worker.
 795	 */
 796	if (ep->re_connect_status != 1)
 797		return;
 798	queue_work(system_highpri_wq, &buf->rb_refresh_worker);
 799}
 800
 801/**
 802 * rpcrdma_req_create - Allocate an rpcrdma_req object
 803 * @r_xprt: controlling r_xprt
 804 * @size: initial size, in bytes, of send and receive buffers
 805 *
 806 * Returns an allocated and fully initialized rpcrdma_req or NULL.
 807 */
 808struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt,
 809				       size_t size)
 810{
 811	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
 812	struct rpcrdma_req *req;
 813
 814	req = kzalloc(sizeof(*req), XPRTRDMA_GFP_FLAGS);
 815	if (req == NULL)
 816		goto out1;
 817
 818	req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE);
 819	if (!req->rl_sendbuf)
 820		goto out2;
 821
 822	req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE);
 823	if (!req->rl_recvbuf)
 824		goto out3;
 825
 826	INIT_LIST_HEAD(&req->rl_free_mrs);
 827	INIT_LIST_HEAD(&req->rl_registered);
 828	spin_lock(&buffer->rb_lock);
 829	list_add(&req->rl_all, &buffer->rb_allreqs);
 830	spin_unlock(&buffer->rb_lock);
 831	return req;
 832
 833out3:
 834	rpcrdma_regbuf_free(req->rl_sendbuf);
 835out2:
 836	kfree(req);
 837out1:
 838	return NULL;
 839}
 840
 841/**
 842 * rpcrdma_req_setup - Per-connection instance setup of an rpcrdma_req object
 843 * @r_xprt: controlling transport instance
 844 * @req: rpcrdma_req object to set up
 845 *
 846 * Returns zero on success, and a negative errno on failure.
 847 */
 848int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 849{
 850	struct rpcrdma_regbuf *rb;
 851	size_t maxhdrsize;
 852
 853	/* Compute maximum header buffer size in bytes */
 854	maxhdrsize = rpcrdma_fixed_maxsz + 3 +
 855		     r_xprt->rx_ep->re_max_rdma_segs * rpcrdma_readchunk_maxsz;
 856	maxhdrsize *= sizeof(__be32);
 857	rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
 858				  DMA_TO_DEVICE);
 859	if (!rb)
 860		goto out;
 861
 862	if (!__rpcrdma_regbuf_dma_map(r_xprt, rb))
 863		goto out_free;
 864
 865	req->rl_rdmabuf = rb;
 866	xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
 867	return 0;
 868
 869out_free:
 870	rpcrdma_regbuf_free(rb);
 871out:
 872	return -ENOMEM;
 873}
 874
 875/* ASSUMPTION: the rb_allreqs list is stable for the duration,
 876 * and thus can be walked without holding rb_lock; e.g., the
 877 * caller is holding the transport send lock to exclude
 878 * device removal or disconnection.
 879 */
 880static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt)
 881{
 882	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 883	struct rpcrdma_req *req;
 884	int rc;
 885
 886	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
 887		rc = rpcrdma_req_setup(r_xprt, req);
 888		if (rc)
 889			return rc;
 890	}
 891	return 0;
 892}
 893
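/* Release the per-connection resources attached to an rpcrdma_req so
 * the object can be reused on the next connection instance.
 */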
 894static void rpcrdma_req_reset(struct rpcrdma_req *req)
 895{
 896	/* Credits are valid for only one connection */
 897	req->rl_slot.rq_cong = 0;
 898
 899	rpcrdma_regbuf_free(req->rl_rdmabuf);
 900	req->rl_rdmabuf = NULL;
 901
 902	rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
 903	rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
 904
 905	frwr_reset(req);
 906}
 907
 908/* ASSUMPTION: the rb_allreqs list is stable for the duration,
 909 * and thus can be walked without holding rb_lock; e.g., the
 910 * caller is holding the transport send lock to exclude
 911 * device removal or disconnection.
 912 */
 913static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
 914{
 915	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 916	struct rpcrdma_req *req;
 917
 918	list_for_each_entry(req, &buf->rb_allreqs, rl_all)
 919		rpcrdma_req_reset(req);
 920}
 921
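/* Allocate and DMA-map a Receive buffer sized to the connection's
 * inline receive threshold, and initialize the Receive WR used to
 * post it. Returns NULL on any failure.
 */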
 922static noinline
 923struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
 924				       bool temp)
 925{
 926	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 927	struct rpcrdma_rep *rep;
 928
 929	rep = kzalloc(sizeof(*rep), XPRTRDMA_GFP_FLAGS);
 930	if (rep == NULL)
 931		goto out;
 932
 933	rep->rr_rdmabuf = rpcrdma_regbuf_alloc(r_xprt->rx_ep->re_inline_recv,
 934					       DMA_FROM_DEVICE);
 935	if (!rep->rr_rdmabuf)
 936		goto out_free;
 937
 938	if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
 939		goto out_free_regbuf;
 940
 941	rep->rr_cid.ci_completion_id =
 942		atomic_inc_return(&r_xprt->rx_ep->re_completion_ids);
 943
 944	xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
 945		     rdmab_length(rep->rr_rdmabuf));
 946	rep->rr_cqe.done = rpcrdma_wc_receive;
 947	rep->rr_rxprt = r_xprt;
 948	rep->rr_recv_wr.next = NULL;
 949	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
 950	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
 951	rep->rr_recv_wr.num_sge = 1;
 952	rep->rr_temp = temp;
 953
 954	spin_lock(&buf->rb_lock);
 955	list_add(&rep->rr_all, &buf->rb_all_reps);
 956	spin_unlock(&buf->rb_lock);
 957	return rep;
 958
 959out_free_regbuf:
 960	rpcrdma_regbuf_free(rep->rr_rdmabuf);
 961out_free:
 962	kfree(rep);
 963out:
 964	return NULL;
 965}
 966
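/* Free an rpcrdma_rep and its Receive buffer. The rep must already
 * have been removed from rb_all_reps.
 */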
 967static void rpcrdma_rep_free(struct rpcrdma_rep *rep)
 968{
 969	rpcrdma_regbuf_free(rep->rr_rdmabuf);
 970	kfree(rep);
 971}
 972
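/* Remove an rpcrdma_rep from the rb_all_reps list, then free it. */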
 973static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
 974{
 975	struct rpcrdma_buffer *buf = &rep->rr_rxprt->rx_buf;
 976
 977	spin_lock(&buf->rb_lock);
 978	list_del(&rep->rr_all);
 979	spin_unlock(&buf->rb_lock);
 980
 981	rpcrdma_rep_free(rep);
 982}
 983
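/* Pop one rep from the free-rep llist, or return NULL if the list
 * is empty. Callers must serialize, as noted below.
 */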
 984static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
 985{
 986	struct llist_node *node;
 987
 988	/* Calls to llist_del_first are required to be serialized */
 989	node = llist_del_first(&buf->rb_free_reps);
 990	if (!node)
 991		return NULL;
 992	return llist_entry(node, struct rpcrdma_rep, rr_node);
 993}
 994
 995/**
 996 * rpcrdma_rep_put - Release rpcrdma_rep back to free list
 997 * @buf: buffer pool
 998 * @rep: rep to release
 999 *
1000 */
1001void rpcrdma_rep_put(struct rpcrdma_buffer *buf, struct rpcrdma_rep *rep)
1002{
1003	llist_add(&rep->rr_node, &buf->rb_free_reps);
1004}
1005
1006/* Caller must ensure the QP is quiescent (RQ is drained) before
1007 * invoking this function, to guarantee rb_all_reps is not
1008 * changing.
1009 */
1010static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
1011{
1012	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1013	struct rpcrdma_rep *rep;
1014
1015	list_for_each_entry(rep, &buf->rb_all_reps, rr_all) {
1016		rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
1017		rep->rr_temp = true;	/* Mark this rep for destruction */
1018	}
1019}
1020
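/* Release every remaining rpcrdma_rep. rb_lock is dropped around each
 * free so that memory is not released while the lock is held.
 */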
1021static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
1022{
1023	struct rpcrdma_rep *rep;
1024
1025	spin_lock(&buf->rb_lock);
1026	while ((rep = list_first_entry_or_null(&buf->rb_all_reps,
1027					       struct rpcrdma_rep,
1028					       rr_all)) != NULL) {
1029		list_del(&rep->rr_all);
1030		spin_unlock(&buf->rb_lock);
1031
1032		rpcrdma_rep_free(rep);
1033
1034		spin_lock(&buf->rb_lock);
1035	}
1036	spin_unlock(&buf->rb_lock);
1037}
1038
1039/**
1040 * rpcrdma_buffer_create - Create initial set of req/rep objects
1041 * @r_xprt: transport instance to (re)initialize
1042 *
1043 * Returns zero on success, otherwise a negative errno.
1044 */
1045int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1046{
1047	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1048	int i, rc;
1049
1050	buf->rb_bc_srv_max_requests = 0;
1051	spin_lock_init(&buf->rb_lock);
1052	INIT_LIST_HEAD(&buf->rb_mrs);
1053	INIT_LIST_HEAD(&buf->rb_all_mrs);
1054	INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
1055
1056	INIT_LIST_HEAD(&buf->rb_send_bufs);
1057	INIT_LIST_HEAD(&buf->rb_allreqs);
1058	INIT_LIST_HEAD(&buf->rb_all_reps);
1059
1060	rc = -ENOMEM;
1061	for (i = 0; i < r_xprt->rx_xprt.max_reqs; i++) {
1062		struct rpcrdma_req *req;
1063
1064		req = rpcrdma_req_create(r_xprt,
1065					 RPCRDMA_V1_DEF_INLINE_SIZE * 2);
1066		if (!req)
1067			goto out;
1068		list_add(&req->rl_list, &buf->rb_send_bufs);
1069	}
1070
1071	init_llist_head(&buf->rb_free_reps);
1072
1073	return 0;
1074out:
1075	rpcrdma_buffer_destroy(buf);
1076	return rc;
1077}
1078
1079/**
1080 * rpcrdma_req_destroy - Destroy an rpcrdma_req object
1081 * @req: unused object to be destroyed
1082 *
1083 * Relies on caller holding the transport send lock to protect
1084 * removing req->rl_all from buf->rb_allreqs safely.
1085 */
1086void rpcrdma_req_destroy(struct rpcrdma_req *req)
1087{
1088	struct rpcrdma_mr *mr;
1089
1090	list_del(&req->rl_all);
1091
1092	while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
1093		struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;
1094
1095		spin_lock(&buf->rb_lock);
1096		list_del(&mr->mr_all);
1097		spin_unlock(&buf->rb_lock);
1098
1099		frwr_mr_release(mr);
1100	}
1101
1102	rpcrdma_regbuf_free(req->rl_recvbuf);
1103	rpcrdma_regbuf_free(req->rl_sendbuf);
1104	rpcrdma_regbuf_free(req->rl_rdmabuf);
1105	kfree(req);
1106}
1107
1108/**
1109 * rpcrdma_mrs_destroy - Release all of a transport's MRs
1110 * @r_xprt: controlling transport instance
1111 *
1112 * Relies on caller holding the transport send lock to protect
1113 * removing mr->mr_list from req->rl_free_mrs safely.
1114 */
1115static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
1116{
1117	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1118	struct rpcrdma_mr *mr;
1119
1120	cancel_work_sync(&buf->rb_refresh_worker);
1121
1122	spin_lock(&buf->rb_lock);
1123	while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
1124					      struct rpcrdma_mr,
1125					      mr_all)) != NULL) {
1126		list_del(&mr->mr_list);
1127		list_del(&mr->mr_all);
1128		spin_unlock(&buf->rb_lock);
1129
1130		frwr_mr_release(mr);
1131
1132		spin_lock(&buf->rb_lock);
1133	}
1134	spin_unlock(&buf->rb_lock);
1135}
1136
1137/**
1138 * rpcrdma_buffer_destroy - Release all hw resources
1139 * @buf: root control block for resources
1140 *
1141 * ORDERING: relies on a prior rpcrdma_xprt_drain:
1142 * - No more Send or Receive completions can occur
1143 * - All MRs, reps, and reqs are returned to their free lists
1144 */
1145void
1146rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1147{
1148	rpcrdma_reps_destroy(buf);
1149
1150	while (!list_empty(&buf->rb_send_bufs)) {
1151		struct rpcrdma_req *req;
1152
1153		req = list_first_entry(&buf->rb_send_bufs,
1154				       struct rpcrdma_req, rl_list);
1155		list_del(&req->rl_list);
1156		rpcrdma_req_destroy(req);
1157	}
1158}
1159
1160/**
1161 * rpcrdma_mr_get - Allocate an rpcrdma_mr object
1162 * @r_xprt: controlling transport
1163 *
1164 * Returns an initialized rpcrdma_mr or NULL if no free
1165 * rpcrdma_mr objects are available.
1166 */
1167struct rpcrdma_mr *
1168rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
1169{
1170	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1171	struct rpcrdma_mr *mr;
1172
1173	spin_lock(&buf->rb_lock);
1174	mr = rpcrdma_mr_pop(&buf->rb_mrs);
1175	spin_unlock(&buf->rb_lock);
1176	return mr;
1177}
1178
1179/**
1180 * rpcrdma_reply_put - Put reply buffers back into pool
1181 * @buffers: buffer pool
1182 * @req: object to return
1183 *
1184 */
1185void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
1186{
1187	if (req->rl_reply) {
1188		rpcrdma_rep_put(buffers, req->rl_reply);
1189		req->rl_reply = NULL;
1190	}
1191}
1192
1193/**
1194 * rpcrdma_buffer_get - Get a request buffer
1195 * @buffers: Buffer pool from which to obtain a buffer
1196 *
1197 * Returns a fresh rpcrdma_req, or NULL if none are available.
1198 */
1199struct rpcrdma_req *
1200rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1201{
1202	struct rpcrdma_req *req;
1203
1204	spin_lock(&buffers->rb_lock);
1205	req = list_first_entry_or_null(&buffers->rb_send_bufs,
1206				       struct rpcrdma_req, rl_list);
1207	if (req)
1208		list_del_init(&req->rl_list);
1209	spin_unlock(&buffers->rb_lock);
1210	return req;
1211}
1212
1213/**
1214 * rpcrdma_buffer_put - Put request/reply buffers back into pool
1215 * @buffers: buffer pool
1216 * @req: object to return
1217 *
1218 */
1219void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
1220{
1221	rpcrdma_reply_put(buffers, req);
1222
1223	spin_lock(&buffers->rb_lock);
1224	list_add(&req->rl_list, &buffers->rb_send_bufs);
1225	spin_unlock(&buffers->rb_lock);
1226}
1227
1228/* Returns a pointer to an rpcrdma_regbuf object, or NULL.
1229 *
1230 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1231 * receiving the payload of RDMA RECV operations. During Long Calls
1232 * or Replies they may be registered externally via frwr_map.
1233 */
1234static struct rpcrdma_regbuf *
1235rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction)
1236{
1237	struct rpcrdma_regbuf *rb;
1238
1239	rb = kmalloc(sizeof(*rb), XPRTRDMA_GFP_FLAGS);
1240	if (!rb)
1241		return NULL;
1242	rb->rg_data = kmalloc(size, XPRTRDMA_GFP_FLAGS);
1243	if (!rb->rg_data) {
1244		kfree(rb);
1245		return NULL;
1246	}
1247
1248	rb->rg_device = NULL;
1249	rb->rg_direction = direction;
1250	rb->rg_iov.length = size;
1251	return rb;
1252}
1253
1254/**
1255 * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
1256 * @rb: regbuf to reallocate
1257 * @size: size of buffer to be allocated, in bytes
1258 * @flags: GFP flags
1259 *
1260 * Returns true if reallocation was successful. If false is
1261 * returned, @rb is left untouched.
1262 */
1263bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
1264{
1265	void *buf;
1266
1267	buf = kmalloc(size, flags);
1268	if (!buf)
1269		return false;
1270
1271	rpcrdma_regbuf_dma_unmap(rb);
1272	kfree(rb->rg_data);
1273
1274	rb->rg_data = buf;
1275	rb->rg_iov.length = size;
1276	return true;
1277}
1278
1279/**
1280 * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
1281 * @r_xprt: controlling transport instance
1282 * @rb: regbuf to be mapped
1283 *
1284 * Returns true if the buffer is now DMA mapped to @r_xprt's device.
1285 */
1286bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
1287			      struct rpcrdma_regbuf *rb)
1288{
1289	struct ib_device *device = r_xprt->rx_ep->re_id->device;
1290
1291	if (rb->rg_direction == DMA_NONE)
1292		return false;
1293
1294	rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
1295					    rdmab_length(rb), rb->rg_direction);
1296	if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
1297		trace_xprtrdma_dma_maperr(rdmab_addr(rb));
1298		return false;
1299	}
1300
1301	rb->rg_device = device;
1302	rb->rg_iov.lkey = r_xprt->rx_ep->re_pd->local_dma_lkey;
1303	return true;
1304}
1305
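/* DMA-unmap a regbuf if it is currently mapped. Safe to call with a
 * NULL @rb.
 */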
1306static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
1307{
1308	if (!rb)
1309		return;
1310
1311	if (!rpcrdma_regbuf_is_mapped(rb))
1312		return;
1313
1314	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
1315			    rb->rg_direction);
1316	rb->rg_device = NULL;
1317}
1318
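/* Unmap and free a regbuf and its backing data buffer. Safe to call
 * with a NULL @rb.
 */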
1319static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
1320{
1321	rpcrdma_regbuf_dma_unmap(rb);
1322	if (rb)
1323		kfree(rb->rg_data);
1324	kfree(rb);
1325}
1326
1327/**
1328 * rpcrdma_post_recvs - Refill the Receive Queue
1329 * @r_xprt: controlling transport instance
1330 * @needed: current credit grant
1331 * @temp: mark Receive buffers to be deleted after one use
1332 *
1333 */
1334void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, int needed, bool temp)
1335{
1336	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1337	struct rpcrdma_ep *ep = r_xprt->rx_ep;
1338	struct ib_recv_wr *wr, *bad_wr;
1339	struct rpcrdma_rep *rep;
1340	int count, rc;
1341
1342	rc = 0;
1343	count = 0;
1344
1345	if (likely(ep->re_receive_count > needed))
1346		goto out;
1347	needed -= ep->re_receive_count;
1348	if (!temp)
1349		needed += RPCRDMA_MAX_RECV_BATCH;
1350
1351	if (atomic_inc_return(&ep->re_receiving) > 1)
1352		goto out;
1353
1354	/* fast path: all needed reps can be found on the free list */
1355	wr = NULL;
1356	while (needed) {
1357		rep = rpcrdma_rep_get_locked(buf);
1358		if (rep && rep->rr_temp) {
1359			rpcrdma_rep_destroy(rep);
1360			continue;
1361		}
1362		if (!rep)
1363			rep = rpcrdma_rep_create(r_xprt, temp);
1364		if (!rep)
1365			break;
1366
1367		rep->rr_cid.ci_queue_id = ep->re_attr.recv_cq->res.id;
1368		trace_xprtrdma_post_recv(rep);
1369		rep->rr_recv_wr.next = wr;
1370		wr = &rep->rr_recv_wr;
1371		--needed;
1372		++count;
1373	}
1374	if (!wr)
1375		goto out;
1376
1377	rc = ib_post_recv(ep->re_id->qp, wr,
1378			  (const struct ib_recv_wr **)&bad_wr);
1379	if (rc) {
1380		trace_xprtrdma_post_recvs_err(r_xprt, rc);
1381		for (wr = bad_wr; wr;) {
1382			struct rpcrdma_rep *rep;
1383
1384			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
1385			wr = wr->next;
1386			rpcrdma_rep_put(buf, rep);
1387			--count;
1388		}
1389	}
1390	if (atomic_dec_return(&ep->re_receiving) > 0)
1391		complete(&ep->re_done);
1392
1393out:
1394	trace_xprtrdma_post_recvs(r_xprt, count);
1395	ep->re_receive_count += count;
1396	return;
1397}