/*
 * Copyright (c) 2015 Oracle.  All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA (server-side).
 */

#include <linux/sunrpc/svc_rdma.h>
#include "xprt_rdma.h"

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

#undef SVCRDMA_BACKCHANNEL_DEBUG
/* svc_rdma_handle_bc_reply - deliver a reply arriving on the backchannel
 * @xprt: client-side rpc_xprt for the backward direction
 * @rmsgp: already-parsed RPC/RDMA transport header of the reply
 * @rcvbuf: xdr_buf containing the received RPC reply message
 *
 * Looks up the rpc_rqst waiting on this XID, copies the reply payload
 * into it, converts the peer's credit grant into a congestion window,
 * and completes the request.
 *
 * Returns 0 on success; -EAGAIN if the reply is too short, no matching
 * request is found, or the reply does not fit the receive buffer.
 */
int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp,
			     struct xdr_buf *rcvbuf)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct kvec *dst, *src = &rcvbuf->head[0];
	struct rpc_rqst *req;
	unsigned long cwnd;
	u32 credits;
	size_t len;
	__be32 xid;
	__be32 *p;
	int ret;

	p = (__be32 *)src->iov_base;
	len = src->iov_len;
	xid = rmsgp->rm_xid;

#ifdef SVCRDMA_BACKCHANNEL_DEBUG
	pr_info("%s: xid=%08x, length=%zu\n",
		__func__, be32_to_cpu(xid), len);
	pr_info("%s: RPC/RDMA: %*ph\n",
		__func__, (int)RPCRDMA_HDRLEN_MIN, rmsgp);
	pr_info("%s:      RPC: %*ph\n",
		__func__, (int)len, p);
#endif

	/* 24 bytes is the smallest complete RPC reply header
	 * (xid, msg type, reply stat, verifier, accept stat).
	 */
	ret = -EAGAIN;
	if (src->iov_len < 24)
		goto out_shortreply;

	/* transport_lock serializes the XID lookup with request
	 * completion and the cwnd update below.
	 */
	spin_lock_bh(&xprt->transport_lock);
	req = xprt_lookup_rqst(xprt, xid);
	if (!req)
		goto out_notfound;

	/* Refresh rq_private_buf from rq_rcv_buf before copying the
	 * payload in. NOTE(review): presumably mirrors what the socket
	 * transports do ahead of xprt_complete_rqst() — confirm.
	 */
	dst = &req->rq_private_buf.head[0];
	memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(struct xdr_buf));
	if (dst->iov_len < len)
		goto out_unlock;
	memcpy(dst->iov_base, p, len);

	/* Clamp the peer's credit grant to a usable window. */
	credits = be32_to_cpu(rmsgp->rm_credit);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
		credits = r_xprt->rx_buf.rb_bc_max_requests;

	/* If the window grew, let a congestion-blocked request go. */
	cwnd = xprt->cwnd;
	xprt->cwnd = credits << RPC_CWNDSHIFT;
	if (xprt->cwnd > cwnd)
		xprt_release_rqst_cong(req->rq_task);

	ret = 0;
	xprt_complete_rqst(req->rq_task, rcvbuf->len);
	rcvbuf->len = 0;	/* payload has been consumed here */

out_unlock:
	spin_unlock_bh(&xprt->transport_lock);
out:
	return ret;

out_shortreply:
	dprintk("svcrdma: short bc reply: xprt=%p, len=%zu\n",
		xprt, src->iov_len);
	goto out;

out_notfound:
	dprintk("svcrdma: unrecognized bc reply: xprt=%p, xid=%08x\n",
		xprt, be32_to_cpu(xid));

	goto out_unlock;
}

/* Send a backwards direction RPC call.
 *
 * Caller holds the connection's mutex and has already marshaled
 * the RPC/RDMA request.
 *
 * This is similar to svc_rdma_reply, but takes an rpc_rqst
 * instead, does not support chunks, and avoids blocking memory
 * allocation.
 *
 * XXX: There is still an opportunity to block in svc_rdma_send()
 * if there are no SQ entries to post the Send. This may occur if
 * the adapter has a small maximum SQ depth.
 */
static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
			      struct rpc_rqst *rqst)
{
	struct xdr_buf *sndbuf = &rqst->rq_snd_buf;
	struct svc_rdma_op_ctxt *ctxt;
	struct svc_rdma_req_map *vec;
	struct ib_send_wr send_wr;
	int ret;

	/* Describe the marshaled call message as a scatter list.
	 * NOTE(review): the "false" argument presumably means no
	 * chunk payload in this direction — confirm against
	 * svc_rdma_map_xdr().
	 */
	vec = svc_rdma_get_req_map(rdma);
	ret = svc_rdma_map_xdr(rdma, sndbuf, vec, false);
	if (ret)
		goto out_err;

	/* Post a fresh receive buffer before sending, so the reply
	 * cannot arrive while no receive is posted. GFP_NOIO because
	 * this path may run under memory pressure — TODO confirm.
	 */
	ret = svc_rdma_repost_recv(rdma, GFP_NOIO);
	if (ret)
		goto out_err;

	/* The whole call message lives in the single page backing
	 * rq_buffer (see xprt_rdma_bc_allocate), so one SGE covers it.
	 */
	ctxt = svc_rdma_get_context(rdma);
	ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
	ctxt->count = 1;

	ctxt->direction = DMA_TO_DEVICE;
	ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
	ctxt->sge[0].length = sndbuf->len;
	ctxt->sge[0].addr =
	    ib_dma_map_page(rdma->sc_cm_id->device, ctxt->pages[0], 0,
			    sndbuf->len, DMA_TO_DEVICE);
	if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr)) {
		ret = -EIO;
		goto out_unmap;
	}
	/* Account the mapping; teardown checks this counter —
	 * TODO confirm sc_dma_used semantics.
	 */
	atomic_inc(&rdma->sc_dma_used);

	memset(&send_wr, 0, sizeof(send_wr));
	/* On Send completion, svc_rdma_wc_send() takes over the ctxt
	 * (unmap + release) — NOTE(review): inferred from the cqe
	 * handler assignment; confirm.
	 */
	ctxt->cqe.done = svc_rdma_wc_send;
	send_wr.wr_cqe = &ctxt->cqe;
	send_wr.sg_list = ctxt->sge;
	send_wr.num_sge = 1;
	send_wr.opcode = IB_WR_SEND;
	send_wr.send_flags = IB_SEND_SIGNALED;

	ret = svc_rdma_send(rdma, &send_wr);
	if (ret) {
		ret = -EIO;
		goto out_unmap;
	}

out_err:
	/* The map vector is only needed while posting; release it on
	 * success and on every failure path.
	 */
	svc_rdma_put_req_map(rdma, vec);
	dprintk("svcrdma: %s returns %d\n", __func__, ret);
	return ret;

out_unmap:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);
	goto out_err;
}

/* Server-side transport endpoint wants a whole page for its send
 * buffer. The client RPC code constructs the RPC header in this
 * buffer before it invokes ->send_request.
 *
 * Returns NULL if there was a temporary allocation failure.
 */
165static void *
166xprt_rdma_bc_allocate(struct rpc_task *task, size_t size)
167{
168	struct rpc_rqst *rqst = task->tk_rqstp;
169	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
170	struct svcxprt_rdma *rdma;
171	struct page *page;
172
173	rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
174
175	/* Prevent an infinite loop: try to make this case work */
176	if (size > PAGE_SIZE)
177		WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
178			  size);
179
180	page = alloc_page(RPCRDMA_DEF_GFP);
181	if (!page)
182		return NULL;
183
184	return page_address(page);
185}
186
/* ->buf_free method. Backchannel send buffers are released by the
 * send-completion side (the op ctxt and its page), so nothing is
 * left to do here. NOTE(review): inferred from the in-body comment —
 * confirm svc_rdma_wc_send releases the page.
 */
static void
xprt_rdma_bc_free(void *buffer)
{
	/* No-op: ctxt and page have already been freed. */
}

193static int
194rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
195{
196	struct rpc_xprt *xprt = rqst->rq_xprt;
197	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
198	struct rpcrdma_msg *headerp = (struct rpcrdma_msg *)rqst->rq_buffer;
199	int rc;
200
201	/* Space in the send buffer for an RPC/RDMA header is reserved
202	 * via xprt->tsh_size.
203	 */
204	headerp->rm_xid = rqst->rq_xid;
205	headerp->rm_vers = rpcrdma_version;
206	headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
207	headerp->rm_type = rdma_msg;
208	headerp->rm_body.rm_chunks[0] = xdr_zero;
209	headerp->rm_body.rm_chunks[1] = xdr_zero;
210	headerp->rm_body.rm_chunks[2] = xdr_zero;
211
212#ifdef SVCRDMA_BACKCHANNEL_DEBUG
213	pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer);
214#endif
215
216	rc = svc_rdma_bc_sendto(rdma, rqst);
217	if (rc)
218		goto drop_connection;
219	return rc;
220
221drop_connection:
222	dprintk("svcrdma: failed to send bc call\n");
223	xprt_disconnect_done(xprt);
224	return -ENOTCONN;
225}
226
/* Send an RPC call on the passive end of a transport
 * connection.
 */
static int
xprt_rdma_bc_send_request(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
	struct svcxprt_rdma *rdma;
	int ret;

	dprintk("svcrdma: sending bc call with xid: %08x\n",
		be32_to_cpu(rqst->rq_xid));

	/* Don't block on xpt_mutex: if contended, queue this task on
	 * xpt_bc_pending and try once more. If the retry wins the
	 * mutex after all, take the task back off the waitqueue so it
	 * isn't woken spuriously later.
	 */
	if (!mutex_trylock(&sxprt->xpt_mutex)) {
		rpc_sleep_on(&sxprt->xpt_bc_pending, task, NULL);
		if (!mutex_trylock(&sxprt->xpt_mutex))
			return -EAGAIN;
		rpc_wake_up_queued_task(&sxprt->xpt_bc_pending, task);
	}

	/* Only send while the service transport is still alive. */
	ret = -ENOTCONN;
	rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
	if (!test_bit(XPT_DEAD, &sxprt->xpt_flags))
		ret = rpcrdma_bc_send_request(rdma, rqst);

	mutex_unlock(&sxprt->xpt_mutex);

	/* Normalize positive results to 0 for the RPC client. */
	if (ret < 0)
		return ret;
	return 0;
}

/* ->close method. Nothing is torn down here beyond logging —
 * presumably the backchannel rides the forward channel's connection,
 * which is managed elsewhere. NOTE(review): confirm against the
 * svcxprt_rdma connection lifetime rules.
 */
static void
xprt_rdma_bc_close(struct rpc_xprt *xprt)
{
	dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);
}

/* ->destroy method: free the backchannel rpc_xprt and drop the
 * module reference taken in xprt_setup_rdma_bc().
 */
static void
xprt_rdma_bc_put(struct rpc_xprt *xprt)
{
	dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);

	xprt_free(xprt);
	module_put(THIS_MODULE);
}

/* Client-side transport methods for the backward direction. The
 * congestion-controlled (cong) reserve/release variants are used so
 * sends respect the credit-derived cwnd that
 * svc_rdma_handle_bc_reply() maintains.
 */
static struct rpc_xprt_ops xprt_rdma_bc_procs = {
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong,
	.alloc_slot		= xprt_alloc_slot,
	.release_request	= xprt_release_rqst_cong,
	.buf_alloc		= xprt_rdma_bc_allocate,
	.buf_free		= xprt_rdma_bc_free,
	.send_request		= xprt_rdma_bc_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	.close			= xprt_rdma_bc_close,
	.destroy		= xprt_rdma_bc_put,
	.print_stats		= xprt_rdma_print_stats
};

/* Fixed one-minute timeout: to_initval == to_maxval, so backchannel
 * retransmit timeouts do not back off.
 */
static const struct rpc_timeout xprt_rdma_bc_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
};

/* It shouldn't matter if the number of backchannel session slots
 * doesn't match the number of RPC/RDMA credits. That just means
 * one or the other will have extra slots that aren't used.
 */
/* xprt_setup_rdma_bc - create the client rpc_xprt for the backchannel
 * @args: transport creation parameters from the RPC client
 *
 * Builds an rpc_xprt that carries RPC calls from the server back to
 * the client over the existing connection (args->bc_xprt).
 *
 * Returns the new xprt, or an ERR_PTR on failure.
 */
static struct rpc_xprt *
xprt_setup_rdma_bc(struct xprt_create *args)
{
	struct rpc_xprt *xprt;
	struct rpcrdma_xprt *new_xprt;

	if (args->addrlen > sizeof(xprt->addr)) {
		dprintk("RPC:       %s: address too large\n", __func__);
		return ERR_PTR(-EBADF);
	}

	/* Slot table sized for the maximum number of concurrent
	 * backchannel requests.
	 */
	xprt = xprt_alloc(args->net, sizeof(*new_xprt),
			  RPCRDMA_MAX_BC_REQUESTS,
			  RPCRDMA_MAX_BC_REQUESTS);
	if (!xprt) {
		dprintk("RPC:       %s: couldn't allocate rpc_xprt\n",
			__func__);
		return ERR_PTR(-ENOMEM);
	}

	/* The underlying connection already exists, so the xprt is
	 * bound and connected from the start.
	 */
	xprt->timeout = &xprt_rdma_bc_timeout;
	xprt_set_bound(xprt);
	xprt_set_connected(xprt);
	xprt->bind_timeout = RPCRDMA_BIND_TO;
	xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;

	xprt->prot = XPRT_TRANSPORT_BC_RDMA;
	/* Reserve room in each send buffer for the RPC/RDMA header;
	 * tsh_size is counted in 32-bit XDR words.
	 */
	xprt->tsh_size = RPCRDMA_HDRLEN_MIN / sizeof(__be32);
	xprt->ops = &xprt_rdma_bc_procs;

	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
	xprt->addrlen = args->addrlen;
	xprt_rdma_format_addresses(xprt, (struct sockaddr *)&xprt->addr);
	xprt->resvport = 0;

	xprt->max_payload = xprt_rdma_max_inline_read;

	new_xprt = rpcx_to_rdmax(xprt);
	new_xprt->rx_buf.rb_bc_max_requests = xprt->max_reqs;

	/* Cross-link the svc_xprt and the new backchannel xprt;
	 * this xprt_get() backs the xpt_bc_xprt pointer.
	 */
	xprt_get(xprt);
	args->bc_xprt->xpt_bc_xprt = xprt;
	xprt->bc_xprt = args->bc_xprt;

	if (!try_module_get(THIS_MODULE))
		goto out_fail;

	/* Final put for backchannel xprt is in __svc_rdma_free */
	xprt_get(xprt);
	return xprt;

out_fail:
	/* NOTE(review): xprt_put() immediately followed by xprt_free()
	 * looks like a potential double release if the refcount drops
	 * to zero inside xprt_put() — verify this error path.
	 */
	xprt_rdma_free_addresses(xprt);
	args->bc_xprt->xpt_bc_xprt = NULL;
	xprt_put(xprt);
	xprt_free(xprt);
	return ERR_PTR(-EINVAL);
}

/* Transport class registered with the RPC client core for creating
 * backward-direction RPC/RDMA transports (ident XPRT_TRANSPORT_BC_RDMA).
 */
struct xprt_class xprt_rdma_bc = {
	.list			= LIST_HEAD_INIT(xprt_rdma_bc.list),
	.name			= "rdma backchannel",
	.owner			= THIS_MODULE,
	.ident			= XPRT_TRANSPORT_BC_RDMA,
	.setup			= xprt_setup_rdma_bc,
};