v4.17
   1/*
   2 * Copyright (c) 2014-2017 Oracle.  All rights reserved.
   3 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
   4 *
   5 * This software is available to you under a choice of one of two
   6 * licenses.  You may choose to be licensed under the terms of the GNU
   7 * General Public License (GPL) Version 2, available from the file
   8 * COPYING in the main directory of this source tree, or the BSD-type
   9 * license below:
  10 *
  11 * Redistribution and use in source and binary forms, with or without
  12 * modification, are permitted provided that the following conditions
  13 * are met:
  14 *
  15 *      Redistributions of source code must retain the above copyright
  16 *      notice, this list of conditions and the following disclaimer.
  17 *
  18 *      Redistributions in binary form must reproduce the above
  19 *      copyright notice, this list of conditions and the following
  20 *      disclaimer in the documentation and/or other materials provided
  21 *      with the distribution.
  22 *
  23 *      Neither the name of the Network Appliance, Inc. nor the names of
  24 *      its contributors may be used to endorse or promote products
  25 *      derived from this software without specific prior written
  26 *      permission.
  27 *
  28 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  29 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  30 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  31 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  32 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  33 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  34 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  35 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  36 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  37 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  38 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  39 */
  40
  41/*
  42 * verbs.c
  43 *
  44 * Encapsulates the major functions managing:
  45 *  o adapters
  46 *  o endpoints
  47 *  o connections
  48 *  o buffer memory
  49 */
  50
  51#include <linux/interrupt.h>
  52#include <linux/slab.h>
  53#include <linux/sunrpc/addr.h>
  54#include <linux/sunrpc/svc_rdma.h>
  55
  56#include <asm-generic/barrier.h>
  57#include <asm/bitops.h>
  58
  59#include <rdma/ib_cm.h>
  60
  61#include "xprt_rdma.h"
  62
  63/*
  64 * Globals/Macros
  65 */
  66
  67#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  68# define RPCDBG_FACILITY	RPCDBG_TRANS
  69#endif
  70
  71/*
  72 * internal functions
  73 */
  74static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
  75static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
  76static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
  77
  78struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
  79
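/* Receive/reply processing is queued on this workqueue. WQ_MEM_RECLAIM
 * provides a rescuer thread so reply handling can make progress even
 * under memory pressure (NFS traffic can sit on the writeback path),
 * and WQ_HIGHPRI keeps replies from queueing behind ordinary work.
 */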
  80int
  81rpcrdma_alloc_wq(void)
  82{
  83	struct workqueue_struct *recv_wq;
  84
  85	recv_wq = alloc_workqueue("xprtrdma_receive",
  86				  WQ_MEM_RECLAIM | WQ_HIGHPRI,
  87				  0);
  88	if (!recv_wq)
  89		return -ENOMEM;
  90
  91	rpcrdma_receive_wq = recv_wq;
  92	return 0;
  93}
  94
  95void
  96rpcrdma_destroy_wq(void)
  97{
  98	struct workqueue_struct *wq;
  99
 100	if (rpcrdma_receive_wq) {
 101		wq = rpcrdma_receive_wq;
 102		rpcrdma_receive_wq = NULL;
 103		destroy_workqueue(wq);
 104	}
 105}
 106
 107static void
 108rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
 109{
 110	struct rpcrdma_ep *ep = context;
 111	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
 112						   rx_ep);
 113
 114	trace_xprtrdma_qp_error(r_xprt, event);
 115	pr_err("rpcrdma: %s on device %s ep %p\n",
 116	       ib_event_msg(event->event), event->device->name, context);
 117
 118	if (ep->rep_connected == 1) {
 119		ep->rep_connected = -EIO;
 120		rpcrdma_conn_func(ep);
 121		wake_up_all(&ep->rep_connect_wait);
 122	}
 123}
 124
 125/**
 126 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 127 * @cq:	completion queue (ignored)
 128 * @wc:	completed WR
 129 *
 130 */
 131static void
 132rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 133{
 134	struct ib_cqe *cqe = wc->wr_cqe;
 135	struct rpcrdma_sendctx *sc =
 136		container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
 137
 138	/* WARNING: Only wr_cqe and status are reliable at this point */
 139	trace_xprtrdma_wc_send(sc, wc);
 140	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
 141		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
 142		       ib_wc_status_msg(wc->status),
 143		       wc->status, wc->vendor_err);
 144
 145	rpcrdma_sendctx_put_locked(sc);
 146}
 147
 148/**
 149 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 150 * @cq:	completion queue (ignored)
 151 * @wc:	completed WR
 152 *
 153 */
 154static void
 155rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 156{
 157	struct ib_cqe *cqe = wc->wr_cqe;
 158	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
 159					       rr_cqe);
 160
  161	/* WARNING: Only wr_cqe and status are reliable at this point */
 162	trace_xprtrdma_wc_receive(rep, wc);
 163	if (wc->status != IB_WC_SUCCESS)
 164		goto out_fail;
 165
 166	/* status == SUCCESS means all fields in wc are trustworthy */
 167	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
 168	rep->rr_wc_flags = wc->wc_flags;
 169	rep->rr_inv_rkey = wc->ex.invalidate_rkey;
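	/* Stash the Receive's wc_flags and invalidate_rkey so the reply
	 * handler can tell whether the server used Send With Invalidate
	 * to remotely invalidate one of this RPC's MRs.
	 */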
 170
 171	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
 172				   rdmab_addr(rep->rr_rdmabuf),
 173				   wc->byte_len, DMA_FROM_DEVICE);
 174
 175out_schedule:
 176	rpcrdma_reply_handler(rep);
 177	return;
 178
 179out_fail:
 180	if (wc->status != IB_WC_WR_FLUSH_ERR)
 181		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
 182		       ib_wc_status_msg(wc->status),
 183		       wc->status, wc->vendor_err);
 184	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, 0);
 185	goto out_schedule;
 186}
 187
 188static void
 189rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
 190			       struct rdma_conn_param *param)
 191{
 192	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
 193	const struct rpcrdma_connect_private *pmsg = param->private_data;
 194	unsigned int rsize, wsize;
 195
 196	/* Default settings for RPC-over-RDMA Version One */
 197	r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
 198	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
 199	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;
 200
 201	if (pmsg &&
 202	    pmsg->cp_magic == rpcrdma_cmp_magic &&
 203	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
 204		r_xprt->rx_ia.ri_implicit_roundup = true;
 205		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
 206		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
 207	}
 208
 209	if (rsize < cdata->inline_rsize)
 210		cdata->inline_rsize = rsize;
 211	if (wsize < cdata->inline_wsize)
 212		cdata->inline_wsize = wsize;
 213	dprintk("RPC:       %s: max send %u, max recv %u\n",
 214		__func__, cdata->inline_wsize, cdata->inline_rsize);
 215	rpcrdma_set_max_header_sizes(r_xprt);
 216}
 217
 218static int
 219rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 220{
 221	struct rpcrdma_xprt *xprt = id->context;
 222	struct rpcrdma_ia *ia = &xprt->rx_ia;
 223	struct rpcrdma_ep *ep = &xprt->rx_ep;
 224	int connstate = 0;
 225
 226	trace_xprtrdma_conn_upcall(xprt, event);
 227	switch (event->event) {
 228	case RDMA_CM_EVENT_ADDR_RESOLVED:
 229	case RDMA_CM_EVENT_ROUTE_RESOLVED:
 230		ia->ri_async_rc = 0;
 231		complete(&ia->ri_done);
 232		break;
 233	case RDMA_CM_EVENT_ADDR_ERROR:
 234		ia->ri_async_rc = -EHOSTUNREACH;
 235		complete(&ia->ri_done);
 236		break;
 237	case RDMA_CM_EVENT_ROUTE_ERROR:
 238		ia->ri_async_rc = -ENETUNREACH;
 239		complete(&ia->ri_done);
 240		break;
 241	case RDMA_CM_EVENT_DEVICE_REMOVAL:
 242#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 243		pr_info("rpcrdma: removing device %s for %s:%s\n",
 244			ia->ri_device->name,
 245			rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt));
 246#endif
 247		set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
 248		ep->rep_connected = -ENODEV;
 249		xprt_force_disconnect(&xprt->rx_xprt);
 250		wait_for_completion(&ia->ri_remove_done);
 251
 252		ia->ri_id = NULL;
 253		ia->ri_device = NULL;
 254		/* Return 1 to ensure the core destroys the id. */
 255		return 1;
 256	case RDMA_CM_EVENT_ESTABLISHED:
 257		++xprt->rx_xprt.connect_cookie;
 258		connstate = 1;
 259		rpcrdma_update_connect_private(xprt, &event->param.conn);
 260		goto connected;
 261	case RDMA_CM_EVENT_CONNECT_ERROR:
 262		connstate = -ENOTCONN;
 263		goto connected;
 264	case RDMA_CM_EVENT_UNREACHABLE:
 265		connstate = -ENETDOWN;
 266		goto connected;
 267	case RDMA_CM_EVENT_REJECTED:
 268		dprintk("rpcrdma: connection to %s:%s rejected: %s\n",
 269			rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt),
 270			rdma_reject_msg(id, event->status));
 271		connstate = -ECONNREFUSED;
 272		if (event->status == IB_CM_REJ_STALE_CONN)
 273			connstate = -EAGAIN;
 274		goto connected;
 275	case RDMA_CM_EVENT_DISCONNECTED:
 276		++xprt->rx_xprt.connect_cookie;
 277		connstate = -ECONNABORTED;
 278connected:
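		/* Reset the RPC-over-RDMA credit grant: only a single
		 * request may be in flight until the peer's first reply
		 * advertises a fresh credit limit.
		 */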
 279		xprt->rx_buf.rb_credits = 1;
 280		ep->rep_connected = connstate;
 281		rpcrdma_conn_func(ep);
 282		wake_up_all(&ep->rep_connect_wait);
 283		/*FALLTHROUGH*/
 284	default:
 285		dprintk("RPC:       %s: %s:%s on %s/%s (ep 0x%p): %s\n",
 286			__func__,
 287			rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt),
 288			ia->ri_device->name, ia->ri_ops->ro_displayname,
 289			ep, rdma_event_msg(event->event));
 290		break;
 291	}
 292
 293	return 0;
 294}
 295
 296static struct rdma_cm_id *
 297rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
 298{
 299	unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
 300	struct rdma_cm_id *id;
 301	int rc;
 302
 303	trace_xprtrdma_conn_start(xprt);
 304
 305	init_completion(&ia->ri_done);
 306	init_completion(&ia->ri_remove_done);
 307
 308	id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
 309			    IB_QPT_RC);
 310	if (IS_ERR(id)) {
 311		rc = PTR_ERR(id);
 312		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
 313			__func__, rc);
 314		return id;
 315	}
 316
 317	ia->ri_async_rc = -ETIMEDOUT;
 318	rc = rdma_resolve_addr(id, NULL,
 319			       (struct sockaddr *)&xprt->rx_xprt.addr,
 320			       RDMA_RESOLVE_TIMEOUT);
 321	if (rc) {
 322		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
 323			__func__, rc);
 324		goto out;
 325	}
 326	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
 327	if (rc < 0) {
 328		trace_xprtrdma_conn_tout(xprt);
 329		goto out;
 330	}
 331
 332	rc = ia->ri_async_rc;
 333	if (rc)
 334		goto out;
 335
 336	ia->ri_async_rc = -ETIMEDOUT;
 337	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
 338	if (rc) {
 339		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
 340			__func__, rc);
 341		goto out;
 342	}
 343	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
 344	if (rc < 0) {
 345		trace_xprtrdma_conn_tout(xprt);
 346		goto out;
 347	}
 348	rc = ia->ri_async_rc;
 349	if (rc)
 350		goto out;
 351
 352	return id;
 353
 354out:
 355	rdma_destroy_id(id);
 356	return ERR_PTR(rc);
 357}
 358
 359/*
 360 * Exported functions.
 361 */
 362
 363/**
 364 * rpcrdma_ia_open - Open and initialize an Interface Adapter.
 365 * @xprt: transport with IA to (re)initialize
 366 *
 367 * Returns 0 on success, negative errno if an appropriate
 368 * Interface Adapter could not be found and opened.
 369 */
 370int
 371rpcrdma_ia_open(struct rpcrdma_xprt *xprt)
 372{
 373	struct rpcrdma_ia *ia = &xprt->rx_ia;
 374	int rc;
 375
 376	ia->ri_id = rpcrdma_create_id(xprt, ia);
 377	if (IS_ERR(ia->ri_id)) {
 378		rc = PTR_ERR(ia->ri_id);
 379		goto out_err;
 380	}
 381	ia->ri_device = ia->ri_id->device;
 382
 383	ia->ri_pd = ib_alloc_pd(ia->ri_device, 0);
 384	if (IS_ERR(ia->ri_pd)) {
 385		rc = PTR_ERR(ia->ri_pd);
 386		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
 387		goto out_err;
 388	}
 389
 390	switch (xprt_rdma_memreg_strategy) {
 391	case RPCRDMA_FRWR:
 392		if (frwr_is_supported(ia)) {
 393			ia->ri_ops = &rpcrdma_frwr_memreg_ops;
 394			break;
 395		}
 396		/*FALLTHROUGH*/
 397	case RPCRDMA_MTHCAFMR:
 398		if (fmr_is_supported(ia)) {
 399			ia->ri_ops = &rpcrdma_fmr_memreg_ops;
 400			break;
 401		}
 402		/*FALLTHROUGH*/
 403	default:
 404		pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
 405		       ia->ri_device->name, xprt_rdma_memreg_strategy);
 406		rc = -EINVAL;
 407		goto out_err;
 408	}
 409
 410	return 0;
 411
 412out_err:
 413	rpcrdma_ia_close(ia);
 414	return rc;
 415}
 416
 417/**
 418 * rpcrdma_ia_remove - Handle device driver unload
 419 * @ia: interface adapter being removed
 420 *
 421 * Divest transport H/W resources associated with this adapter,
 422 * but allow it to be restored later.
 423 */
 424void
 425rpcrdma_ia_remove(struct rpcrdma_ia *ia)
 426{
 427	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
 428						   rx_ia);
 429	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 430	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 431	struct rpcrdma_req *req;
 432	struct rpcrdma_rep *rep;
 433
 434	cancel_delayed_work_sync(&buf->rb_refresh_worker);
 435
 436	/* This is similar to rpcrdma_ep_destroy, but:
 437	 * - Don't cancel the connect worker.
 438	 * - Don't call rpcrdma_ep_disconnect, which waits
 439	 *   for another conn upcall, which will deadlock.
 440	 * - rdma_disconnect is unneeded, the underlying
 441	 *   connection is already gone.
 442	 */
 443	if (ia->ri_id->qp) {
 444		ib_drain_qp(ia->ri_id->qp);
 445		rdma_destroy_qp(ia->ri_id);
 446		ia->ri_id->qp = NULL;
 447	}
 448	ib_free_cq(ep->rep_attr.recv_cq);
 449	ep->rep_attr.recv_cq = NULL;
 450	ib_free_cq(ep->rep_attr.send_cq);
 451	ep->rep_attr.send_cq = NULL;
 452
 453	/* The ULP is responsible for ensuring all DMA
 454	 * mappings and MRs are gone.
 455	 */
 456	list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
 457		rpcrdma_dma_unmap_regbuf(rep->rr_rdmabuf);
 458	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
 459		rpcrdma_dma_unmap_regbuf(req->rl_rdmabuf);
 460		rpcrdma_dma_unmap_regbuf(req->rl_sendbuf);
 461		rpcrdma_dma_unmap_regbuf(req->rl_recvbuf);
 462	}
 463	rpcrdma_mrs_destroy(buf);
 464	ib_dealloc_pd(ia->ri_pd);
 465	ia->ri_pd = NULL;
 466
 467	/* Allow waiters to continue */
 468	complete(&ia->ri_remove_done);
 469
 470	trace_xprtrdma_remove(r_xprt);
 471}
 472
 473/**
 474 * rpcrdma_ia_close - Clean up/close an IA.
 475 * @ia: interface adapter to close
 476 *
 477 */
 478void
 479rpcrdma_ia_close(struct rpcrdma_ia *ia)
 480{
 481	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
 482		if (ia->ri_id->qp)
 483			rdma_destroy_qp(ia->ri_id);
 484		rdma_destroy_id(ia->ri_id);
 485	}
 486	ia->ri_id = NULL;
 487	ia->ri_device = NULL;
 488
 489	/* If the pd is still busy, xprtrdma missed freeing a resource */
 490	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
 491		ib_dealloc_pd(ia->ri_pd);
 492	ia->ri_pd = NULL;
 493}
 494
 495/*
 496 * Create unconnected endpoint.
 497 */
 498int
 499rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 500		  struct rpcrdma_create_data_internal *cdata)
 501{
 502	struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
 503	unsigned int max_qp_wr, max_sge;
 504	struct ib_cq *sendcq, *recvcq;
 505	int rc;
 506
 507	max_sge = min_t(unsigned int, ia->ri_device->attrs.max_sge,
 508			RPCRDMA_MAX_SEND_SGES);
 509	if (max_sge < RPCRDMA_MIN_SEND_SGES) {
 510		pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
 511		return -ENOMEM;
 512	}
 513	ia->ri_max_send_sges = max_sge;
 514
 515	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
  516		dprintk("RPC:       %s: insufficient WQEs available\n",
 517			__func__);
 518		return -ENOMEM;
 519	}
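	/* Headroom: the backchannel WRs and the drain WR are added back
	 * into max_send_wr and max_recv_wr below, so subtract them from
	 * the device limit before capping max_requests.
	 */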
 520	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;
 521
 522	/* check provider's send/recv wr limits */
 523	if (cdata->max_requests > max_qp_wr)
 524		cdata->max_requests = max_qp_wr;
 525
 526	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
 527	ep->rep_attr.qp_context = ep;
 528	ep->rep_attr.srq = NULL;
 529	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
 530	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
 531	ep->rep_attr.cap.max_send_wr += 1;	/* drain cqe */
 532	rc = ia->ri_ops->ro_open(ia, ep, cdata);
 533	if (rc)
 534		return rc;
 535	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
 536	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
 537	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
 538	ep->rep_attr.cap.max_send_sge = max_sge;
 539	ep->rep_attr.cap.max_recv_sge = 1;
 540	ep->rep_attr.cap.max_inline_data = 0;
 541	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
 542	ep->rep_attr.qp_type = IB_QPT_RC;
 543	ep->rep_attr.port_num = ~0;
 544
 545	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
 546		"iovs: send %d recv %d\n",
 547		__func__,
 548		ep->rep_attr.cap.max_send_wr,
 549		ep->rep_attr.cap.max_recv_wr,
 550		ep->rep_attr.cap.max_send_sge,
 551		ep->rep_attr.cap.max_recv_sge);
 552
 553	/* set trigger for requesting send completion */
 554	ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH,
 555				   cdata->max_requests >> 2);
 556	ep->rep_send_count = ep->rep_send_batch;
 557	init_waitqueue_head(&ep->rep_connect_wait);
 558	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
 559
 560	sendcq = ib_alloc_cq(ia->ri_device, NULL,
 561			     ep->rep_attr.cap.max_send_wr + 1,
 562			     1, IB_POLL_WORKQUEUE);
 563	if (IS_ERR(sendcq)) {
 564		rc = PTR_ERR(sendcq);
 565		dprintk("RPC:       %s: failed to create send CQ: %i\n",
 566			__func__, rc);
 567		goto out1;
 568	}
 569
 570	recvcq = ib_alloc_cq(ia->ri_device, NULL,
 571			     ep->rep_attr.cap.max_recv_wr + 1,
 572			     0, IB_POLL_WORKQUEUE);
 573	if (IS_ERR(recvcq)) {
 574		rc = PTR_ERR(recvcq);
 575		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
 576			__func__, rc);
 577		goto out2;
 578	}
 579
 580	ep->rep_attr.send_cq = sendcq;
 581	ep->rep_attr.recv_cq = recvcq;
 582
 583	/* Initialize cma parameters */
 584	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));
 585
 586	/* Prepare RDMA-CM private message */
 587	pmsg->cp_magic = rpcrdma_cmp_magic;
 588	pmsg->cp_version = RPCRDMA_CMP_VERSION;
 589	pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
 590	pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
 591	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
 592	ep->rep_remote_cma.private_data = pmsg;
 593	ep->rep_remote_cma.private_data_len = sizeof(*pmsg);
 594
 595	/* Client offers RDMA Read but does not initiate */
 596	ep->rep_remote_cma.initiator_depth = 0;
 597	ep->rep_remote_cma.responder_resources =
 598		min_t(int, U8_MAX, ia->ri_device->attrs.max_qp_rd_atom);
 599
 600	/* Limit transport retries so client can detect server
 601	 * GID changes quickly. RPC layer handles re-establishing
 602	 * transport connection and retransmission.
 603	 */
 604	ep->rep_remote_cma.retry_count = 6;
 605
 606	/* RPC-over-RDMA handles its own flow control. In addition,
 607	 * make all RNR NAKs visible so we know that RPC-over-RDMA
 608	 * flow control is working correctly (no NAKs should be seen).
 609	 */
 610	ep->rep_remote_cma.flow_control = 0;
 611	ep->rep_remote_cma.rnr_retry_count = 0;
 612
 613	return 0;
 614
 615out2:
 616	ib_free_cq(sendcq);
 617out1:
 618	return rc;
 619}
 620
 621/*
 622 * rpcrdma_ep_destroy
 623 *
 624 * Disconnect and destroy endpoint. After this, the only
 625 * valid operations on the ep are to free it (if dynamically
 626 * allocated) or re-create it.
 627 */
 628void
 629rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 630{
 631	cancel_delayed_work_sync(&ep->rep_connect_worker);
 632
 633	if (ia->ri_id && ia->ri_id->qp) {
 634		rpcrdma_ep_disconnect(ep, ia);
 635		rdma_destroy_qp(ia->ri_id);
 636		ia->ri_id->qp = NULL;
 637	}
 638
 639	if (ep->rep_attr.recv_cq)
 640		ib_free_cq(ep->rep_attr.recv_cq);
 641	if (ep->rep_attr.send_cq)
 642		ib_free_cq(ep->rep_attr.send_cq);
 643}
 644
 645/* Re-establish a connection after a device removal event.
 646 * Unlike a normal reconnection, a fresh PD and a new set
 647 * of MRs and buffers is needed.
 648 */
 649static int
 650rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
 651			 struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 652{
 653	int rc, err;
 654
 655	trace_xprtrdma_reinsert(r_xprt);
 656
 657	rc = -EHOSTUNREACH;
 658	if (rpcrdma_ia_open(r_xprt))
 659		goto out1;
 660
 661	rc = -ENOMEM;
 662	err = rpcrdma_ep_create(ep, ia, &r_xprt->rx_data);
 663	if (err) {
 664		pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
 665		goto out2;
 666	}
 667
 668	rc = -ENETUNREACH;
 669	err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
 670	if (err) {
 671		pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
 672		goto out3;
 673	}
 674
 675	rpcrdma_mrs_create(r_xprt);
 676	return 0;
 677
 678out3:
 679	rpcrdma_ep_destroy(ep, ia);
 680out2:
 681	rpcrdma_ia_close(ia);
 682out1:
 683	return rc;
 684}
 685
 686static int
 687rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
 688		     struct rpcrdma_ia *ia)
 689{
 690	struct rdma_cm_id *id, *old;
 691	int err, rc;
 692
 693	trace_xprtrdma_reconnect(r_xprt);
 694
 695	rpcrdma_ep_disconnect(ep, ia);
 696
 697	rc = -EHOSTUNREACH;
 698	id = rpcrdma_create_id(r_xprt, ia);
 699	if (IS_ERR(id))
 700		goto out;
 701
 702	/* As long as the new ID points to the same device as the
 703	 * old ID, we can reuse the transport's existing PD and all
 704	 * previously allocated MRs. Also, the same device means
 705	 * the transport's previous DMA mappings are still valid.
 706	 *
 707	 * This is a sanity check only. There should be no way these
 708	 * point to two different devices here.
 709	 */
 710	old = id;
 711	rc = -ENETUNREACH;
 712	if (ia->ri_device != id->device) {
 713		pr_err("rpcrdma: can't reconnect on different device!\n");
 714		goto out_destroy;
 715	}
 716
 717	err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
 718	if (err) {
 719		dprintk("RPC:       %s: rdma_create_qp returned %d\n",
 720			__func__, err);
 721		goto out_destroy;
 722	}
 723
 724	/* Atomically replace the transport's ID and QP. */
 725	rc = 0;
 726	old = ia->ri_id;
 727	ia->ri_id = id;
 728	rdma_destroy_qp(old);
 729
 730out_destroy:
 731	rdma_destroy_id(old);
 732out:
 733	return rc;
 734}
 735
 736/*
 737 * Connect unconnected endpoint.
 738 */
 739int
 740rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 741{
 742	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
 743						   rx_ia);
 744	unsigned int extras;
 745	int rc;
 746
 747retry:
 748	switch (ep->rep_connected) {
 749	case 0:
 750		dprintk("RPC:       %s: connecting...\n", __func__);
 751		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
 752		if (rc) {
 753			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
 754				__func__, rc);
 755			rc = -ENETUNREACH;
 756			goto out_noupdate;
 757		}
 758		break;
 759	case -ENODEV:
 760		rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
 761		if (rc)
 762			goto out_noupdate;
 763		break;
 764	default:
 765		rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
 766		if (rc)
 767			goto out;
 768	}
 769
 770	ep->rep_connected = 0;
 771
 772	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
 773	if (rc) {
 774		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
 775				__func__, rc);
 776		goto out;
 777	}
 778
 779	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
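	/* rep_connected is set by the CM upcall. -EAGAIN means the server
	 * rejected a stale connection (IB_CM_REJ_STALE_CONN), so tear the
	 * connection down and try again from the top.
	 */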
 780	if (ep->rep_connected <= 0) {
 781		if (ep->rep_connected == -EAGAIN)
 782			goto retry;
 783		rc = ep->rep_connected;
 784		goto out;
 785	}
 786
 787	dprintk("RPC:       %s: connected\n", __func__);
 788	extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
 789	if (extras)
 790		rpcrdma_ep_post_extra_recv(r_xprt, extras);
 791
 792out:
 793	if (rc)
 794		ep->rep_connected = rc;
 795
 796out_noupdate:
 797	return rc;
 798}
 799
 800/*
 801 * rpcrdma_ep_disconnect
 802 *
 803 * This is separate from destroy to facilitate the ability
 804 * to reconnect without recreating the endpoint.
 805 *
 806 * This call is not reentrant, and must not be made in parallel
 807 * on the same endpoint.
 808 */
 809void
 810rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 811{
 812	int rc;
 813
 814	rc = rdma_disconnect(ia->ri_id);
 815	if (!rc)
 816		/* returns without wait if not connected */
 817		wait_event_interruptible(ep->rep_connect_wait,
 818							ep->rep_connected != 1);
 819	else
 820		ep->rep_connected = rc;
 821	trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt,
 822					       rx_ep), rc);
 823
 824	ib_drain_qp(ia->ri_id->qp);
 825}
 826
 827/* Fixed-size circular FIFO queue. This implementation is wait-free and
 828 * lock-free.
 829 *
 830 * Consumer is the code path that posts Sends. This path dequeues a
 831 * sendctx for use by a Send operation. Multiple consumer threads
 832 * are serialized by the RPC transport lock, which allows only one
 833 * ->send_request call at a time.
 834 *
 835 * Producer is the code path that handles Send completions. This path
 836 * enqueues a sendctx that has been completed. Multiple producer
 837 * threads are serialized by the ib_poll_cq() function.
 838 */
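/* The ring is indexed by rb_sc_head (consumer) and rb_sc_tail (producer).
 * Advancing the head into the tail means every sendctx is still awaiting
 * its Send completion, so the consumer backs off; completions advance the
 * tail back toward the head, unmapping any unsignaled sendctxs they pass.
 */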
 839
 840/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
 841 * queue activity, and ib_drain_qp has flushed all remaining Send
 842 * requests.
 843 */
 844static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
 845{
 846	unsigned long i;
 847
 848	for (i = 0; i <= buf->rb_sc_last; i++)
 849		kfree(buf->rb_sc_ctxs[i]);
 850	kfree(buf->rb_sc_ctxs);
 851}
 852
 853static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
 854{
 855	struct rpcrdma_sendctx *sc;
 856
 857	sc = kzalloc(sizeof(*sc) +
 858		     ia->ri_max_send_sges * sizeof(struct ib_sge),
 859		     GFP_KERNEL);
 860	if (!sc)
 861		return NULL;
 862
 863	sc->sc_wr.wr_cqe = &sc->sc_cqe;
 864	sc->sc_wr.sg_list = sc->sc_sges;
 865	sc->sc_wr.opcode = IB_WR_SEND;
 866	sc->sc_cqe.done = rpcrdma_wc_send;
 867	return sc;
 868}
 869
 870static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
 871{
 872	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 873	struct rpcrdma_sendctx *sc;
 874	unsigned long i;
 875
 876	/* Maximum number of concurrent outstanding Send WRs. Capping
 877	 * the circular queue size stops Send Queue overflow by causing
 878	 * the ->send_request call to fail temporarily before too many
 879	 * Sends are posted.
 880	 */
 881	i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS;
 882	dprintk("RPC:       %s: allocating %lu send_ctxs\n", __func__, i);
 883	buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
 884	if (!buf->rb_sc_ctxs)
 885		return -ENOMEM;
 886
 887	buf->rb_sc_last = i - 1;
 888	for (i = 0; i <= buf->rb_sc_last; i++) {
 889		sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
 890		if (!sc)
 891			goto out_destroy;
 892
 893		sc->sc_xprt = r_xprt;
 894		buf->rb_sc_ctxs[i] = sc;
 895	}
 896
 897	return 0;
 898
 899out_destroy:
 900	rpcrdma_sendctxs_destroy(buf);
 901	return -ENOMEM;
 902}
 903
 904/* The sendctx queue is not guaranteed to have a size that is a
 905 * power of two, thus the helpers in circ_buf.h cannot be used.
 906 * The other option is to use modulus (%), which can be expensive.
 907 */
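/* Example: with rb_sc_last == 3, the index sequence wraps 0, 1, 2, 3, 0, ... */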
 908static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
 909					  unsigned long item)
 910{
 911	return likely(item < buf->rb_sc_last) ? item + 1 : 0;
 912}
 913
 914/**
 915 * rpcrdma_sendctx_get_locked - Acquire a send context
 916 * @buf: transport buffers from which to acquire an unused context
 917 *
 918 * Returns pointer to a free send completion context; or NULL if
 919 * the queue is empty.
 920 *
 921 * Usage: Called to acquire an SGE array before preparing a Send WR.
 922 *
 923 * The caller serializes calls to this function (per rpcrdma_buffer),
 924 * and provides an effective memory barrier that flushes the new value
 925 * of rb_sc_head.
 926 */
 927struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
 928{
 929	struct rpcrdma_xprt *r_xprt;
 930	struct rpcrdma_sendctx *sc;
 931	unsigned long next_head;
 932
 933	next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);
 934
 935	if (next_head == READ_ONCE(buf->rb_sc_tail))
 936		goto out_emptyq;
 937
 938	/* ORDER: item must be accessed _before_ head is updated */
 939	sc = buf->rb_sc_ctxs[next_head];
 940
 941	/* Releasing the lock in the caller acts as a memory
 942	 * barrier that flushes rb_sc_head.
 943	 */
 944	buf->rb_sc_head = next_head;
 945
 946	return sc;
 947
 948out_emptyq:
 949	/* The queue is "empty" if there have not been enough Send
 950	 * completions recently. This is a sign the Send Queue is
 951	 * backing up. Cause the caller to pause and try again.
 952	 */
 953	dprintk("RPC:       %s: empty sendctx queue\n", __func__);
 954	r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
 955	r_xprt->rx_stats.empty_sendctx_q++;
 956	return NULL;
 957}
 958
 959/**
 960 * rpcrdma_sendctx_put_locked - Release a send context
 961 * @sc: send context to release
 962 *
  963 * Usage: Called from Send completion to return a sendctx
 964 * to the queue.
 965 *
 966 * The caller serializes calls to this function (per rpcrdma_buffer).
 967 */
 968void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
 969{
 970	struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
 971	unsigned long next_tail;
 972
 973	/* Unmap SGEs of previously completed by unsignaled
 974	 * Sends by walking up the queue until @sc is found.
 975	 */
 976	next_tail = buf->rb_sc_tail;
 977	do {
 978		next_tail = rpcrdma_sendctx_next(buf, next_tail);
 979
 980		/* ORDER: item must be accessed _before_ tail is updated */
 981		rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]);
 982
 983	} while (buf->rb_sc_ctxs[next_tail] != sc);
 984
 985	/* Paired with READ_ONCE */
 986	smp_store_release(&buf->rb_sc_tail, next_tail);
 987}
 988
 989static void
 990rpcrdma_mr_recovery_worker(struct work_struct *work)
 991{
 992	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
 993						  rb_recovery_worker.work);
 994	struct rpcrdma_mr *mr;
 995
 996	spin_lock(&buf->rb_recovery_lock);
 997	while (!list_empty(&buf->rb_stale_mrs)) {
 998		mr = rpcrdma_mr_pop(&buf->rb_stale_mrs);
 999		spin_unlock(&buf->rb_recovery_lock);
1000
1001		trace_xprtrdma_recover_mr(mr);
1002		mr->mr_xprt->rx_ia.ri_ops->ro_recover_mr(mr);
1003
1004		spin_lock(&buf->rb_recovery_lock);
1005	}
1006	spin_unlock(&buf->rb_recovery_lock);
1007}
1008
1009void
1010rpcrdma_mr_defer_recovery(struct rpcrdma_mr *mr)
1011{
1012	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
1013	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1014
1015	spin_lock(&buf->rb_recovery_lock);
1016	rpcrdma_mr_push(mr, &buf->rb_stale_mrs);
1017	spin_unlock(&buf->rb_recovery_lock);
1018
1019	schedule_delayed_work(&buf->rb_recovery_worker, 0);
1020}
1021
1022static void
1023rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
1024{
1025	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1026	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1027	unsigned int count;
1028	LIST_HEAD(free);
1029	LIST_HEAD(all);
1030
1031	for (count = 0; count < 3; count++) {
1032		struct rpcrdma_mr *mr;
1033		int rc;
1034
1035		mr = kzalloc(sizeof(*mr), GFP_KERNEL);
1036		if (!mr)
1037			break;
1038
1039		rc = ia->ri_ops->ro_init_mr(ia, mr);
1040		if (rc) {
1041			kfree(mr);
1042			break;
1043		}
1044
1045		mr->mr_xprt = r_xprt;
1046
1047		list_add(&mr->mr_list, &free);
1048		list_add(&mr->mr_all, &all);
1049	}
1050
1051	spin_lock(&buf->rb_mrlock);
1052	list_splice(&free, &buf->rb_mrs);
1053	list_splice(&all, &buf->rb_all);
1054	r_xprt->rx_stats.mrs_allocated += count;
1055	spin_unlock(&buf->rb_mrlock);
1056	trace_xprtrdma_createmrs(r_xprt, count);
1057
1058	xprt_write_space(&r_xprt->rx_xprt);
1059}
1060
1061static void
1062rpcrdma_mr_refresh_worker(struct work_struct *work)
1063{
1064	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
1065						  rb_refresh_worker.work);
1066	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
1067						   rx_buf);
1068
1069	rpcrdma_mrs_create(r_xprt);
1070}
1071
1072struct rpcrdma_req *
1073rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
1074{
1075	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
1076	struct rpcrdma_regbuf *rb;
1077	struct rpcrdma_req *req;
1078
1079	req = kzalloc(sizeof(*req), GFP_KERNEL);
1080	if (req == NULL)
1081		return ERR_PTR(-ENOMEM);
1082
1083	rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
1084				  DMA_TO_DEVICE, GFP_KERNEL);
1085	if (IS_ERR(rb)) {
1086		kfree(req);
1087		return ERR_PTR(-ENOMEM);
1088	}
1089	req->rl_rdmabuf = rb;
1090	xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
1091	req->rl_buffer = buffer;
1092	INIT_LIST_HEAD(&req->rl_registered);
1093
1094	spin_lock(&buffer->rb_reqslock);
1095	list_add(&req->rl_all, &buffer->rb_allreqs);
1096	spin_unlock(&buffer->rb_reqslock);
1097	return req;
1098}
1099
1100/**
1101 * rpcrdma_create_rep - Allocate an rpcrdma_rep object
1102 * @r_xprt: controlling transport
1103 *
1104 * Returns 0 on success or a negative errno on failure.
1105 */
1106int
1107rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
1108{
1109	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1110	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1111	struct rpcrdma_rep *rep;
1112	int rc;
1113
1114	rc = -ENOMEM;
1115	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
1116	if (rep == NULL)
1117		goto out;
1118
1119	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
1120					       DMA_FROM_DEVICE, GFP_KERNEL);
1121	if (IS_ERR(rep->rr_rdmabuf)) {
1122		rc = PTR_ERR(rep->rr_rdmabuf);
1123		goto out_free;
1124	}
1125	xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base,
1126		     rdmab_length(rep->rr_rdmabuf));
1127
1128	rep->rr_cqe.done = rpcrdma_wc_receive;
1129	rep->rr_rxprt = r_xprt;
1130	INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
1131	rep->rr_recv_wr.next = NULL;
1132	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
1133	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
1134	rep->rr_recv_wr.num_sge = 1;
1135
1136	spin_lock(&buf->rb_lock);
1137	list_add(&rep->rr_list, &buf->rb_recv_bufs);
1138	spin_unlock(&buf->rb_lock);
1139	return 0;
1140
1141out_free:
1142	kfree(rep);
1143out:
1144	dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1145		__func__, rc);
1146	return rc;
1147}
1148
1149int
1150rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1151{
1152	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1153	int i, rc;
1154
1155	buf->rb_max_requests = r_xprt->rx_data.max_requests;
1156	buf->rb_bc_srv_max_requests = 0;
1157	spin_lock_init(&buf->rb_mrlock);
1158	spin_lock_init(&buf->rb_lock);
1159	spin_lock_init(&buf->rb_recovery_lock);
1160	INIT_LIST_HEAD(&buf->rb_mrs);
1161	INIT_LIST_HEAD(&buf->rb_all);
1162	INIT_LIST_HEAD(&buf->rb_stale_mrs);
1163	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
1164			  rpcrdma_mr_refresh_worker);
1165	INIT_DELAYED_WORK(&buf->rb_recovery_worker,
1166			  rpcrdma_mr_recovery_worker);
1167
1168	rpcrdma_mrs_create(r_xprt);
1169
1170	INIT_LIST_HEAD(&buf->rb_send_bufs);
1171	INIT_LIST_HEAD(&buf->rb_allreqs);
1172	spin_lock_init(&buf->rb_reqslock);
1173	for (i = 0; i < buf->rb_max_requests; i++) {
1174		struct rpcrdma_req *req;
1175
1176		req = rpcrdma_create_req(r_xprt);
1177		if (IS_ERR(req)) {
1178			dprintk("RPC:       %s: request buffer %d alloc"
1179				" failed\n", __func__, i);
1180			rc = PTR_ERR(req);
1181			goto out;
1182		}
1183		list_add(&req->rl_list, &buf->rb_send_bufs);
1184	}
1185
1186	INIT_LIST_HEAD(&buf->rb_recv_bufs);
1187	for (i = 0; i <= buf->rb_max_requests; i++) {
1188		rc = rpcrdma_create_rep(r_xprt);
1189		if (rc)
1190			goto out;
1191	}
1192
1193	rc = rpcrdma_sendctxs_create(r_xprt);
1194	if (rc)
1195		goto out;
1196
1197	return 0;
1198out:
1199	rpcrdma_buffer_destroy(buf);
1200	return rc;
1201}
1202
1203static struct rpcrdma_req *
1204rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
1205{
1206	struct rpcrdma_req *req;
1207
1208	req = list_first_entry(&buf->rb_send_bufs,
1209			       struct rpcrdma_req, rl_list);
1210	list_del_init(&req->rl_list);
1211	return req;
1212}
1213
1214static struct rpcrdma_rep *
1215rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
1216{
1217	struct rpcrdma_rep *rep;
1218
1219	rep = list_first_entry(&buf->rb_recv_bufs,
1220			       struct rpcrdma_rep, rr_list);
1221	list_del(&rep->rr_list);
1222	return rep;
1223}
1224
1225static void
1226rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
1227{
1228	rpcrdma_free_regbuf(rep->rr_rdmabuf);
1229	kfree(rep);
1230}
1231
1232void
1233rpcrdma_destroy_req(struct rpcrdma_req *req)
1234{
1235	rpcrdma_free_regbuf(req->rl_recvbuf);
1236	rpcrdma_free_regbuf(req->rl_sendbuf);
1237	rpcrdma_free_regbuf(req->rl_rdmabuf);
1238	kfree(req);
1239}
1240
1241static void
1242rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
1243{
1244	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
1245						   rx_buf);
1246	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1247	struct rpcrdma_mr *mr;
1248	unsigned int count;
1249
1250	count = 0;
1251	spin_lock(&buf->rb_mrlock);
1252	while (!list_empty(&buf->rb_all)) {
1253		mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all);
1254		list_del(&mr->mr_all);
1255
1256		spin_unlock(&buf->rb_mrlock);
1257
 1258		/* Ensure MR is not on any rl_registered list */
1259		if (!list_empty(&mr->mr_list))
1260			list_del(&mr->mr_list);
1261
1262		ia->ri_ops->ro_release_mr(mr);
1263		count++;
1264		spin_lock(&buf->rb_mrlock);
1265	}
1266	spin_unlock(&buf->rb_mrlock);
1267	r_xprt->rx_stats.mrs_allocated = 0;
1268
1269	dprintk("RPC:       %s: released %u MRs\n", __func__, count);
1270}
1271
1272void
1273rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1274{
1275	cancel_delayed_work_sync(&buf->rb_recovery_worker);
1276	cancel_delayed_work_sync(&buf->rb_refresh_worker);
1277
1278	rpcrdma_sendctxs_destroy(buf);
1279
1280	while (!list_empty(&buf->rb_recv_bufs)) {
1281		struct rpcrdma_rep *rep;
1282
1283		rep = rpcrdma_buffer_get_rep_locked(buf);
1284		rpcrdma_destroy_rep(rep);
1285	}
1286	buf->rb_send_count = 0;
1287
1288	spin_lock(&buf->rb_reqslock);
1289	while (!list_empty(&buf->rb_allreqs)) {
1290		struct rpcrdma_req *req;
1291
1292		req = list_first_entry(&buf->rb_allreqs,
1293				       struct rpcrdma_req, rl_all);
1294		list_del(&req->rl_all);
1295
1296		spin_unlock(&buf->rb_reqslock);
1297		rpcrdma_destroy_req(req);
1298		spin_lock(&buf->rb_reqslock);
1299	}
1300	spin_unlock(&buf->rb_reqslock);
1301	buf->rb_recv_count = 0;
1302
1303	rpcrdma_mrs_destroy(buf);
1304}
1305
1306/**
1307 * rpcrdma_mr_get - Allocate an rpcrdma_mr object
1308 * @r_xprt: controlling transport
1309 *
1310 * Returns an initialized rpcrdma_mr or NULL if no free
1311 * rpcrdma_mr objects are available.
1312 */
1313struct rpcrdma_mr *
1314rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
1315{
1316	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1317	struct rpcrdma_mr *mr = NULL;
1318
1319	spin_lock(&buf->rb_mrlock);
1320	if (!list_empty(&buf->rb_mrs))
1321		mr = rpcrdma_mr_pop(&buf->rb_mrs);
1322	spin_unlock(&buf->rb_mrlock);
1323
1324	if (!mr)
1325		goto out_nomrs;
1326	return mr;
1327
1328out_nomrs:
1329	trace_xprtrdma_nomrs(r_xprt);
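	/* If the underlying device has been removed (-ENODEV), there is
	 * nothing to allocate fresh MRs against, so don't bother kicking
	 * the refresh worker.
	 */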
1330	if (r_xprt->rx_ep.rep_connected != -ENODEV)
1331		schedule_delayed_work(&buf->rb_refresh_worker, 0);
1332
1333	/* Allow the reply handler and refresh worker to run */
1334	cond_resched();
1335
1336	return NULL;
1337}
1338
1339static void
1340__rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr)
1341{
1342	spin_lock(&buf->rb_mrlock);
1343	rpcrdma_mr_push(mr, &buf->rb_mrs);
1344	spin_unlock(&buf->rb_mrlock);
1345}
1346
1347/**
1348 * rpcrdma_mr_put - Release an rpcrdma_mr object
1349 * @mr: object to release
1350 *
1351 */
1352void
1353rpcrdma_mr_put(struct rpcrdma_mr *mr)
1354{
1355	__rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr);
1356}
1357
1358/**
1359 * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it
1360 * @mr: object to release
1361 *
1362 */
1363void
1364rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
1365{
1366	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
1367
1368	trace_xprtrdma_dma_unmap(mr);
1369	ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
1370			mr->mr_sg, mr->mr_nents, mr->mr_dir);
1371	__rpcrdma_mr_put(&r_xprt->rx_buf, mr);
1372}
1373
1374static struct rpcrdma_rep *
1375rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers)
1376{
1377	/* If an RPC previously completed without a reply (say, a
1378	 * credential problem or a soft timeout occurs) then hold off
1379	 * on supplying more Receive buffers until the number of new
1380	 * pending RPCs catches up to the number of posted Receives.
1381	 */
1382	if (unlikely(buffers->rb_send_count < buffers->rb_recv_count))
1383		return NULL;
1384
1385	if (unlikely(list_empty(&buffers->rb_recv_bufs)))
1386		return NULL;
1387	buffers->rb_recv_count++;
1388	return rpcrdma_buffer_get_rep_locked(buffers);
1389}
1390
1391/*
1392 * Get a set of request/reply buffers.
1393 *
1394 * Reply buffer (if available) is attached to send buffer upon return.
1395 */
1396struct rpcrdma_req *
1397rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1398{
1399	struct rpcrdma_req *req;
1400
1401	spin_lock(&buffers->rb_lock);
1402	if (list_empty(&buffers->rb_send_bufs))
1403		goto out_reqbuf;
1404	buffers->rb_send_count++;
1405	req = rpcrdma_buffer_get_req_locked(buffers);
1406	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
1407	spin_unlock(&buffers->rb_lock);
1408
1409	return req;
1410
1411out_reqbuf:
1412	spin_unlock(&buffers->rb_lock);
1413	return NULL;
1414}
1415
1416/*
1417 * Put request/reply buffers back into pool.
1418 * Pre-decrement counter/array index.
1419 */
1420void
1421rpcrdma_buffer_put(struct rpcrdma_req *req)
1422{
1423	struct rpcrdma_buffer *buffers = req->rl_buffer;
1424	struct rpcrdma_rep *rep = req->rl_reply;
1425
1426	req->rl_reply = NULL;
1427
1428	spin_lock(&buffers->rb_lock);
1429	buffers->rb_send_count--;
1430	list_add_tail(&req->rl_list, &buffers->rb_send_bufs);
1431	if (rep) {
1432		buffers->rb_recv_count--;
1433		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
1434	}
1435	spin_unlock(&buffers->rb_lock);
1436}
1437
1438/*
1439 * Recover reply buffers from pool.
1440 * This happens when recovering from disconnect.
1441 */
1442void
1443rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1444{
1445	struct rpcrdma_buffer *buffers = req->rl_buffer;
1446
1447	spin_lock(&buffers->rb_lock);
1448	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
1449	spin_unlock(&buffers->rb_lock);
1450}
1451
1452/*
1453 * Put reply buffers back into pool when not attached to
1454 * request. This happens in error conditions.
1455 */
1456void
1457rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1458{
1459	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
1460
1461	spin_lock(&buffers->rb_lock);
1462	buffers->rb_recv_count--;
1463	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
1464	spin_unlock(&buffers->rb_lock);
1465}
1466
1467/**
1468 * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
1469 * @size: size of buffer to be allocated, in bytes
1470 * @direction: direction of data movement
1471 * @flags: GFP flags
1472 *
1473 * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
1474 * can be persistently DMA-mapped for I/O.
1475 *
1476 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1477 * receiving the payload of RDMA RECV operations. During Long Calls
1478 * or Replies they may be registered externally via ro_map.
1479 */
1480struct rpcrdma_regbuf *
1481rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
1482		     gfp_t flags)
1483{
1484	struct rpcrdma_regbuf *rb;
1485
1486	rb = kmalloc(sizeof(*rb) + size, flags);
1487	if (rb == NULL)
1488		return ERR_PTR(-ENOMEM);
1489
1490	rb->rg_device = NULL;
1491	rb->rg_direction = direction;
1492	rb->rg_iov.length = size;
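	/* Note: the regbuf is not DMA-mapped here; mapping is deferred
	 * until the buffer is first used (see __rpcrdma_dma_map_regbuf).
	 */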
1493
1494	return rb;
1495}
1496
1497/**
 1498 * __rpcrdma_dma_map_regbuf - DMA-map a regbuf
1499 * @ia: controlling rpcrdma_ia
1500 * @rb: regbuf to be mapped
1501 */
1502bool
1503__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
1504{
1505	struct ib_device *device = ia->ri_device;
1506
1507	if (rb->rg_direction == DMA_NONE)
1508		return false;
1509
1510	rb->rg_iov.addr = ib_dma_map_single(device,
1511					    (void *)rb->rg_base,
1512					    rdmab_length(rb),
1513					    rb->rg_direction);
1514	if (ib_dma_mapping_error(device, rdmab_addr(rb)))
1515		return false;
1516
1517	rb->rg_device = device;
1518	rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
1519	return true;
1520}
1521
1522static void
1523rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
1524{
1525	if (!rb)
1526		return;
1527
1528	if (!rpcrdma_regbuf_is_mapped(rb))
1529		return;
1530
1531	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
1532			    rdmab_length(rb), rb->rg_direction);
1533	rb->rg_device = NULL;
1534}
1535
1536/**
1537 * rpcrdma_free_regbuf - deregister and free registered buffer
1538 * @rb: regbuf to be deregistered and freed
1539 */
1540void
1541rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
1542{
1543	rpcrdma_dma_unmap_regbuf(rb);
1544	kfree(rb);
1545}
1546
1547/*
1548 * Prepost any receive buffer, then post send.
1549 *
1550 * Receive buffer is donated to hardware, reclaimed upon recv completion.
1551 */
1552int
1553rpcrdma_ep_post(struct rpcrdma_ia *ia,
1554		struct rpcrdma_ep *ep,
1555		struct rpcrdma_req *req)
1556{
1557	struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
1558	int rc;
1559
1560	if (req->rl_reply) {
1561		rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
1562		if (rc)
1563			return rc;
1564		req->rl_reply = NULL;
1565	}
1566
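	/* Signal (request a completion for) only about one Send in every
	 * rep_send_batch, or whenever this request pins resources that
	 * must not be released before the Send completes; all other Sends
	 * are posted unsignaled to reduce completion overhead.
	 */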
1567	if (!ep->rep_send_count ||
1568	    test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
1569		send_wr->send_flags |= IB_SEND_SIGNALED;
1570		ep->rep_send_count = ep->rep_send_batch;
1571	} else {
1572		send_wr->send_flags &= ~IB_SEND_SIGNALED;
1573		--ep->rep_send_count;
1574	}
1575
1576	rc = ia->ri_ops->ro_send(ia, req);
1577	trace_xprtrdma_post_send(req, rc);
1578	if (rc)
1579		return -ENOTCONN;
1580	return 0;
1581}
1582
1583int
1584rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1585		     struct rpcrdma_rep *rep)
1586{
1587	struct ib_recv_wr *recv_wr_fail;
1588	int rc;
1589
1590	if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf))
1591		goto out_map;
1592	rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail);
1593	trace_xprtrdma_post_recv(rep, rc);
1594	if (rc)
1595		return -ENOTCONN;
1596	return 0;
1597
1598out_map:
1599	pr_err("rpcrdma: failed to DMA map the Receive buffer\n");
1600	return -EIO;
1601}
1602
1603/**
1604 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
1605 * @r_xprt: transport associated with these backchannel resources
1606 * @count: minimum number of incoming requests expected
1607 *
1608 * Returns zero if all requested buffers were posted, or a negative errno.
1609 */
1610int
1611rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
1612{
1613	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
1614	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1615	struct rpcrdma_rep *rep;
1616	int rc;
1617
1618	while (count--) {
1619		spin_lock(&buffers->rb_lock);
1620		if (list_empty(&buffers->rb_recv_bufs))
1621			goto out_reqbuf;
1622		rep = rpcrdma_buffer_get_rep_locked(buffers);
1623		spin_unlock(&buffers->rb_lock);
1624
1625		rc = rpcrdma_ep_post_recv(ia, rep);
1626		if (rc)
1627			goto out_rc;
1628	}
1629
1630	return 0;
1631
1632out_reqbuf:
1633	spin_unlock(&buffers->rb_lock);
1634	trace_xprtrdma_noreps(r_xprt);
1635	return -ENOMEM;
1636
1637out_rc:
1638	rpcrdma_recv_buffer_put(rep);
1639	return rc;
1640}
v4.6
   1/*
 
   2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
   3 *
   4 * This software is available to you under a choice of one of two
   5 * licenses.  You may choose to be licensed under the terms of the GNU
   6 * General Public License (GPL) Version 2, available from the file
   7 * COPYING in the main directory of this source tree, or the BSD-type
   8 * license below:
   9 *
  10 * Redistribution and use in source and binary forms, with or without
  11 * modification, are permitted provided that the following conditions
  12 * are met:
  13 *
  14 *      Redistributions of source code must retain the above copyright
  15 *      notice, this list of conditions and the following disclaimer.
  16 *
  17 *      Redistributions in binary form must reproduce the above
  18 *      copyright notice, this list of conditions and the following
  19 *      disclaimer in the documentation and/or other materials provided
  20 *      with the distribution.
  21 *
  22 *      Neither the name of the Network Appliance, Inc. nor the names of
  23 *      its contributors may be used to endorse or promote products
  24 *      derived from this software without specific prior written
  25 *      permission.
  26 *
  27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  38 */
  39
  40/*
  41 * verbs.c
  42 *
  43 * Encapsulates the major functions managing:
  44 *  o adapters
  45 *  o endpoints
  46 *  o connections
  47 *  o buffer memory
  48 */
  49
  50#include <linux/interrupt.h>
  51#include <linux/slab.h>
  52#include <linux/prefetch.h>
  53#include <linux/sunrpc/addr.h>
 
 
 
  54#include <asm/bitops.h>
  55#include <linux/module.h> /* try_module_get()/module_put() */
 
  56
  57#include "xprt_rdma.h"
  58
  59/*
  60 * Globals/Macros
  61 */
  62
  63#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  64# define RPCDBG_FACILITY	RPCDBG_TRANS
  65#endif
  66
  67/*
  68 * internal functions
  69 */
 
 
 
  70
  71static struct workqueue_struct *rpcrdma_receive_wq;
  72
  73int
  74rpcrdma_alloc_wq(void)
  75{
  76	struct workqueue_struct *recv_wq;
  77
  78	recv_wq = alloc_workqueue("xprtrdma_receive",
  79				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
  80				  0);
  81	if (!recv_wq)
  82		return -ENOMEM;
  83
  84	rpcrdma_receive_wq = recv_wq;
  85	return 0;
  86}
  87
  88void
  89rpcrdma_destroy_wq(void)
  90{
  91	struct workqueue_struct *wq;
  92
  93	if (rpcrdma_receive_wq) {
  94		wq = rpcrdma_receive_wq;
  95		rpcrdma_receive_wq = NULL;
  96		destroy_workqueue(wq);
  97	}
  98}
  99
 100static void
 101rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
 102{
 103	struct rpcrdma_ep *ep = context;
 
 
 
 
 
 
 104
 105	pr_err("RPC:       %s: %s on device %s ep %p\n",
 106	       __func__, ib_event_msg(event->event),
 107		event->device->name, context);
 108	if (ep->rep_connected == 1) {
 109		ep->rep_connected = -EIO;
 110		rpcrdma_conn_func(ep);
 111		wake_up_all(&ep->rep_connect_wait);
 112	}
 113}
 114
 115/**
 116 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 117 * @cq:	completion queue (ignored)
 118 * @wc:	completed WR
 119 *
 120 */
 121static void
 122rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 123{
 
 
 
 
 124	/* WARNING: Only wr_cqe and status are reliable at this point */
 
 125	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
 126		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
 127		       ib_wc_status_msg(wc->status),
 128		       wc->status, wc->vendor_err);
 129}
 130
 131static void
 132rpcrdma_receive_worker(struct work_struct *work)
 133{
 134	struct rpcrdma_rep *rep =
 135			container_of(work, struct rpcrdma_rep, rr_work);
 136
 137	rpcrdma_reply_handler(rep);
 138}
 139
 140/* Perform basic sanity checking to avoid using garbage
 141 * to update the credit grant value.
 142 */
 143static void
 144rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
 145{
 146	struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
 147	struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
 148	u32 credits;
 149
 150	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
 151		return;
 152
 153	credits = be32_to_cpu(rmsgp->rm_credit);
 154	if (credits == 0)
 155		credits = 1;	/* don't deadlock */
 156	else if (credits > buffer->rb_max_requests)
 157		credits = buffer->rb_max_requests;
 158
 159	atomic_set(&buffer->rb_credits, credits);
 160}
 161
 162/**
 163 * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
 164 * @cq:	completion queue (ignored)
 165 * @wc:	completed WR
 166 *
 167 */
 168static void
 169rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
 170{
 171	struct ib_cqe *cqe = wc->wr_cqe;
 172	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
 173					       rr_cqe);
 174
 175	/* WARNING: Only wr_id and status are reliable at this point */
 
 176	if (wc->status != IB_WC_SUCCESS)
 177		goto out_fail;
 178
 179	/* status == SUCCESS means all fields in wc are trustworthy */
 180	if (wc->opcode != IB_WC_RECV)
 181		return;
 182
 183	dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
 184		__func__, rep, wc->byte_len);
 185
 186	rep->rr_len = wc->byte_len;
 187	ib_dma_sync_single_for_cpu(rep->rr_device,
 188				   rdmab_addr(rep->rr_rdmabuf),
 189				   rep->rr_len, DMA_FROM_DEVICE);
 190
 191	rpcrdma_update_granted_credits(rep);
 192
 193out_schedule:
 194	queue_work(rpcrdma_receive_wq, &rep->rr_work);
 195	return;
 196
 197out_fail:
 198	if (wc->status != IB_WC_WR_FLUSH_ERR)
 199		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
 200		       ib_wc_status_msg(wc->status),
 201		       wc->status, wc->vendor_err);
 202	rep->rr_len = RPCRDMA_BAD_LEN;
 203	goto out_schedule;
 204}
 205
 206static void
 207rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
 
 208{
 209	struct ib_wc wc;
 
 
 210
 211	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
 212		rpcrdma_receive_wc(NULL, &wc);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 213}
 214
 215static int
 216rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 217{
 218	struct rpcrdma_xprt *xprt = id->context;
 219	struct rpcrdma_ia *ia = &xprt->rx_ia;
 220	struct rpcrdma_ep *ep = &xprt->rx_ep;
 221#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 222	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
 223#endif
 224	struct ib_qp_attr *attr = &ia->ri_qp_attr;
 225	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
 226	int connstate = 0;
 227
 
 228	switch (event->event) {
 229	case RDMA_CM_EVENT_ADDR_RESOLVED:
 230	case RDMA_CM_EVENT_ROUTE_RESOLVED:
 231		ia->ri_async_rc = 0;
 232		complete(&ia->ri_done);
 233		break;
 234	case RDMA_CM_EVENT_ADDR_ERROR:
 235		ia->ri_async_rc = -EHOSTUNREACH;
 236		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
 237			__func__, ep);
 238		complete(&ia->ri_done);
 239		break;
 240	case RDMA_CM_EVENT_ROUTE_ERROR:
 241		ia->ri_async_rc = -ENETUNREACH;
 242		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
 243			__func__, ep);
 244		complete(&ia->ri_done);
 245		break;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 246	case RDMA_CM_EVENT_ESTABLISHED:
 
 247		connstate = 1;
 248		ib_query_qp(ia->ri_id->qp, attr,
 249			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
 250			    iattr);
 251		dprintk("RPC:       %s: %d responder resources"
 252			" (%d initiator)\n",
 253			__func__, attr->max_dest_rd_atomic,
 254			attr->max_rd_atomic);
 255		goto connected;
 256	case RDMA_CM_EVENT_CONNECT_ERROR:
 257		connstate = -ENOTCONN;
 258		goto connected;
 259	case RDMA_CM_EVENT_UNREACHABLE:
 260		connstate = -ENETDOWN;
 261		goto connected;
 262	case RDMA_CM_EVENT_REJECTED:
 263		connstate = -ECONNREFUSED;
 264		goto connected;
 265	case RDMA_CM_EVENT_DISCONNECTED:
 266		connstate = -ECONNABORTED;
 267		goto connected;
 268	case RDMA_CM_EVENT_DEVICE_REMOVAL:
 269		connstate = -ENODEV;
 270connected:
 271		dprintk("RPC:       %s: %sconnected\n",
 272					__func__, connstate > 0 ? "" : "dis");
 273		atomic_set(&xprt->rx_buf.rb_credits, 1);
 274		ep->rep_connected = connstate;
 275		rpcrdma_conn_func(ep);
 276		wake_up_all(&ep->rep_connect_wait);
 277		/*FALLTHROUGH*/
 278	default:
 279		dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
 280			__func__, sap, rpc_get_port(sap), ep,
 281			rdma_event_msg(event->event));
 282		break;
 283	}
 284
 285#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 286	if (connstate == 1) {
 287		int ird = attr->max_dest_rd_atomic;
 288		int tird = ep->rep_remote_cma.responder_resources;
 289
 290		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
 291			sap, rpc_get_port(sap),
 292			ia->ri_device->name,
 293			ia->ri_ops->ro_displayname,
 294			xprt->rx_buf.rb_max_requests,
 295			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
 296	} else if (connstate < 0) {
 297		pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
 298			sap, rpc_get_port(sap), connstate);
 299	}
 300#endif
 301
 302	return 0;
 303}
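
/* Note (derived from this file): the connstate value computed above is
 * latched in ep->rep_connected and rep_connect_wait is woken, which is
 * what releases the wait_event_interruptible() calls in
 * rpcrdma_ep_connect() and rpcrdma_ep_disconnect() further down.
 */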
 304
 305static void rpcrdma_destroy_id(struct rdma_cm_id *id)
 306{
 307	if (id) {
 308		module_put(id->device->owner);
 309		rdma_destroy_id(id);
 310	}
 311}
 312
 313static struct rdma_cm_id *
 314rpcrdma_create_id(struct rpcrdma_xprt *xprt,
 315			struct rpcrdma_ia *ia, struct sockaddr *addr)
 316{
 317	struct rdma_cm_id *id;
 318	int rc;
 319
 320	init_completion(&ia->ri_done);
 321
 322	id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
 323			    IB_QPT_RC);
 324	if (IS_ERR(id)) {
 325		rc = PTR_ERR(id);
 326		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
 327			__func__, rc);
 328		return id;
 329	}
 330
 331	ia->ri_async_rc = -ETIMEDOUT;
 332	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
 333	if (rc) {
 334		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
 335			__func__, rc);
 336		goto out;
 337	}
 338	wait_for_completion_interruptible_timeout(&ia->ri_done,
 339				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
 340
 341	/* FIXME:
 342	 * Until xprtrdma supports DEVICE_REMOVAL, the provider must
 343	 * be pinned while there are active NFS/RDMA mounts to prevent
 344	 * hangs and crashes at umount time.
 345	 */
 346	if (!ia->ri_async_rc && !try_module_get(id->device->owner)) {
 347		dprintk("RPC:       %s: Failed to get device module\n",
 348			__func__);
 349		ia->ri_async_rc = -ENODEV;
 350	}
 351	rc = ia->ri_async_rc;
 352	if (rc)
 353		goto out;
 354
 355	ia->ri_async_rc = -ETIMEDOUT;
 356	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
 357	if (rc) {
 358		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
 359			__func__, rc);
 360		goto put;
 361	}
 362	wait_for_completion_interruptible_timeout(&ia->ri_done,
 363				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
 364	rc = ia->ri_async_rc;
 365	if (rc)
 366		goto put;
 367
 368	return id;
 369put:
 370	module_put(id->device->owner);
 371out:
 372	rdma_destroy_id(id);
 373	return ERR_PTR(rc);
 374}
 375
 376/*
 377 * Drain any CQ prior to teardown.
 378 */
 379static void
 380rpcrdma_clean_cq(struct ib_cq *cq)
 381{
 382	struct ib_wc wc;
 383	int count = 0;
 384
 385	while (1 == ib_poll_cq(cq, 1, &wc))
 386		++count;
 387
 388	if (count)
 389		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
 390			__func__, count, wc.opcode);
 391}
 392
 393/*
 394 * Exported functions.
 395 */
 396
 397/*
 398 * Open and initialize an Interface Adapter.
 399 *  o initializes fields of struct rpcrdma_ia, including
 400 *    interface and provider attributes and the protection domain.
 401 */
 402int
 403rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 404{
 405	struct rpcrdma_ia *ia = &xprt->rx_ia;
 406	int rc;
 407
 408	ia->ri_dma_mr = NULL;
 409
 410	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
 411	if (IS_ERR(ia->ri_id)) {
 412		rc = PTR_ERR(ia->ri_id);
 413		goto out1;
 414	}
 415	ia->ri_device = ia->ri_id->device;
 416
 417	ia->ri_pd = ib_alloc_pd(ia->ri_device);
 418	if (IS_ERR(ia->ri_pd)) {
 419		rc = PTR_ERR(ia->ri_pd);
 420		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
 421			__func__, rc);
 422		goto out2;
 423	}
 424
 425	if (memreg == RPCRDMA_FRMR) {
 426		if (!(ia->ri_device->attrs.device_cap_flags &
 427				IB_DEVICE_MEM_MGT_EXTENSIONS) ||
 428		    (ia->ri_device->attrs.max_fast_reg_page_list_len == 0)) {
 429			dprintk("RPC:       %s: FRMR registration "
 430				"not supported by HCA\n", __func__);
 431			memreg = RPCRDMA_MTHCAFMR;
 432		}
 433	}
 434	if (memreg == RPCRDMA_MTHCAFMR) {
 435		if (!ia->ri_device->alloc_fmr) {
 436			dprintk("RPC:       %s: MTHCAFMR registration "
 437				"not supported by HCA\n", __func__);
 438			rc = -EINVAL;
 439			goto out3;
 440		}
 441	}
 442
 443	switch (memreg) {
 444	case RPCRDMA_FRMR:
 445		ia->ri_ops = &rpcrdma_frwr_memreg_ops;
 446		break;
 447	case RPCRDMA_ALLPHYSICAL:
 448		ia->ri_ops = &rpcrdma_physical_memreg_ops;
 449		break;
 450	case RPCRDMA_MTHCAFMR:
 451		ia->ri_ops = &rpcrdma_fmr_memreg_ops;
 452		break;
 453	default:
 454		printk(KERN_ERR "RPC: Unsupported memory "
 455				"registration mode: %d\n", memreg);
 456		rc = -ENOMEM;
 457		goto out3;
 458	}
 459	dprintk("RPC:       %s: memory registration strategy is '%s'\n",
 460		__func__, ia->ri_ops->ro_displayname);
 461
 462	rwlock_init(&ia->ri_qplock);
 463	return 0;
 464
 465out3:
 466	ib_dealloc_pd(ia->ri_pd);
 467	ia->ri_pd = NULL;
 468out2:
 469	rpcrdma_destroy_id(ia->ri_id);
 470	ia->ri_id = NULL;
 471out1:
 472	return rc;
 473}
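
/* A minimal caller sketch, not part of the original file: open the IA
 * with FRMR preferred (rpcrdma_ia_open() itself falls back to FMR when
 * the device lacks fast-registration support). The helper name and the
 * error handling here are hypothetical.
 */
static int __maybe_unused example_ia_setup(struct rpcrdma_xprt *xprt,
					   struct sockaddr *sap)
{
	int rc;

	rc = rpcrdma_ia_open(xprt, sap, RPCRDMA_FRMR);
	if (rc)
		return rc;	/* rpcrdma_ia_open cleaned up after itself */

	/* ... endpoint and buffer setup would follow here ... */

	rpcrdma_ia_close(&xprt->rx_ia);
	return 0;
}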
 474
 475/*
 476 * Clean up/close an IA.
 477 *   o if event handles and PD have been initialized, free them.
 478 *   o close the IA
 479 */
 480void
 481rpcrdma_ia_close(struct rpcrdma_ia *ia)
 482{
 483	dprintk("RPC:       %s: entering\n", __func__);
 484	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
 485		if (ia->ri_id->qp)
 486			rdma_destroy_qp(ia->ri_id);
 487		rpcrdma_destroy_id(ia->ri_id);
 488		ia->ri_id = NULL;
 489	}
 490
 491	/* If the pd is still busy, xprtrdma missed freeing a resource */
 492	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
 493		ib_dealloc_pd(ia->ri_pd);
 494}
 495
 496/*
 497 * Create unconnected endpoint.
 498 */
 499int
 500rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 501				struct rpcrdma_create_data_internal *cdata)
 502{
 503	struct ib_cq *sendcq, *recvcq;
 504	unsigned int max_qp_wr;
 505	int rc;
 506
 507	if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
 508		dprintk("RPC:       %s: insufficient SGEs available\n",
 509			__func__);
 510		return -ENOMEM;
 511	}
 512
 513	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
 514		dprintk("RPC:       %s: insufficient WQEs available\n",
 515			__func__);
 516		return -ENOMEM;
 517	}
 518	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS;
 519
 520	/* check provider's send/recv wr limits */
 521	if (cdata->max_requests > max_qp_wr)
 522		cdata->max_requests = max_qp_wr;
 523
 524	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
 525	ep->rep_attr.qp_context = ep;
 526	ep->rep_attr.srq = NULL;
 527	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
 528	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
 529	rc = ia->ri_ops->ro_open(ia, ep, cdata);
 530	if (rc)
 531		return rc;
 532	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
 533	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
 534	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
 535	ep->rep_attr.cap.max_recv_sge = 1;
 536	ep->rep_attr.cap.max_inline_data = 0;
 537	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
 538	ep->rep_attr.qp_type = IB_QPT_RC;
 539	ep->rep_attr.port_num = ~0;
 540
 541	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
 542		"iovs: send %d recv %d\n",
 543		__func__,
 544		ep->rep_attr.cap.max_send_wr,
 545		ep->rep_attr.cap.max_recv_wr,
 546		ep->rep_attr.cap.max_send_sge,
 547		ep->rep_attr.cap.max_recv_sge);
 548
 549	/* set trigger for requesting send completion */
 550	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
 551	if (ep->rep_cqinit <= 2)
 552		ep->rep_cqinit = 0;	/* always signal? */
 553	INIT_CQCOUNT(ep);
 554	init_waitqueue_head(&ep->rep_connect_wait);
 555	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
 556
 557	sendcq = ib_alloc_cq(ia->ri_device, NULL,
 558			     ep->rep_attr.cap.max_send_wr + 1,
 559			     0, IB_POLL_SOFTIRQ);
 560	if (IS_ERR(sendcq)) {
 561		rc = PTR_ERR(sendcq);
 562		dprintk("RPC:       %s: failed to create send CQ: %i\n",
 563			__func__, rc);
 564		goto out1;
 565	}
 566
 567	recvcq = ib_alloc_cq(ia->ri_device, NULL,
 568			     ep->rep_attr.cap.max_recv_wr + 1,
 569			     0, IB_POLL_SOFTIRQ);
 570	if (IS_ERR(recvcq)) {
 571		rc = PTR_ERR(recvcq);
 572		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
 573			__func__, rc);
 574		goto out2;
 575	}
 576
 577	ep->rep_attr.send_cq = sendcq;
 578	ep->rep_attr.recv_cq = recvcq;
 579
 580	/* Initialize cma parameters */
 581
 582	/* RPC/RDMA does not use private data */
 583	ep->rep_remote_cma.private_data = NULL;
 584	ep->rep_remote_cma.private_data_len = 0;
 585
 586	/* Client offers RDMA Read but does not initiate */
 587	ep->rep_remote_cma.initiator_depth = 0;
 588	if (ia->ri_device->attrs.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
 589		ep->rep_remote_cma.responder_resources = 32;
 590	else
 591		ep->rep_remote_cma.responder_resources =
 592						ia->ri_device->attrs.max_qp_rd_atom;
 593
 594	ep->rep_remote_cma.retry_count = 7;
 595	ep->rep_remote_cma.flow_control = 0;
 596	ep->rep_remote_cma.rnr_retry_count = 0;
 597
 598	return 0;
 599
 600out2:
 601	ib_free_cq(sendcq);
 602out1:
 603	if (ia->ri_dma_mr)
 604		ib_dereg_mr(ia->ri_dma_mr);
 605	return rc;
 606}
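
/* Worked example for the rep_cqinit trigger above (illustrative numbers):
 * if cap.max_send_wr ends up at 130, rep_cqinit is 130/2 - 1 = 64, so
 * rpcrdma_ep_post() below lets DECR_CQCOUNT() run down and requests a
 * send completion roughly once every 64 Sends (about half the queue
 * depth); with very small queues (rep_cqinit <= 2) every Send is signaled.
 */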
 607
 608/*
 609 * rpcrdma_ep_destroy
 610 *
 611 * Disconnect and destroy endpoint. After this, the only
 612 * valid operations on the ep are to free it (if dynamically
 613 * allocated) or re-create it.
 614 */
 615void
 616rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 617{
 618	int rc;
 619
 620	dprintk("RPC:       %s: entering, connected is %d\n",
 621		__func__, ep->rep_connected);
 622
 623	cancel_delayed_work_sync(&ep->rep_connect_worker);
 624
 625	if (ia->ri_id->qp)
 626		rpcrdma_ep_disconnect(ep, ia);
 627
 628	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
 629	rpcrdma_clean_cq(ep->rep_attr.send_cq);
 630
 631	if (ia->ri_id->qp) {
 632		rdma_destroy_qp(ia->ri_id);
 633		ia->ri_id->qp = NULL;
 634	}
 635
 636	ib_free_cq(ep->rep_attr.recv_cq);
 637	ib_free_cq(ep->rep_attr.send_cq);
 638
 639	if (ia->ri_dma_mr) {
 640		rc = ib_dereg_mr(ia->ri_dma_mr);
 641		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
 642			__func__, rc);
 643	}
 644}
 645
 646/*
 647 * Connect unconnected endpoint.
 648 */
 649int
 650rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 651{
 652	struct rdma_cm_id *id, *old;
 653	int rc = 0;
 654	int retry_count = 0;
 655
 656	if (ep->rep_connected != 0) {
 657		struct rpcrdma_xprt *xprt;
 658retry:
 659		dprintk("RPC:       %s: reconnecting...\n", __func__);
 660
 661		rpcrdma_ep_disconnect(ep, ia);
 662		rpcrdma_flush_cqs(ep);
 663
 664		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
 665		id = rpcrdma_create_id(xprt, ia,
 666				(struct sockaddr *)&xprt->rx_data.addr);
 667		if (IS_ERR(id)) {
 668			rc = -EHOSTUNREACH;
 669			goto out;
 670		}
 671		/* TEMP TEMP TEMP - fail if new device:
 672		 * Deregister/remarshal *all* requests!
 673		 * Close and recreate adapter, pd, etc!
 674		 * Re-determine all attributes still sane!
 675		 * More stuff I haven't thought of!
 676		 * Rrrgh!
 677		 */
 678		if (ia->ri_device != id->device) {
 679			printk("RPC:       %s: can't reconnect on "
 680				"different device!\n", __func__);
 681			rpcrdma_destroy_id(id);
 682			rc = -ENETUNREACH;
 683			goto out;
 684		}
 685		/* END TEMP */
 686		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
 687		if (rc) {
 688			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
 689				__func__, rc);
 690			rpcrdma_destroy_id(id);
 691			rc = -ENETUNREACH;
 692			goto out;
 693		}
 694
 695		write_lock(&ia->ri_qplock);
 696		old = ia->ri_id;
 697		ia->ri_id = id;
 698		write_unlock(&ia->ri_qplock);
 699
 700		rdma_destroy_qp(old);
 701		rpcrdma_destroy_id(old);
 702	} else {
 703		dprintk("RPC:       %s: connecting...\n", __func__);
 704		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
 705		if (rc) {
 706			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
 707				__func__, rc);
 708			/* do not update ep->rep_connected */
 709			return -ENETUNREACH;
 710		}
 711	}
 712
 713	ep->rep_connected = 0;
 714
 715	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
 716	if (rc) {
 717		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
 718				__func__, rc);
 719		goto out;
 720	}
 721
 722	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
 723
 724	/*
 725	 * Check state. A non-peer reject indicates no listener
 726	 * (ECONNREFUSED), which may be a transient state. All
 727	 * other errors indicate a transport condition that has
 728	 * already received best-effort handling.
 729	 */
 730	if (ep->rep_connected == -ECONNREFUSED &&
 731	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
 732		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
 733		goto retry;
 734	}
 735	if (ep->rep_connected <= 0) {
 736		/* Sometimes, the only way to reliably connect to remote
 737		 * CMs is to use the same nonzero values for ORD and IRD. */
 738		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
 739		    (ep->rep_remote_cma.responder_resources == 0 ||
 740		     ep->rep_remote_cma.initiator_depth !=
 741				ep->rep_remote_cma.responder_resources)) {
 742			if (ep->rep_remote_cma.responder_resources == 0)
 743				ep->rep_remote_cma.responder_resources = 1;
 744			ep->rep_remote_cma.initiator_depth =
 745				ep->rep_remote_cma.responder_resources;
 746			goto retry;
 747		}
 748		rc = ep->rep_connected;
 749	} else {
 750		struct rpcrdma_xprt *r_xprt;
 751		unsigned int extras;
 752
 753		dprintk("RPC:       %s: connected\n", __func__);
 754
 755		r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
 756		extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
 757
 758		if (extras) {
 759			rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
 760			if (rc) {
 761				pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
 762					__func__, rc);
 763				rc = 0;
 764			}
 765		}
 766	}
 767
 768out:
 769	if (rc)
 770		ep->rep_connected = rc;
 771	return rc;
 772}
 773
 774/*
 775 * rpcrdma_ep_disconnect
 776 *
 777 * This is separate from destroy to facilitate the ability
 778 * to reconnect without recreating the endpoint.
 779 *
 780 * This call is not reentrant, and must not be made in parallel
 781 * on the same endpoint.
 782 */
 783void
 784rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 785{
 786	int rc;
 787
 788	rpcrdma_flush_cqs(ep);
 789	rc = rdma_disconnect(ia->ri_id);
 790	if (!rc) {
 791		/* returns without wait if not connected */
 792		wait_event_interruptible(ep->rep_connect_wait,
 793							ep->rep_connected != 1);
 794		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
 795			(ep->rep_connected == 1) ? "still " : "dis");
 796	} else {
 797		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
 798		ep->rep_connected = rc;
 799	}
 800}
 801
 802struct rpcrdma_req *
 803rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 804{
 805	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
 806	struct rpcrdma_req *req;
 807
 808	req = kzalloc(sizeof(*req), GFP_KERNEL);
 809	if (req == NULL)
 810		return ERR_PTR(-ENOMEM);
 811
 812	INIT_LIST_HEAD(&req->rl_free);
 813	spin_lock(&buffer->rb_reqslock);
 814	list_add(&req->rl_all, &buffer->rb_allreqs);
 815	spin_unlock(&buffer->rb_reqslock);
 816	req->rl_cqe.done = rpcrdma_wc_send;
 817	req->rl_buffer = &r_xprt->rx_buf;
 818	return req;
 819}
 820
 821struct rpcrdma_rep *
 822rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 823{
 824	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
 825	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 826	struct rpcrdma_rep *rep;
 827	int rc;
 828
 829	rc = -ENOMEM;
 830	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
 831	if (rep == NULL)
 832		goto out;
 833
 834	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
 835					       GFP_KERNEL);
 836	if (IS_ERR(rep->rr_rdmabuf)) {
 837		rc = PTR_ERR(rep->rr_rdmabuf);
 838		goto out_free;
 839	}
 840
 841	rep->rr_device = ia->ri_device;
 842	rep->rr_cqe.done = rpcrdma_receive_wc;
 843	rep->rr_rxprt = r_xprt;
 844	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
 845	return rep;
 846
 847out_free:
 848	kfree(rep);
 849out:
 850	return ERR_PTR(rc);
 851}
 852
 853int
 854rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 855{
 856	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 857	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 858	int i, rc;
 859
 860	buf->rb_max_requests = r_xprt->rx_data.max_requests;
 861	buf->rb_bc_srv_max_requests = 0;
 862	spin_lock_init(&buf->rb_lock);
 863	atomic_set(&buf->rb_credits, 1);
 864
 865	rc = ia->ri_ops->ro_init(r_xprt);
 866	if (rc)
 867		goto out;
 868
 869	INIT_LIST_HEAD(&buf->rb_send_bufs);
 870	INIT_LIST_HEAD(&buf->rb_allreqs);
 871	spin_lock_init(&buf->rb_reqslock);
 872	for (i = 0; i < buf->rb_max_requests; i++) {
 873		struct rpcrdma_req *req;
 874
 875		req = rpcrdma_create_req(r_xprt);
 876		if (IS_ERR(req)) {
 877			dprintk("RPC:       %s: request buffer %d alloc"
 878				" failed\n", __func__, i);
 879			rc = PTR_ERR(req);
 880			goto out;
 881		}
 882		req->rl_backchannel = false;
 883		list_add(&req->rl_free, &buf->rb_send_bufs);
 884	}
 885
 886	INIT_LIST_HEAD(&buf->rb_recv_bufs);
 887	for (i = 0; i < buf->rb_max_requests + 2; i++) {
 888		struct rpcrdma_rep *rep;
 889
 890		rep = rpcrdma_create_rep(r_xprt);
 891		if (IS_ERR(rep)) {
 892			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
 893				__func__, i);
 894			rc = PTR_ERR(rep);
 895			goto out;
 896		}
 897		list_add(&rep->rr_list, &buf->rb_recv_bufs);
 898	}
 899
 900	return 0;
 901out:
 902	rpcrdma_buffer_destroy(buf);
 903	return rc;
 904}
 905
 906static struct rpcrdma_req *
 907rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
 908{
 909	struct rpcrdma_req *req;
 910
 911	req = list_first_entry(&buf->rb_send_bufs,
 912			       struct rpcrdma_req, rl_free);
 913	list_del(&req->rl_free);
 914	return req;
 915}
 916
 917static struct rpcrdma_rep *
 918rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
 919{
 920	struct rpcrdma_rep *rep;
 921
 922	rep = list_first_entry(&buf->rb_recv_bufs,
 923			       struct rpcrdma_rep, rr_list);
 924	list_del(&rep->rr_list);
 925	return rep;
 926}
 927
 928static void
 929rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
 930{
 931	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
 932	kfree(rep);
 933}
 934
 935void
 936rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
 937{
 938	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
 939	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
 940	kfree(req);
 941}
 942
 943void
 944rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 945{
 946	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
 947
 948	while (!list_empty(&buf->rb_recv_bufs)) {
 949		struct rpcrdma_rep *rep;
 950
 951		rep = rpcrdma_buffer_get_rep_locked(buf);
 952		rpcrdma_destroy_rep(ia, rep);
 953	}
 954
 955	spin_lock(&buf->rb_reqslock);
 956	while (!list_empty(&buf->rb_allreqs)) {
 957		struct rpcrdma_req *req;
 958
 959		req = list_first_entry(&buf->rb_allreqs,
 960				       struct rpcrdma_req, rl_all);
 961		list_del(&req->rl_all);
 962
 963		spin_unlock(&buf->rb_reqslock);
 964		rpcrdma_destroy_req(ia, req);
 965		spin_lock(&buf->rb_reqslock);
 966	}
 967	spin_unlock(&buf->rb_reqslock);
 968
 969	ia->ri_ops->ro_destroy(buf);
 970}
 971
 972struct rpcrdma_mw *
 973rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
 974{
 975	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 976	struct rpcrdma_mw *mw = NULL;
 977
 978	spin_lock(&buf->rb_mwlock);
 979	if (!list_empty(&buf->rb_mws)) {
 980		mw = list_first_entry(&buf->rb_mws,
 981				      struct rpcrdma_mw, mw_list);
 982		list_del_init(&mw->mw_list);
 983	}
 984	spin_unlock(&buf->rb_mwlock);
 985
 986	if (!mw)
 987		pr_err("RPC:       %s: no MWs available\n", __func__);
 988	return mw;
 989}
 990
 991void
 992rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
 993{
 994	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 995
 996	spin_lock(&buf->rb_mwlock);
 997	list_add_tail(&mw->mw_list, &buf->rb_mws);
 998	spin_unlock(&buf->rb_mwlock);
 999}
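
/* Hedged usage sketch, not part of the original file: the memory
 * registration ops take an MW from this pool before mapping a chunk and
 * return it once the registration is no longer needed. The helper name
 * and the -ENOBUFS choice are hypothetical.
 */
static int __maybe_unused example_mw_cycle(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_mw *mw;

	mw = rpcrdma_get_mw(r_xprt);
	if (!mw)
		return -ENOBUFS;	/* pool exhausted */

	/* ... use mw to register a chunk of the RPC buffer ... */

	rpcrdma_put_mw(r_xprt, mw);
	return 0;
}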
1000
1001/*
1002 * Get a set of request/reply buffers.
1003 *
1004 * Reply buffer (if available) is attached to send buffer upon return.
1005 */
1006struct rpcrdma_req *
1007rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1008{
1009	struct rpcrdma_req *req;
1010
1011	spin_lock(&buffers->rb_lock);
1012	if (list_empty(&buffers->rb_send_bufs))
1013		goto out_reqbuf;
1014	req = rpcrdma_buffer_get_req_locked(buffers);
1015	if (list_empty(&buffers->rb_recv_bufs))
1016		goto out_repbuf;
1017	req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
1018	spin_unlock(&buffers->rb_lock);
1019	return req;
1020
1021out_reqbuf:
1022	spin_unlock(&buffers->rb_lock);
1023	pr_warn("RPC:       %s: out of request buffers\n", __func__);
1024	return NULL;
1025out_repbuf:
1026	spin_unlock(&buffers->rb_lock);
1027	pr_warn("RPC:       %s: out of reply buffers\n", __func__);
1028	req->rl_reply = NULL;
1029	return req;
1030}
1031
1032/*
1033 * Put request/reply buffers back into the pool. The attached reply
1034 * buffer (if any) is returned to the receive pool as well.
1035 */
1036void
1037rpcrdma_buffer_put(struct rpcrdma_req *req)
1038{
1039	struct rpcrdma_buffer *buffers = req->rl_buffer;
1040	struct rpcrdma_rep *rep = req->rl_reply;
1041
1042	req->rl_niovs = 0;
1043	req->rl_reply = NULL;
1044
1045	spin_lock(&buffers->rb_lock);
1046	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
1047	if (rep)
1048		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
1049	spin_unlock(&buffers->rb_lock);
1050}
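
/* Hedged sketch of the get/put pairing, not part of the original file:
 * the real callers are the transport send and completion paths; the
 * helper name here is hypothetical.
 */
static void __maybe_unused example_buffer_cycle(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_req *req;

	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
	if (!req)
		return;		/* request pool exhausted */

	/* ... marshal and post the RPC using req here ... */

	rpcrdma_buffer_put(req);	/* returns req->rl_reply too, if still attached */
}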
1051
1052/*
1053 * Recover reply buffers from pool.
1054 * This happens when recovering from disconnect.
1055 */
1056void
1057rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1058{
1059	struct rpcrdma_buffer *buffers = req->rl_buffer;
1060
1061	spin_lock(&buffers->rb_lock);
1062	if (!list_empty(&buffers->rb_recv_bufs))
1063		req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
1064	spin_unlock(&buffers->rb_lock);
1065}
1066
1067/*
1068 * Put reply buffers back into pool when not attached to
1069 * request. This happens in error conditions.
1070 */
1071void
1072rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1073{
1074	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
1075
1076	spin_lock(&buffers->rb_lock);
1077	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
1078	spin_unlock(&buffers->rb_lock);
1079}
1080
1081/*
1082 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1083 */
1084
1085void
1086rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
1087{
1088	dprintk("RPC:       map_one: offset %p iova %llx len %zu\n",
1089		seg->mr_offset,
1090		(unsigned long long)seg->mr_dma, seg->mr_dmalen);
1091}
1092
1093/**
1094 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
1095 * @ia: controlling rpcrdma_ia
1096 * @size: size of buffer to be allocated, in bytes
1097 * @flags: GFP flags
1098 *
1099 * Returns pointer to private header of an area of internally
1100 * registered memory, or an ERR_PTR. The registered buffer follows
1101 * the end of the private header.
1102 *
1103 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1104 * receiving the payload of RDMA RECV operations. regbufs are not
1105 * used for RDMA READ/WRITE operations, thus are registered only for
1106 * LOCAL access.
1107 */
1108struct rpcrdma_regbuf *
1109rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
1110{
1111	struct rpcrdma_regbuf *rb;
1112	struct ib_sge *iov;
1113
1114	rb = kmalloc(sizeof(*rb) + size, flags);
1115	if (rb == NULL)
1116		goto out;
1117
1118	iov = &rb->rg_iov;
1119	iov->addr = ib_dma_map_single(ia->ri_device,
1120				      (void *)rb->rg_base, size,
1121				      DMA_BIDIRECTIONAL);
1122	if (ib_dma_mapping_error(ia->ri_device, iov->addr))
1123		goto out_free;
1124
1125	iov->length = size;
1126	iov->lkey = ia->ri_pd->local_dma_lkey;
1127	rb->rg_size = size;
1128	rb->rg_owner = NULL;
1129	return rb;
1130
1131out_free:
1132	kfree(rb);
1133out:
1134	return ERR_PTR(-ENOMEM);
1135}
1136
1137/**
1138 * rpcrdma_free_regbuf - deregister and free registered buffer
1139 * @ia: controlling rpcrdma_ia
1140 * @rb: regbuf to be deregistered and freed
1141 */
1142void
1143rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
1144{
1145	struct ib_sge *iov;
1146
1147	if (!rb)
1148		return;
1149
1150	iov = &rb->rg_iov;
1151	ib_dma_unmap_single(ia->ri_device,
1152			    iov->addr, iov->length, DMA_BIDIRECTIONAL);
1153	kfree(rb);
1154}
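
/* Hedged sketch, not part of the original file: allocate and release a
 * regbuf, as rpcrdma_create_rep() above does for rr_rdmabuf. The helper
 * name is hypothetical; the GFP_KERNEL choice mirrors existing callers
 * in this file.
 */
static int __maybe_unused example_regbuf(struct rpcrdma_xprt *r_xprt, size_t size)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_regbuf *rb;

	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
	if (IS_ERR(rb))
		return PTR_ERR(rb);

	/* ... DMA-sync and use rb->rg_base / rdmab_addr(rb) ... */

	rpcrdma_free_regbuf(ia, rb);
	return 0;
}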
1155
1156/*
1157 * Prepost any receive buffer, then post send.
1158 *
1159 * Receive buffer is donated to hardware, reclaimed upon recv completion.
1160 */
1161int
1162rpcrdma_ep_post(struct rpcrdma_ia *ia,
1163		struct rpcrdma_ep *ep,
1164		struct rpcrdma_req *req)
1165{
1166	struct ib_device *device = ia->ri_device;
1167	struct ib_send_wr send_wr, *send_wr_fail;
1168	struct rpcrdma_rep *rep = req->rl_reply;
1169	struct ib_sge *iov = req->rl_send_iov;
1170	int i, rc;
1171
1172	if (rep) {
1173		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1174		if (rc)
1175			goto out;
1176		req->rl_reply = NULL;
1177	}
1178
1179	send_wr.next = NULL;
1180	send_wr.wr_cqe = &req->rl_cqe;
1181	send_wr.sg_list = iov;
1182	send_wr.num_sge = req->rl_niovs;
1183	send_wr.opcode = IB_WR_SEND;
1184
1185	for (i = 0; i < send_wr.num_sge; i++)
1186		ib_dma_sync_single_for_device(device, iov[i].addr,
1187					      iov[i].length, DMA_TO_DEVICE);
1188	dprintk("RPC:       %s: posting %d s/g entries\n",
1189		__func__, send_wr.num_sge);
1190
1191	if (DECR_CQCOUNT(ep) > 0)
1192		send_wr.send_flags = 0;
1193	else { /* Provider must take a send completion every now and then */
1194		INIT_CQCOUNT(ep);
1195		send_wr.send_flags = IB_SEND_SIGNALED;
1196	}
1197
1198	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1199	if (rc)
1200		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1201			rc);
1202out:
1203	return rc;
1204}
1205
1206/*
1207 * (Re)post a receive buffer.
1208 */
1209int
1210rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1211		     struct rpcrdma_ep *ep,
1212		     struct rpcrdma_rep *rep)
1213{
1214	struct ib_recv_wr recv_wr, *recv_wr_fail;
1215	int rc;
1216
1217	recv_wr.next = NULL;
1218	recv_wr.wr_cqe = &rep->rr_cqe;
1219	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
1220	recv_wr.num_sge = 1;
1221
1222	ib_dma_sync_single_for_cpu(ia->ri_device,
1223				   rdmab_addr(rep->rr_rdmabuf),
1224				   rdmab_length(rep->rr_rdmabuf),
1225				   DMA_BIDIRECTIONAL);
1226
1227	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1228
1229	if (rc)
1230		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1231			rc);
1232	return rc;
1233}
1234
1235/**
1236 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
1237 * @r_xprt: transport associated with these backchannel resources
1238 * @count: minimum number of incoming requests expected
1239 *
1240 * Returns zero if all requested buffers were posted, or a negative errno.
1241 */
1242int
1243rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
1244{
1245	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
1246	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1247	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
1248	struct rpcrdma_rep *rep;
1249	int rc;
1250
1251	while (count--) {
1252		spin_lock(&buffers->rb_lock);
1253		if (list_empty(&buffers->rb_recv_bufs))
1254			goto out_reqbuf;
1255		rep = rpcrdma_buffer_get_rep_locked(buffers);
1256		spin_unlock(&buffers->rb_lock);
1257
1258		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1259		if (rc)
1260			goto out_rc;
1261	}
1262
1263	return 0;
1264
1265out_reqbuf:
1266	spin_unlock(&buffers->rb_lock);
1267	pr_warn("%s: no extra receive buffers\n", __func__);
1268	return -ENOMEM;
1269
1270out_rc:
1271	rpcrdma_recv_buffer_put(rep);
1272	return rc;
1273}
1274
1275/* How many chunk list items fit within our inline buffers?
1276 */
1277unsigned int
1278rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
1279{
1280	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1281	int bytes, segments;
1282
1283	bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
1284	bytes -= RPCRDMA_HDRLEN_MIN;
1285	if (bytes < sizeof(struct rpcrdma_segment) * 2) {
1286		pr_warn("RPC:       %s: inline threshold too small\n",
1287			__func__);
1288		return 0;
1289	}
1290
1291	segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
1292	dprintk("RPC:       %s: max chunk list size = %d segments\n",
1293		__func__, segments);
1294	return segments;
1295}
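
/* Worked example for rpcrdma_max_segments() (illustrative values only):
 * assuming 1024-byte inline thresholds, a 28-byte minimal header, and
 * 16-byte segments, bytes = 1024 - 28 = 996, 996 / 16 = 62 segments,
 * and rounding down to a power of two yields 32.
 */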