v4.6: net/sunrpc/xprtrdma/transport.c
 
/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * transport.c
 *
 * This file contains the top-level implementation of an RPC RDMA
 * transport.
 *
 * Naming convention: functions beginning with xprt_ are part of the
 * transport switch. All others are RPC RDMA internal.
 */

#include <linux/module.h>
#include <linux/slab.h>
#include <linux/seq_file.h>

#include <linux/sunrpc/addr.h>

#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * tunables
 */

static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
static unsigned int xprt_rdma_inline_write_padding;
static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR;
		int xprt_rdma_pad_optimize = 1;

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)

static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
static unsigned int zero;
static unsigned int max_padding = PAGE_SIZE;
static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
static unsigned int max_memreg = RPCRDMA_LAST - 1;

static struct ctl_table_header *sunrpc_table_header;

static struct ctl_table xr_tunables_table[] = {
	{
		.procname	= "rdma_slot_table_entries",
		.data		= &xprt_rdma_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.procname	= "rdma_max_inline_read",
		.data		= &xprt_rdma_max_inline_read,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec,
	},
	{
		.procname	= "rdma_max_inline_write",
		.data		= &xprt_rdma_max_inline_write,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec,
	},
	{
		.procname	= "rdma_inline_write_padding",
		.data		= &xprt_rdma_inline_write_padding,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &zero,
		.extra2		= &max_padding,
	},
	{
		.procname	= "rdma_memreg_strategy",
		.data		= &xprt_rdma_memreg_strategy,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_memreg,
		.extra2		= &max_memreg,
	},
	{
		.procname	= "rdma_pad_optimize",
		.data		= &xprt_rdma_pad_optimize,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec,
	},
	{ },
};

static struct ctl_table sunrpc_table[] = {
	{
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xr_tunables_table
	},
	{ },
};

#endif
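
/* When CONFIG_SUNRPC_DEBUG is enabled, xprt_rdma_init() registers the
 * tables above, so these tunables can be read and set at runtime under
 * /proc/sys/sunrpc/ (e.g. /proc/sys/sunrpc/rdma_slot_table_entries).
 */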

static struct rpc_xprt_ops xprt_rdma_procs;	/* forward reference */

static void
xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap)
{
	struct sockaddr_in *sin = (struct sockaddr_in *)sap;
	char buf[20];

	snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);

	xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA;
}

static void
xprt_rdma_format_addresses6(struct rpc_xprt *xprt, struct sockaddr *sap)
{
	struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)sap;
	char buf[40];

	snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);

	xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA6;
}

void
xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap)
{
	char buf[128];

	switch (sap->sa_family) {
	case AF_INET:
		xprt_rdma_format_addresses4(xprt, sap);
		break;
	case AF_INET6:
		xprt_rdma_format_addresses6(xprt, sap);
		break;
	default:
		pr_err("rpcrdma: Unrecognized address family\n");
		return;
	}

	(void)rpc_ntop(sap, buf, sizeof(buf));
	xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL);

	snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);

	snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);

	xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
}

void
xprt_rdma_free_addresses(struct rpc_xprt *xprt)
{
	unsigned int i;

	for (i = 0; i < RPC_DISPLAY_MAX; i++)
		switch (i) {
		case RPC_DISPLAY_PROTO:
		case RPC_DISPLAY_NETID:
			continue;
		default:
			kfree(xprt->address_strings[i]);
		}
}

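/* xprt_rdma_connect_worker - attempt to establish a connection
 *
 * Runs from a work queue so that connecting never blocks an RPC task
 * directly; if the attempt fails, tasks sleeping on the transport's
 * pending queue are woken with the error.
 */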
static void
xprt_rdma_connect_worker(struct work_struct *work)
{
	struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
						   rx_connect_worker.work);
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	int rc = 0;

	xprt_clear_connected(xprt);

	dprintk("RPC:       %s: %sconnect\n", __func__,
			r_xprt->rx_ep.rep_connected != 0 ? "re" : "");
	rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
	if (rc)
		xprt_wake_pending_tasks(xprt, rc);

	dprintk("RPC:       %s: exit\n", __func__);
	xprt_clear_connecting(xprt);
}

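/* xprt_rdma_inject_disconnect - simulate a spurious connection loss
 *
 * Fault-injection hook: forcibly disconnects the underlying RDMA
 * connection so that the normal connection-recovery paths can be
 * exercised.
 */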
static void
xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = container_of(xprt, struct rpcrdma_xprt,
						   rx_xprt);

	pr_info("rpcrdma: injecting transport disconnect on xprt=%p\n", xprt);
	rdma_disconnect(r_xprt->rx_ia.ri_id);
}

/*
 * xprt_rdma_destroy
 *
 * Destroy the xprt.
 * Free all memory associated with the object, including its own.
 * NOTE: none of the *destroy methods free memory for their top-level
 * objects, even though they may have allocated it (they do free
 * private memory). It's up to the caller to handle it. In this
 * case (RDMA transport), all structure memory is inlined with the
 * struct rpcrdma_xprt.
 */
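/* Note the teardown sequence: quiesce the connect worker first, then
 * release the endpoint, the buffer pool, and finally the interface
 * adapter, before freeing the xprt itself.
 */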
static void
xprt_rdma_destroy(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	dprintk("RPC:       %s: called\n", __func__);

	cancel_delayed_work_sync(&r_xprt->rx_connect_worker);

	xprt_clear_connected(xprt);

	rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
	rpcrdma_buffer_destroy(&r_xprt->rx_buf);
	rpcrdma_ia_close(&r_xprt->rx_ia);

	xprt_rdma_free_addresses(xprt);

	xprt_free(xprt);

	dprintk("RPC:       %s: returning\n", __func__);

	module_put(THIS_MODULE);
}

static const struct rpc_timeout xprt_rdma_default_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
};

/**
 * xprt_setup_rdma - Set up transport to use RDMA
 *
 * @args: rpc transport arguments
 */
static struct rpc_xprt *
xprt_setup_rdma(struct xprt_create *args)
{
	struct rpcrdma_create_data_internal cdata;
	struct rpc_xprt *xprt;
	struct rpcrdma_xprt *new_xprt;
	struct rpcrdma_ep *new_ep;
	struct sockaddr *sap;
	int rc;

	if (args->addrlen > sizeof(xprt->addr)) {
		dprintk("RPC:       %s: address too large\n", __func__);
		return ERR_PTR(-EBADF);
	}

	xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt),
			xprt_rdma_slot_table_entries,
			xprt_rdma_slot_table_entries);
	if (xprt == NULL) {
		dprintk("RPC:       %s: couldn't allocate rpcrdma_xprt\n",
			__func__);
		return ERR_PTR(-ENOMEM);
	}

	/* 60 second timeout, no retries */
	xprt->timeout = &xprt_rdma_default_timeout;
	xprt->bind_timeout = RPCRDMA_BIND_TO;
	xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;

	xprt->resvport = 0;		/* privileged port not needed */
	xprt->tsh_size = 0;		/* RPC-RDMA handles framing */
	xprt->ops = &xprt_rdma_procs;

	/*
	 * Set up RDMA-specific connect data.
	 */

	sap = (struct sockaddr *)&cdata.addr;
	memcpy(sap, args->dstaddr, args->addrlen);

	/* Ensure xprt->addr holds valid server TCP (not RDMA)
	 * address, for any side protocols which peek at it */
	xprt->prot = IPPROTO_TCP;
	xprt->addrlen = args->addrlen;
	memcpy(&xprt->addr, sap, xprt->addrlen);

	if (rpc_get_port(sap))
		xprt_set_bound(xprt);

	cdata.max_requests = xprt->max_reqs;

	cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
	cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */

	cdata.inline_wsize = xprt_rdma_max_inline_write;
	if (cdata.inline_wsize > cdata.wsize)
		cdata.inline_wsize = cdata.wsize;

	cdata.inline_rsize = xprt_rdma_max_inline_read;
	if (cdata.inline_rsize > cdata.rsize)
		cdata.inline_rsize = cdata.rsize;

	cdata.padding = xprt_rdma_inline_write_padding;

	/*
	 * Create new transport instance, which includes initialized
	 *  o ia
	 *  o endpoint
	 *  o buffers
	 */

	new_xprt = rpcx_to_rdmax(xprt);

	rc = rpcrdma_ia_open(new_xprt, sap, xprt_rdma_memreg_strategy);
	if (rc)
		goto out1;

	/*
	 * initialize and create ep
	 */
	new_xprt->rx_data = cdata;
	new_ep = &new_xprt->rx_ep;
	new_ep->rep_remote_addr = cdata.addr;

	rc = rpcrdma_ep_create(&new_xprt->rx_ep,
				&new_xprt->rx_ia, &new_xprt->rx_data);
	if (rc)
		goto out2;

	/*
	 * Allocate pre-registered send and receive buffers for headers and
	 * any inline data. Also specify any padding which will be provided
	 * from a preregistered zero buffer.
	 */
	rc = rpcrdma_buffer_create(new_xprt);
	if (rc)
		goto out3;

	/*
	 * Register a callback for connection events. This is necessary because
	 * connection loss notification is async. We also catch connection loss
	 * when reaping receives.
	 */
	INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
			  xprt_rdma_connect_worker);

	xprt_rdma_format_addresses(xprt, sap);
	xprt->max_payload = new_xprt->rx_ia.ri_ops->ro_maxpages(new_xprt);
	if (xprt->max_payload == 0)
		goto out4;
	xprt->max_payload <<= PAGE_SHIFT;
	dprintk("RPC:       %s: transport data payload maximum: %zu bytes\n",
		__func__, xprt->max_payload);

	if (!try_module_get(THIS_MODULE))
		goto out4;

	dprintk("RPC:       %s: %s:%s\n", __func__,
		xprt->address_strings[RPC_DISPLAY_ADDR],
		xprt->address_strings[RPC_DISPLAY_PORT]);
	return xprt;

out4:
	xprt_rdma_free_addresses(xprt);
	rc = -EINVAL;
out3:
	rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
out2:
	rpcrdma_ia_close(&new_xprt->rx_ia);
out1:
	xprt_free(xprt);
	return ERR_PTR(rc);
}
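
/* The out1..out4 labels above unwind construction by falling through:
 * a failure at any stage releases what the earlier stages had set up
 * before returning the error.
 */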

/*
 * Close a connection, during shutdown or timeout/reconnect
 */
static void
xprt_rdma_close(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	dprintk("RPC:       %s: closing\n", __func__);
	if (r_xprt->rx_ep.rep_connected > 0)
		xprt->reestablish_timeout = 0;
	xprt_disconnect_done(xprt);
	rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia);
}

static void
xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
{
	struct sockaddr_in *sap;

	sap = (struct sockaddr_in *)&xprt->addr;
	sap->sin_port = htons(port);
	sap = (struct sockaddr_in *)&rpcx_to_rdmad(xprt).addr;
	sap->sin_port = htons(port);
	dprintk("RPC:       %s: %u\n", __func__, port);
}

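/* xprt_rdma_connect - schedule a connection attempt
 *
 * On a reconnect the delay doubles with each attempt, clamped to the
 * range [RPCRDMA_INIT_REEST_TO, RPCRDMA_MAX_REEST_TO]; an initial
 * connect is scheduled immediately, and synchronous tasks wait for it
 * to finish.
 */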
static void
xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	if (r_xprt->rx_ep.rep_connected != 0) {
		/* Reconnect */
		schedule_delayed_work(&r_xprt->rx_connect_worker,
				      xprt->reestablish_timeout);
		xprt->reestablish_timeout <<= 1;
		if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
			xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
		else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
			xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
	} else {
		schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
		if (!RPC_IS_ASYNC(task))
			flush_delayed_work(&r_xprt->rx_connect_worker);
	}
}

/*
 * The RDMA allocate/free functions need the task structure as a place
 * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
 * sequence.
 *
 * The RPC layer allocates both send and receive buffers in the same call
 * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
 * We may register rq_rcv_buf when using reply chunks.
 */
static void *
xprt_rdma_allocate(struct rpc_task *task, size_t size)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_regbuf *rb;
	struct rpcrdma_req *req;
	size_t min_size;
	gfp_t flags;

	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
	if (req == NULL)
		return NULL;

	flags = RPCRDMA_DEF_GFP;
	if (RPC_IS_SWAPPER(task))
		flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;

	if (req->rl_rdmabuf == NULL)
		goto out_rdmabuf;
	if (req->rl_sendbuf == NULL)
		goto out_sendbuf;
	if (size > req->rl_sendbuf->rg_size)
		goto out_sendbuf;

out:
	dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req);
	req->rl_connect_cookie = 0;	/* our reserved value */
	return req->rl_sendbuf->rg_base;

out_rdmabuf:
	min_size = RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
	rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
	if (IS_ERR(rb))
		goto out_fail;
	req->rl_rdmabuf = rb;

out_sendbuf:
	/* XDR encoding and RPC/RDMA marshaling of this request has not
	 * yet occurred. Thus a lower bound is needed to prevent buffer
	 * overrun during marshaling.
	 *
	 * RPC/RDMA marshaling may choose to send payload bearing ops
	 * inline, if the result is smaller than the inline threshold.
	 * The value of the "size" argument accounts for header
	 * requirements but not for the payload in these cases.
	 *
	 * Likewise, allocate enough space to receive a reply up to the
	 * size of the inline threshold.
	 *
	 * It's unlikely that both the send header and the received
	 * reply will be large, but slush is provided here to allow
	 * flexibility when marshaling.
	 */
	min_size = RPCRDMA_INLINE_READ_THRESHOLD(task->tk_rqstp);
	min_size += RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
	if (size < min_size)
		size = min_size;

	rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
	if (IS_ERR(rb))
		goto out_fail;
	rb->rg_owner = req;

	r_xprt->rx_stats.hardway_register_count += size;
	rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
	req->rl_sendbuf = rb;
	goto out;

out_fail:
	rpcrdma_buffer_put(req);
	r_xprt->rx_stats.failed_marshal_count++;
	return NULL;
}

/*
 * This function returns all RDMA resources to the pool.
 */
static void
xprt_rdma_free(void *buffer)
{
	struct rpcrdma_req *req;
	struct rpcrdma_xprt *r_xprt;
	struct rpcrdma_regbuf *rb;
	int i;

	if (buffer == NULL)
		return;

	rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]);
	req = rb->rg_owner;
	if (req->rl_backchannel)
		return;

	r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);

	dprintk("RPC:       %s: called on 0x%p\n", __func__, req->rl_reply);

	for (i = 0; req->rl_nchunks;) {
		--req->rl_nchunks;
		i += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
						    &req->rl_segments[i]);
	}

	rpcrdma_buffer_put(req);
}

/*
 * send_request invokes the meat of RPC RDMA. It must do the following:
 *  1.  Marshal the RPC request into an RPC RDMA request, which means
 *	putting a header in front of data, and creating IOVs for RDMA
 *	from those in the request.
 *  2.  In marshaling, detect opportunities for RDMA, and use them.
 *  3.  Post a recv message to set up async completion, then send
 *	the request (rpcrdma_ep_post).
 *  4.  No partial sends are possible in the RPC-RDMA protocol (as in UDP).
 */

static int
xprt_rdma_send_request(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	int rc = 0;

	rc = rpcrdma_marshal_req(rqst);
	if (rc < 0)
		goto failed_marshal;

	if (req->rl_reply == NULL)	/* e.g. reconnection */
		rpcrdma_recv_buffer_get(req);

	/* Must suppress retransmit to maintain credits */
	if (req->rl_connect_cookie == xprt->connect_cookie)
		goto drop_connection;
	req->rl_connect_cookie = xprt->connect_cookie;

	if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
		goto drop_connection;

	rqst->rq_xmit_bytes_sent += rqst->rq_snd_buf.len;
	rqst->rq_bytes_sent = 0;
	return 0;

failed_marshal:
	r_xprt->rx_stats.failed_marshal_count++;
	dprintk("RPC:       %s: rpcrdma_marshal_req failed, status %i\n",
		__func__, rc);
	if (rc == -EIO)
		return -EIO;
drop_connection:
	xprt_disconnect_done(xprt);
	return -ENOTCONN;	/* implies disconnect */
}

void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	long idle_time = 0;

	if (xprt_connected(xprt))
		idle_time = (long)(jiffies - xprt->last_used) / HZ;

	seq_puts(seq, "\txprt:\trdma ");
	seq_printf(seq, "%u %lu %lu %lu %ld %lu %lu %lu %llu %llu ",
		   0,	/* need a local port? */
		   xprt->stat.bind_count,
		   xprt->stat.connect_count,
		   xprt->stat.connect_time,
		   idle_time,
		   xprt->stat.sends,
		   xprt->stat.recvs,
		   xprt->stat.bad_xids,
		   xprt->stat.req_u,
		   xprt->stat.bklog_u);
	seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu\n",
		   r_xprt->rx_stats.read_chunk_count,
		   r_xprt->rx_stats.write_chunk_count,
		   r_xprt->rx_stats.reply_chunk_count,
		   r_xprt->rx_stats.total_rdma_request,
		   r_xprt->rx_stats.total_rdma_reply,
		   r_xprt->rx_stats.pullup_copy_count,
		   r_xprt->rx_stats.fixup_copy_count,
		   r_xprt->rx_stats.hardway_register_count,
		   r_xprt->rx_stats.failed_marshal_count,
		   r_xprt->rx_stats.bad_reply_count,
		   r_xprt->rx_stats.nomsg_call_count);
}

static int
xprt_rdma_enable_swap(struct rpc_xprt *xprt)
{
	return 0;
}

static void
xprt_rdma_disable_swap(struct rpc_xprt *xprt)
{
}

/*
 * Plumbing for rpc transport switch and kernel module
 */

static struct rpc_xprt_ops xprt_rdma_procs = {
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong, /* sunrpc/xprt.c */
	.alloc_slot		= xprt_alloc_slot,
	.release_request	= xprt_release_rqst_cong,       /* ditto */
	.set_retrans_timeout	= xprt_set_retrans_timeout_def, /* ditto */
	.rpcbind		= rpcb_getport_async,	/* sunrpc/rpcb_clnt.c */
	.set_port		= xprt_rdma_set_port,
	.connect		= xprt_rdma_connect,
	.buf_alloc		= xprt_rdma_allocate,
	.buf_free		= xprt_rdma_free,
	.send_request		= xprt_rdma_send_request,
	.close			= xprt_rdma_close,
	.destroy		= xprt_rdma_destroy,
	.print_stats		= xprt_rdma_print_stats,
	.enable_swap		= xprt_rdma_enable_swap,
	.disable_swap		= xprt_rdma_disable_swap,
	.inject_disconnect	= xprt_rdma_inject_disconnect,
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	.bc_setup		= xprt_rdma_bc_setup,
	.bc_up			= xprt_rdma_bc_up,
	.bc_free_rqst		= xprt_rdma_bc_free_rqst,
	.bc_destroy		= xprt_rdma_bc_destroy,
#endif
};

static struct xprt_class xprt_rdma = {
	.list			= LIST_HEAD_INIT(xprt_rdma.list),
	.name			= "rdma",
	.owner			= THIS_MODULE,
	.ident			= XPRT_TRANSPORT_RDMA,
	.setup			= xprt_setup_rdma,
};
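
/* Once the class is registered, mounts select this transport by name;
 * for NFS that is the proto=rdma mount option (an NFS/RDMA server
 * conventionally listens on port 20049).
 */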

void xprt_rdma_cleanup(void)
{
	int rc;

	dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (sunrpc_table_header) {
		unregister_sysctl_table(sunrpc_table_header);
		sunrpc_table_header = NULL;
	}
#endif
	rc = xprt_unregister_transport(&xprt_rdma);
	if (rc)
		dprintk("RPC:       %s: xprt_unregister returned %i\n",
			__func__, rc);

	rpcrdma_destroy_wq();
	frwr_destroy_recovery_wq();

	rc = xprt_unregister_transport(&xprt_rdma_bc);
	if (rc)
		dprintk("RPC:       %s: xprt_unregister(bc) returned %i\n",
			__func__, rc);
}

int xprt_rdma_init(void)
{
	int rc;

	rc = frwr_alloc_recovery_wq();
	if (rc)
		return rc;

	rc = rpcrdma_alloc_wq();
	if (rc) {
		frwr_destroy_recovery_wq();
		return rc;
	}

	rc = xprt_register_transport(&xprt_rdma);
	if (rc) {
		rpcrdma_destroy_wq();
		frwr_destroy_recovery_wq();
		return rc;
	}

	rc = xprt_register_transport(&xprt_rdma_bc);
	if (rc) {
		xprt_unregister_transport(&xprt_rdma);
		rpcrdma_destroy_wq();
		frwr_destroy_recovery_wq();
		return rc;
	}

	dprintk("RPCRDMA Module Init, register RPC RDMA transport\n");

	dprintk("Defaults:\n");
	dprintk("\tSlots %d\n"
		"\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
		xprt_rdma_slot_table_entries,
		xprt_rdma_max_inline_read, xprt_rdma_max_inline_write);
	dprintk("\tPadding %d\n\tMemreg %d\n",
		xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy);

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (!sunrpc_table_header)
		sunrpc_table_header = register_sysctl_table(sunrpc_table);
#endif
	return 0;
}
v5.9: net/sunrpc/xprtrdma/transport.c
// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2014-2017 Oracle.  All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * transport.c
 *
 * This file contains the top-level implementation of an RPC RDMA
 * transport.
 *
 * Naming convention: functions beginning with xprt_ are part of the
 * transport switch. All others are RPC RDMA internal.
 */

#include <linux/module.h>
#include <linux/slab.h>
#include <linux/seq_file.h>
#include <linux/smp.h>

#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * tunables
 */

static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
unsigned int xprt_rdma_memreg_strategy		= RPCRDMA_FRWR;
int xprt_rdma_pad_optimize;

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)

static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
static unsigned int min_inline_size = RPCRDMA_MIN_INLINE;
static unsigned int max_inline_size = RPCRDMA_MAX_INLINE;
static unsigned int max_padding = PAGE_SIZE;
static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
static unsigned int max_memreg = RPCRDMA_LAST - 1;
static unsigned int dummy;

static struct ctl_table_header *sunrpc_table_header;

static struct ctl_table xr_tunables_table[] = {
	{
		.procname	= "rdma_slot_table_entries",
		.data		= &xprt_rdma_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.procname	= "rdma_max_inline_read",
		.data		= &xprt_rdma_max_inline_read,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_inline_size,
		.extra2		= &max_inline_size,
	},
	{
		.procname	= "rdma_max_inline_write",
		.data		= &xprt_rdma_max_inline_write,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_inline_size,
		.extra2		= &max_inline_size,
	},
	{
		.procname	= "rdma_inline_write_padding",
		.data		= &dummy,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= SYSCTL_ZERO,
		.extra2		= &max_padding,
	},
	{
		.procname	= "rdma_memreg_strategy",
		.data		= &xprt_rdma_memreg_strategy,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_memreg,
		.extra2		= &max_memreg,
	},
	{
		.procname	= "rdma_pad_optimize",
		.data		= &xprt_rdma_pad_optimize,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec,
	},
	{ },
};

static struct ctl_table sunrpc_table[] = {
	{
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xr_tunables_table
	},
	{ },
};

#endif
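
/* Note that rdma_inline_write_padding now points at a local "dummy":
 * the sysctl remains visible for compatibility, but writing it no
 * longer has any effect.
 */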

static const struct rpc_xprt_ops xprt_rdma_procs;

static void
xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap)
{
	struct sockaddr_in *sin = (struct sockaddr_in *)sap;
	char buf[20];

	snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);

	xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA;
}

static void
xprt_rdma_format_addresses6(struct rpc_xprt *xprt, struct sockaddr *sap)
{
	struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)sap;
	char buf[40];

	snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);

	xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA6;
}

void
xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap)
{
	char buf[128];

	switch (sap->sa_family) {
	case AF_INET:
		xprt_rdma_format_addresses4(xprt, sap);
		break;
	case AF_INET6:
		xprt_rdma_format_addresses6(xprt, sap);
		break;
	default:
		pr_err("rpcrdma: Unrecognized address family\n");
		return;
	}

	(void)rpc_ntop(sap, buf, sizeof(buf));
	xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL);

	snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);

	snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);

	xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
}

void
xprt_rdma_free_addresses(struct rpc_xprt *xprt)
{
	unsigned int i;

	for (i = 0; i < RPC_DISPLAY_MAX; i++)
		switch (i) {
		case RPC_DISPLAY_PROTO:
		case RPC_DISPLAY_NETID:
			continue;
		default:
			kfree(xprt->address_strings[i]);
		}
}

/**
 * xprt_rdma_connect_worker - establish connection in the background
 * @work: worker thread context
 *
 * Requester holds the xprt's send lock to prevent activity on this
 * transport while a fresh connection is being established. RPC tasks
 * sleep on the xprt's pending queue waiting for connect to complete.
 */
static void
xprt_rdma_connect_worker(struct work_struct *work)
{
	struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
						   rx_connect_worker.work);
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	int rc;

	rc = rpcrdma_xprt_connect(r_xprt);
	xprt_clear_connecting(xprt);
	if (!rc) {
		xprt->connect_cookie++;
		xprt->stat.connect_count++;
		xprt->stat.connect_time += (long)jiffies -
					   xprt->stat.connect_start;
		xprt_set_connected(xprt);
		rc = -EAGAIN;
	} else {
		/* Force a call to xprt_rdma_close to clean up */
		spin_lock(&xprt->transport_lock);
		set_bit(XPRT_CLOSE_WAIT, &xprt->state);
		spin_unlock(&xprt->transport_lock);
	}
	xprt_wake_pending_tasks(xprt, rc);
}

/**
 * xprt_rdma_inject_disconnect - inject a connection fault
 * @xprt: transport context
 *
 * If @xprt is connected, disconnect it to simulate spurious connection
 * loss.
 */
static void
xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	trace_xprtrdma_op_inject_dsc(r_xprt);
	rdma_disconnect(r_xprt->rx_ep->re_id);
}

/**
 * xprt_rdma_destroy - Full tear down of transport
 * @xprt: doomed transport context
 *
 * Caller guarantees there will be no more calls to us with
 * this @xprt.
 */
static void
xprt_rdma_destroy(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	cancel_delayed_work_sync(&r_xprt->rx_connect_worker);

	rpcrdma_xprt_disconnect(r_xprt);
	rpcrdma_buffer_destroy(&r_xprt->rx_buf);

	xprt_rdma_free_addresses(xprt);
	xprt_free(xprt);

	module_put(THIS_MODULE);
}

/* 60 second timeout, no retries */
static const struct rpc_timeout xprt_rdma_default_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
};

/**
 * xprt_setup_rdma - Set up transport to use RDMA
 *
 * @args: rpc transport arguments
 */
static struct rpc_xprt *
xprt_setup_rdma(struct xprt_create *args)
{
	struct rpc_xprt *xprt;
	struct rpcrdma_xprt *new_xprt;
	struct sockaddr *sap;
	int rc;

	if (args->addrlen > sizeof(xprt->addr))
		return ERR_PTR(-EBADF);

	if (!try_module_get(THIS_MODULE))
		return ERR_PTR(-EIO);

	xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt), 0,
			  xprt_rdma_slot_table_entries);
	if (!xprt) {
		module_put(THIS_MODULE);
		return ERR_PTR(-ENOMEM);
	}

	xprt->timeout = &xprt_rdma_default_timeout;
	xprt->connect_timeout = xprt->timeout->to_initval;
	xprt->max_reconnect_timeout = xprt->timeout->to_maxval;
	xprt->bind_timeout = RPCRDMA_BIND_TO;
	xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;

	xprt->resvport = 0;		/* privileged port not needed */
	xprt->ops = &xprt_rdma_procs;

	/*
	 * Set up RDMA-specific connect data.
	 */
	sap = args->dstaddr;

	/* Ensure xprt->addr holds valid server TCP (not RDMA)
	 * address, for any side protocols which peek at it */
	xprt->prot = IPPROTO_TCP;
	xprt->addrlen = args->addrlen;
	memcpy(&xprt->addr, sap, xprt->addrlen);

	if (rpc_get_port(sap))
		xprt_set_bound(xprt);
	xprt_rdma_format_addresses(xprt, sap);

	new_xprt = rpcx_to_rdmax(xprt);
	rc = rpcrdma_buffer_create(new_xprt);
	if (rc) {
		xprt_rdma_free_addresses(xprt);
		xprt_free(xprt);
		module_put(THIS_MODULE);
		return ERR_PTR(rc);
	}

	INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
			  xprt_rdma_connect_worker);

	xprt->max_payload = RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT;

	return xprt;
}

/**
 * xprt_rdma_close - close a transport connection
 * @xprt: transport context
 *
 * Called during autoclose or device removal.
 *
 * Caller holds @xprt's send lock to prevent activity on this
 * transport while the connection is torn down.
 */
void xprt_rdma_close(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	rpcrdma_xprt_disconnect(r_xprt);

	xprt->reestablish_timeout = 0;
	++xprt->connect_cookie;
	xprt_disconnect_done(xprt);
}

/**
 * xprt_rdma_set_port - update server port with rpcbind result
 * @xprt: controlling RPC transport
 * @port: new port value
 *
 * Transport connect status is unchanged.
 */
static void
xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
{
	struct sockaddr *sap = (struct sockaddr *)&xprt->addr;
	char buf[8];

	rpc_set_port(sap, port);

	kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
	snprintf(buf, sizeof(buf), "%u", port);
	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);

	kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
	snprintf(buf, sizeof(buf), "%4hx", port);
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);

	trace_xprtrdma_op_setport(container_of(xprt, struct rpcrdma_xprt,
					       rx_xprt));
}

/**
 * xprt_rdma_timer - invoked when an RPC times out
 * @xprt: controlling RPC transport
 * @task: RPC task that timed out
 *
 * Invoked when the transport is still connected, but an RPC
 * retransmit timeout occurs.
 *
 * Since RDMA connections don't have a keep-alive, forcibly
 * disconnect and retry to connect. This drives full
 * detection of the network path, and retransmissions of
 * all pending RPCs.
 */
static void
xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
{
	xprt_force_disconnect(xprt);
}

/**
 * xprt_rdma_set_connect_timeout - set timeouts for establishing a connection
 * @xprt: controlling transport instance
 * @connect_timeout: reconnect timeout after client disconnects
 * @reconnect_timeout: reconnect timeout after server disconnects
 *
 */
static void xprt_rdma_set_connect_timeout(struct rpc_xprt *xprt,
					  unsigned long connect_timeout,
					  unsigned long reconnect_timeout)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	trace_xprtrdma_op_set_cto(r_xprt, connect_timeout, reconnect_timeout);

	spin_lock(&xprt->transport_lock);

	if (connect_timeout < xprt->connect_timeout) {
		struct rpc_timeout to;
		unsigned long initval;

		to = *xprt->timeout;
		initval = connect_timeout;
		if (initval < RPCRDMA_INIT_REEST_TO << 1)
			initval = RPCRDMA_INIT_REEST_TO << 1;
		to.to_initval = initval;
		to.to_maxval = initval;
		r_xprt->rx_timeout = to;
		xprt->timeout = &r_xprt->rx_timeout;
		xprt->connect_timeout = connect_timeout;
	}

	if (reconnect_timeout < xprt->max_reconnect_timeout)
		xprt->max_reconnect_timeout = reconnect_timeout;

	spin_unlock(&xprt->transport_lock);
}

/**
 * xprt_rdma_connect - schedule an attempt to reconnect
 * @xprt: transport state
 * @task: RPC scheduler context (unused)
 *
 */
static void
xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	unsigned long delay;

	delay = 0;
	if (ep && ep->re_connect_status != 0) {
		delay = xprt_reconnect_delay(xprt);
		xprt_reconnect_backoff(xprt, RPCRDMA_INIT_REEST_TO);
	}
	trace_xprtrdma_op_connect(r_xprt, delay);
	queue_delayed_work(xprtiod_workqueue, &r_xprt->rx_connect_worker,
			   delay);
}
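
/* Unlike older kernels, which open-coded the doubling of
 * xprt->reestablish_timeout (see the v4.6 xprt_rdma_connect above),
 * this version delegates reconnect backoff to the generic
 * xprt_reconnect_delay() and xprt_reconnect_backoff() helpers.
 */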

/**
 * xprt_rdma_alloc_slot - allocate an rpc_rqst
 * @xprt: controlling RPC transport
 * @task: RPC task requesting a fresh rpc_rqst
 *
 * tk_status values:
 *	%0 if task->tk_rqstp points to a fresh rpc_rqst
 *	%-EAGAIN if no rpc_rqst is available; queued on backlog
 */
static void
xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_req *req;

	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
	if (!req)
		goto out_sleep;
	task->tk_rqstp = &req->rl_slot;
	task->tk_status = 0;
	return;

out_sleep:
	set_bit(XPRT_CONGESTED, &xprt->state);
	rpc_sleep_on(&xprt->backlog, task, NULL);
	task->tk_status = -EAGAIN;
}

/**
 * xprt_rdma_free_slot - release an rpc_rqst
 * @xprt: controlling RPC transport
 * @rqst: rpc_rqst to release
 *
 */
static void
xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
{
	struct rpcrdma_xprt *r_xprt =
		container_of(xprt, struct rpcrdma_xprt, rx_xprt);

	memset(rqst, 0, sizeof(*rqst));
	rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
	if (unlikely(!rpc_wake_up_next(&xprt->backlog)))
		clear_bit(XPRT_CONGESTED, &xprt->state);
}
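
/* Together, alloc_slot and free_slot throttle the RPC layer: when the
 * request pool is empty, tasks sleep on xprt->backlog with
 * XPRT_CONGESTED set, and each freed slot wakes exactly one waiter.
 */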

static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt,
				 struct rpcrdma_regbuf *rb, size_t size,
				 gfp_t flags)
{
	if (unlikely(rdmab_length(rb) < size)) {
		if (!rpcrdma_regbuf_realloc(rb, size, flags))
			return false;
		r_xprt->rx_stats.hardway_register_count += size;
	}
	return true;
}
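
/* rpcrdma_check_regbuf() grows a pre-registered buffer only when it is
 * too small for the current request; hardway_register_count records how
 * often this slow path runs.
 */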

/**
 * xprt_rdma_allocate - allocate transport resources for an RPC
 * @task: RPC task
 *
 * Return values:
 *        0:	Success; rq_buffer points to RPC buffer to use
 *   ENOMEM:	Out of memory, call again later
 *      EIO:	A permanent error occurred, do not retry
 */
static int
xprt_rdma_allocate(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	gfp_t flags;

	flags = RPCRDMA_DEF_GFP;
	if (RPC_IS_SWAPPER(task))
		flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;

	if (!rpcrdma_check_regbuf(r_xprt, req->rl_sendbuf, rqst->rq_callsize,
				  flags))
		goto out_fail;
	if (!rpcrdma_check_regbuf(r_xprt, req->rl_recvbuf, rqst->rq_rcvsize,
				  flags))
		goto out_fail;

	rqst->rq_buffer = rdmab_data(req->rl_sendbuf);
	rqst->rq_rbuffer = rdmab_data(req->rl_recvbuf);
	trace_xprtrdma_op_allocate(task, req);
	return 0;

out_fail:
	trace_xprtrdma_op_allocate(task, NULL);
	return -ENOMEM;
}

/**
 * xprt_rdma_free - release resources allocated by xprt_rdma_allocate
 * @task: RPC task
 *
 * Caller guarantees rqst->rq_buffer is non-NULL.
 */
static void
xprt_rdma_free(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

	trace_xprtrdma_op_free(task, req);

	if (!list_empty(&req->rl_registered))
		frwr_unmap_sync(r_xprt, req);

	/* XXX: If the RPC is completing because of a signal and
	 * not because a reply was received, we ought to ensure
	 * that the Send completion has fired, so that memory
	 * involved with the Send is not still visible to the NIC.
	 */
}

/**
 * xprt_rdma_send_request - marshal and send an RPC request
 * @rqst: RPC message in rq_snd_buf
 *
 * Caller holds the transport's write lock.
 *
 * Returns:
 *	%0 if the RPC message has been sent
 *	%-ENOTCONN if the caller should reconnect and call again
 *	%-EAGAIN if the caller should call again
 *	%-ENOBUFS if the caller should call again after a delay
 *	%-EMSGSIZE if encoding ran out of buffer space. The request
 *		was not sent. Do not try to send this message again.
 *	%-EIO if an I/O error occurred. The request was not sent.
 *		Do not try to send this message again.
 */
static int
xprt_rdma_send_request(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	int rc = 0;

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	if (unlikely(!rqst->rq_buffer))
		return xprt_rdma_bc_send_reply(rqst);
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */

	if (!xprt_connected(xprt))
		return -ENOTCONN;

	if (!xprt_request_get_cong(xprt, rqst))
		return -EBADSLT;

	rc = rpcrdma_marshal_req(r_xprt, rqst);
	if (rc < 0)
		goto failed_marshal;

	/* Must suppress retransmit to maintain credits */
	if (rqst->rq_connect_cookie == xprt->connect_cookie)
		goto drop_connection;
	rqst->rq_xtime = ktime_get();

	if (rpcrdma_post_sends(r_xprt, req))
		goto drop_connection;

	rqst->rq_xmit_bytes_sent += rqst->rq_snd_buf.len;

	/* An RPC with no reply will throw off credit accounting,
	 * so drop the connection to reset the credit grant.
	 */
	if (!rpc_reply_expected(rqst->rq_task))
		goto drop_connection;
	return 0;

failed_marshal:
	if (rc != -ENOTCONN)
		return rc;
drop_connection:
	xprt_rdma_close(xprt);
	return -ENOTCONN;
}

void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	long idle_time = 0;

	if (xprt_connected(xprt))
		idle_time = (long)(jiffies - xprt->last_used) / HZ;

	seq_puts(seq, "\txprt:\trdma ");
	seq_printf(seq, "%u %lu %lu %lu %ld %lu %lu %lu %llu %llu ",
		   0,	/* need a local port? */
		   xprt->stat.bind_count,
		   xprt->stat.connect_count,
		   xprt->stat.connect_time / HZ,
		   idle_time,
		   xprt->stat.sends,
		   xprt->stat.recvs,
		   xprt->stat.bad_xids,
		   xprt->stat.req_u,
		   xprt->stat.bklog_u);
	seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu ",
		   r_xprt->rx_stats.read_chunk_count,
		   r_xprt->rx_stats.write_chunk_count,
		   r_xprt->rx_stats.reply_chunk_count,
		   r_xprt->rx_stats.total_rdma_request,
		   r_xprt->rx_stats.total_rdma_reply,
		   r_xprt->rx_stats.pullup_copy_count,
		   r_xprt->rx_stats.fixup_copy_count,
		   r_xprt->rx_stats.hardway_register_count,
		   r_xprt->rx_stats.failed_marshal_count,
		   r_xprt->rx_stats.bad_reply_count,
		   r_xprt->rx_stats.nomsg_call_count);
	seq_printf(seq, "%lu %lu %lu %lu %lu %lu\n",
		   r_xprt->rx_stats.mrs_recycled,
		   r_xprt->rx_stats.mrs_orphaned,
		   r_xprt->rx_stats.mrs_allocated,
		   r_xprt->rx_stats.local_inv_needed,
		   r_xprt->rx_stats.empty_sendctx_q,
		   r_xprt->rx_stats.reply_waits_for_send);
}

static int
xprt_rdma_enable_swap(struct rpc_xprt *xprt)
{
	return 0;
}

static void
xprt_rdma_disable_swap(struct rpc_xprt *xprt)
{
}

/*
 * Plumbing for rpc transport switch and kernel module
 */

static const struct rpc_xprt_ops xprt_rdma_procs = {
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong, /* sunrpc/xprt.c */
	.alloc_slot		= xprt_rdma_alloc_slot,
	.free_slot		= xprt_rdma_free_slot,
	.release_request	= xprt_release_rqst_cong,       /* ditto */
	.wait_for_reply_request	= xprt_wait_for_reply_request_def, /* ditto */
	.timer			= xprt_rdma_timer,
	.rpcbind		= rpcb_getport_async,	/* sunrpc/rpcb_clnt.c */
	.set_port		= xprt_rdma_set_port,
	.connect		= xprt_rdma_connect,
	.buf_alloc		= xprt_rdma_allocate,
	.buf_free		= xprt_rdma_free,
	.send_request		= xprt_rdma_send_request,
	.close			= xprt_rdma_close,
	.destroy		= xprt_rdma_destroy,
	.set_connect_timeout	= xprt_rdma_set_connect_timeout,
	.print_stats		= xprt_rdma_print_stats,
	.enable_swap		= xprt_rdma_enable_swap,
	.disable_swap		= xprt_rdma_disable_swap,
	.inject_disconnect	= xprt_rdma_inject_disconnect,
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	.bc_setup		= xprt_rdma_bc_setup,
	.bc_maxpayload		= xprt_rdma_bc_maxpayload,
	.bc_num_slots		= xprt_rdma_bc_max_slots,
	.bc_free_rqst		= xprt_rdma_bc_free_rqst,
	.bc_destroy		= xprt_rdma_bc_destroy,
#endif
};

static struct xprt_class xprt_rdma = {
	.list			= LIST_HEAD_INIT(xprt_rdma.list),
	.name			= "rdma",
	.owner			= THIS_MODULE,
	.ident			= XPRT_TRANSPORT_RDMA,
	.setup			= xprt_setup_rdma,
};

void xprt_rdma_cleanup(void)
{
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (sunrpc_table_header) {
		unregister_sysctl_table(sunrpc_table_header);
		sunrpc_table_header = NULL;
	}
#endif

	xprt_unregister_transport(&xprt_rdma);
	xprt_unregister_transport(&xprt_rdma_bc);
}

int xprt_rdma_init(void)
{
	int rc;

	rc = xprt_register_transport(&xprt_rdma);
	if (rc)
		return rc;

	rc = xprt_register_transport(&xprt_rdma_bc);
	if (rc) {
		xprt_unregister_transport(&xprt_rdma);
		return rc;
	}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (!sunrpc_table_header)
		sunrpc_table_header = register_sysctl_table(sunrpc_table);
#endif
	return 0;
}