v6.8
  1// SPDX-License-Identifier: GPL-2.0-only
  2/******************************************************************************
  3
  4(c) 2007 Network Appliance, Inc.  All Rights Reserved.
  5(c) 2009 NetApp.  All Rights Reserved.
  6
  7
  8******************************************************************************/
  9
 10#include <linux/tcp.h>
 11#include <linux/slab.h>
 12#include <linux/sunrpc/xprt.h>
 13#include <linux/export.h>
 14#include <linux/sunrpc/bc_xprt.h>
 15
 16#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 17#define RPCDBG_FACILITY	RPCDBG_TRANS
 18#endif
 19
 20#define BC_MAX_SLOTS	64U
 21
 22unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
 23{
 24	return BC_MAX_SLOTS;
 25}
 26
 27/*
 28 * Helper routines that track the number of preallocation elements
 29 * on the transport.
 30 */
 31static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
 32{
 33	return xprt->bc_alloc_count < xprt->bc_alloc_max;
 34}
 35
 36/*
 37 * Free the preallocated rpc_rqst structure and the memory
 38 * buffers hanging off of it.
 39 */
 40static void xprt_free_allocation(struct rpc_rqst *req)
 41{
 42	struct xdr_buf *xbufp;
 43
 44	dprintk("RPC:        free allocations for req= %p\n", req);
 45	WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
 46	xbufp = &req->rq_rcv_buf;
 47	free_page((unsigned long)xbufp->head[0].iov_base);
 48	xbufp = &req->rq_snd_buf;
 49	free_page((unsigned long)xbufp->head[0].iov_base);
 50	kfree(req);
 51}
 52
 53static void xprt_bc_reinit_xdr_buf(struct xdr_buf *buf)
 54{
 55	buf->head[0].iov_len = PAGE_SIZE;
 56	buf->tail[0].iov_len = 0;
 57	buf->pages = NULL;
 58	buf->page_len = 0;
 59	buf->flags = 0;
 60	buf->len = 0;
 61	buf->buflen = PAGE_SIZE;
 62}
 63
 64static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
 65{
 66	struct page *page;
 67	/* Preallocate one XDR receive buffer */
 68	page = alloc_page(gfp_flags);
 69	if (page == NULL)
 70		return -ENOMEM;
 71	xdr_buf_init(buf, page_address(page), PAGE_SIZE);
 72	return 0;
 73}
 74
 75static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt)
 76{
 77	gfp_t gfp_flags = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN;
 78	struct rpc_rqst *req;
 79
 80	/* Pre-allocate one backchannel rpc_rqst */
 81	req = kzalloc(sizeof(*req), gfp_flags);
 82	if (req == NULL)
 83		return NULL;
 84
  85	req->rq_xprt = xprt;
  86
 87	/* Preallocate one XDR receive buffer */
 88	if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) {
 89		printk(KERN_ERR "Failed to create bc receive xbuf\n");
 90		goto out_free;
 91	}
 92	req->rq_rcv_buf.len = PAGE_SIZE;
 93
 94	/* Preallocate one XDR send buffer */
 95	if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) {
 96		printk(KERN_ERR "Failed to create bc snd xbuf\n");
 97		goto out_free;
 98	}
 99	return req;
100out_free:
101	xprt_free_allocation(req);
102	return NULL;
103}
104
105/*
106 * Preallocate up to min_reqs structures and related buffers for use
107 * by the backchannel.  This function can be called multiple times
108 * when creating new sessions that use the same rpc_xprt.  The
109 * preallocated buffers are added to the pool of resources used by
110 * the rpc_xprt.  Any one of these resources may be used by an
111 * incoming callback request.  It's up to the higher levels in the
112 * stack to enforce that the maximum number of session slots is not
113 * being exceeded.
114 *
115 * Some callback arguments can be large.  For example, a pNFS server
 116 * using multiple deviceids.  The list can be unbounded, but the client
117 * has the ability to tell the server the maximum size of the callback
118 * requests.  Each deviceID is 16 bytes, so allocate one page
119 * for the arguments to have enough room to receive a number of these
120 * deviceIDs.  The NFS client indicates to the pNFS server that its
121 * callback requests can be up to 4096 bytes in size.
122 */
123int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
124{
125	if (!xprt->ops->bc_setup)
126		return 0;
127	return xprt->ops->bc_setup(xprt, min_reqs);
128}
129EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
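
The preallocation comment above describes the contract for callers of this API. The fragment below is a minimal, illustrative sketch only; the function names and the 16-slot count are assumptions invented for this example, not taken from this file. It simply shows a session-setup path pairing xprt_setup_backchannel() at creation with xprt_destroy_backchannel() at teardown.

#include <linux/sunrpc/bc_xprt.h>

/* Illustrative only: EXAMPLE_BC_SLOTS and the function names are
 * invented for this sketch. */
#define EXAMPLE_BC_SLOTS	16U

static int example_session_create(struct rpc_xprt *xprt)
{
	/* Reserve rpc_rqsts and one-page XDR buffers for callbacks;
	 * returns 0 if the transport has no bc_setup op. */
	return xprt_setup_backchannel(xprt, EXAMPLE_BC_SLOTS);
}

static void example_session_destroy(struct rpc_xprt *xprt)
{
	/* Release up to the number of slots reserved above */
	xprt_destroy_backchannel(xprt, EXAMPLE_BC_SLOTS);
}
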
130
131int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
132{
133	struct rpc_rqst *req;
134	struct list_head tmp_list;
135	int i;
136
137	dprintk("RPC:       setup backchannel transport\n");
138
139	if (min_reqs > BC_MAX_SLOTS)
140		min_reqs = BC_MAX_SLOTS;
141
142	/*
143	 * We use a temporary list to keep track of the preallocated
144	 * buffers.  Once we're done building the list we splice it
145	 * into the backchannel preallocation list off of the rpc_xprt
146	 * struct.  This helps minimize the amount of time the list
147	 * lock is held on the rpc_xprt struct.  It also makes cleanup
148	 * easier in case of memory allocation errors.
149	 */
150	INIT_LIST_HEAD(&tmp_list);
151	for (i = 0; i < min_reqs; i++) {
152		/* Pre-allocate one backchannel rpc_rqst */
153		req = xprt_alloc_bc_req(xprt);
154		if (req == NULL) {
155			printk(KERN_ERR "Failed to create bc rpc_rqst\n");
156			goto out_free;
157		}
158
159		/* Add the allocated buffer to the tmp list */
160		dprintk("RPC:       adding req= %p\n", req);
161		list_add(&req->rq_bc_pa_list, &tmp_list);
162	}
163
164	/*
165	 * Add the temporary list to the backchannel preallocation list
166	 */
167	spin_lock(&xprt->bc_pa_lock);
168	list_splice(&tmp_list, &xprt->bc_pa_list);
169	xprt->bc_alloc_count += min_reqs;
170	xprt->bc_alloc_max += min_reqs;
171	atomic_add(min_reqs, &xprt->bc_slot_count);
172	spin_unlock(&xprt->bc_pa_lock);
173
174	dprintk("RPC:       setup backchannel transport done\n");
175	return 0;
176
177out_free:
178	/*
179	 * Memory allocation failed, free the temporary list
180	 */
181	while (!list_empty(&tmp_list)) {
182		req = list_first_entry(&tmp_list,
183				struct rpc_rqst,
184				rq_bc_pa_list);
185		list_del(&req->rq_bc_pa_list);
186		xprt_free_allocation(req);
187	}
188
189	dprintk("RPC:       setup backchannel transport failed\n");
190	return -ENOMEM;
191}
192
193/**
194 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
 195 * @xprt:	the transport holding the preallocated structures
196 * @max_reqs:	the maximum number of preallocated structures to destroy
197 *
198 * Since these structures may have been allocated by multiple calls
199 * to xprt_setup_backchannel, we only destroy up to the maximum number
200 * of reqs specified by the caller.
201 */
202void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
203{
204	if (xprt->ops->bc_destroy)
205		xprt->ops->bc_destroy(xprt, max_reqs);
206}
207EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
208
209void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
210{
211	struct rpc_rqst *req = NULL, *tmp = NULL;
212
213	dprintk("RPC:        destroy backchannel transport\n");
214
215	if (max_reqs == 0)
216		goto out;
217
218	spin_lock_bh(&xprt->bc_pa_lock);
219	xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max);
220	list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
221		dprintk("RPC:        req=%p\n", req);
222		list_del(&req->rq_bc_pa_list);
223		xprt_free_allocation(req);
224		xprt->bc_alloc_count--;
225		atomic_dec(&xprt->bc_slot_count);
226		if (--max_reqs == 0)
227			break;
228	}
229	spin_unlock_bh(&xprt->bc_pa_lock);
230
231out:
232	dprintk("RPC:        backchannel list empty= %s\n",
233		list_empty(&xprt->bc_pa_list) ? "true" : "false");
234}
235
236static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
237		struct rpc_rqst *new)
238{
239	struct rpc_rqst *req = NULL;
240
241	dprintk("RPC:       allocate a backchannel request\n");
242	if (list_empty(&xprt->bc_pa_list)) {
243		if (!new)
244			goto not_found;
245		if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS)
246			goto not_found;
247		list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
248		xprt->bc_alloc_count++;
249		atomic_inc(&xprt->bc_slot_count);
250	}
251	req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
252				rq_bc_pa_list);
253	req->rq_reply_bytes_recvd = 0;
254	memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
255			sizeof(req->rq_private_buf));
256	req->rq_xid = xid;
257	req->rq_connect_cookie = xprt->connect_cookie;
258	dprintk("RPC:       backchannel req=%p\n", req);
259not_found:
260	return req;
261}
262
263/*
264 * Return the preallocated rpc_rqst structure and XDR buffers
265 * associated with this rpc_task.
266 */
267void xprt_free_bc_request(struct rpc_rqst *req)
268{
269	struct rpc_xprt *xprt = req->rq_xprt;
270
271	xprt->ops->bc_free_rqst(req);
272}
273
274void xprt_free_bc_rqst(struct rpc_rqst *req)
275{
276	struct rpc_xprt *xprt = req->rq_xprt;
277
278	dprintk("RPC:       free backchannel req=%p\n", req);
279
280	req->rq_connect_cookie = xprt->connect_cookie - 1;
281	smp_mb__before_atomic();
282	clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
283	smp_mb__after_atomic();
284
285	/*
286	 * Return it to the list of preallocations so that it
287	 * may be reused by a new callback request.
288	 */
289	spin_lock_bh(&xprt->bc_pa_lock);
290	if (xprt_need_to_requeue(xprt)) {
291		xprt_bc_reinit_xdr_buf(&req->rq_snd_buf);
292		xprt_bc_reinit_xdr_buf(&req->rq_rcv_buf);
293		req->rq_rcv_buf.len = PAGE_SIZE;
294		list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
295		xprt->bc_alloc_count++;
296		atomic_inc(&xprt->bc_slot_count);
297		req = NULL;
298	}
299	spin_unlock_bh(&xprt->bc_pa_lock);
300	if (req != NULL) {
301		/*
302		 * The last remaining session was destroyed while this
303		 * entry was in use.  Free the entry and don't attempt
304		 * to add back to the list because there is no need to
 305 * have any more preallocated entries.
306		 */
307		dprintk("RPC:       Last session removed req=%p\n", req);
308		xprt_free_allocation(req);
309	}
310	xprt_put(xprt);
311}
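
xprt_free_bc_request() above just dispatches to the transport's bc_free_rqst op, with xprt_free_bc_rqst() serving as the generic implementation. As a hedged sketch (the ops-table name is invented here, and only the backchannel hooks referenced in this file are shown), a transport could wire the generic helpers like this:

/* Illustrative only: "example_bc_ops" is a made-up name; a real
 * transport also fills in its connect/send/receive ops. */
static const struct rpc_xprt_ops example_bc_ops = {
	.bc_setup	= xprt_setup_bc,
	.bc_free_rqst	= xprt_free_bc_rqst,
	.bc_destroy	= xprt_destroy_bc,
};
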
312
313/*
 314 * One or more rpc_rqst structures have been preallocated during the
315 * backchannel setup.  Buffer space for the send and private XDR buffers
316 * has been preallocated as well.  Use xprt_alloc_bc_request to allocate
317 * to this request.  Use xprt_free_bc_request to return it.
318 *
 319 * We know that we're called in soft interrupt context, so grab the plain
 320 * spin_lock since there is no need for the bottom-half spin_lock.
321 *
 322 * Return an available rpc_rqst, otherwise NULL if none are available.
323 */
324struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid)
325{
326	struct rpc_rqst *req, *new = NULL;
327
328	do {
329		spin_lock(&xprt->bc_pa_lock);
330		list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
331			if (req->rq_connect_cookie != xprt->connect_cookie)
332				continue;
333			if (req->rq_xid == xid)
334				goto found;
335		}
336		req = xprt_get_bc_request(xprt, xid, new);
337found:
338		spin_unlock(&xprt->bc_pa_lock);
339		if (new) {
340			if (req != new)
341				xprt_free_allocation(new);
342			break;
343		} else if (req)
344			break;
345		new = xprt_alloc_bc_req(xprt);
346	} while (new);
347	return req;
348}
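
The lookup above runs in the transport's receive path. The sketch below is illustrative only: example_bc_receive() and the plain memcpy() are assumptions for this example, whereas real transports copy from their own socket or RDMA buffers. It shows an incoming callback being matched to a preallocated slot and then handed to the callback service via xprt_complete_bc_request().

/* Illustrative only: a hypothetical receive hook. */
static void example_bc_receive(struct rpc_xprt *xprt, __be32 xid,
			       const void *data, size_t len)
{
	struct rpc_rqst *req;

	req = xprt_lookup_bc_request(xprt, xid);
	if (!req)
		return;	/* no free slot: drop the callback */

	/* The receive buffer is one preallocated page (see above) */
	len = min_t(size_t, len, req->rq_rcv_buf.head[0].iov_len);
	memcpy(req->rq_rcv_buf.head[0].iov_base, data, len);

	/* Dequeue the slot and wake the callback service */
	xprt_complete_bc_request(req, len);
}
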
349
350/*
351 * Add callback request to callback list.  Wake a thread
 352 * on the first pool (usually the only pool) to handle it.
 353 */
354void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
355{
356	struct rpc_xprt *xprt = req->rq_xprt;
357	struct svc_serv *bc_serv = xprt->bc_serv;
358
359	spin_lock(&xprt->bc_pa_lock);
360	list_del(&req->rq_bc_pa_list);
361	xprt->bc_alloc_count--;
362	spin_unlock(&xprt->bc_pa_lock);
363
364	req->rq_private_buf.len = copied;
365	set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
366
367	dprintk("RPC:       add callback request to list\n");
368	xprt_get(xprt);
369	lwq_enqueue(&req->rq_bc_list, &bc_serv->sv_cb_list);
 370	svc_pool_wake_idle_thread(&bc_serv->sv_pools[0]);
 371}
v6.2
  1// SPDX-License-Identifier: GPL-2.0-only
  2/******************************************************************************
  3
  4(c) 2007 Network Appliance, Inc.  All Rights Reserved.
  5(c) 2009 NetApp.  All Rights Reserved.
  6
  7
  8******************************************************************************/
  9
 10#include <linux/tcp.h>
 11#include <linux/slab.h>
 12#include <linux/sunrpc/xprt.h>
 13#include <linux/export.h>
 14#include <linux/sunrpc/bc_xprt.h>
 15
 16#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 17#define RPCDBG_FACILITY	RPCDBG_TRANS
 18#endif
 19
 20#define BC_MAX_SLOTS	64U
 21
 22unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
 23{
 24	return BC_MAX_SLOTS;
 25}
 26
 27/*
 28 * Helper routines that track the number of preallocation elements
 29 * on the transport.
 30 */
 31static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
 32{
 33	return xprt->bc_alloc_count < xprt->bc_alloc_max;
 34}
 35
 36/*
 37 * Free the preallocated rpc_rqst structure and the memory
 38 * buffers hanging off of it.
 39 */
 40static void xprt_free_allocation(struct rpc_rqst *req)
 41{
 42	struct xdr_buf *xbufp;
 43
 44	dprintk("RPC:        free allocations for req= %p\n", req);
 45	WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
 46	xbufp = &req->rq_rcv_buf;
 47	free_page((unsigned long)xbufp->head[0].iov_base);
 48	xbufp = &req->rq_snd_buf;
 49	free_page((unsigned long)xbufp->head[0].iov_base);
 50	kfree(req);
 51}
 52
 53static void xprt_bc_reinit_xdr_buf(struct xdr_buf *buf)
 54{
 55	buf->head[0].iov_len = PAGE_SIZE;
 56	buf->tail[0].iov_len = 0;
 57	buf->pages = NULL;
 58	buf->page_len = 0;
 59	buf->flags = 0;
 60	buf->len = 0;
 61	buf->buflen = PAGE_SIZE;
 62}
 63
 64static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
 65{
 66	struct page *page;
 67	/* Preallocate one XDR receive buffer */
 68	page = alloc_page(gfp_flags);
 69	if (page == NULL)
 70		return -ENOMEM;
 71	xdr_buf_init(buf, page_address(page), PAGE_SIZE);
 72	return 0;
 73}
 74
 75static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt)
 76{
 77	gfp_t gfp_flags = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN;
 78	struct rpc_rqst *req;
 79
 80	/* Pre-allocate one backchannel rpc_rqst */
 81	req = kzalloc(sizeof(*req), gfp_flags);
 82	if (req == NULL)
 83		return NULL;
 84
 85	req->rq_xprt = xprt;
 86	INIT_LIST_HEAD(&req->rq_bc_list);
 87
 88	/* Preallocate one XDR receive buffer */
 89	if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) {
 90		printk(KERN_ERR "Failed to create bc receive xbuf\n");
 91		goto out_free;
 92	}
 93	req->rq_rcv_buf.len = PAGE_SIZE;
 94
 95	/* Preallocate one XDR send buffer */
 96	if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) {
 97		printk(KERN_ERR "Failed to create bc snd xbuf\n");
 98		goto out_free;
 99	}
100	return req;
101out_free:
102	xprt_free_allocation(req);
103	return NULL;
104}
105
106/*
107 * Preallocate up to min_reqs structures and related buffers for use
108 * by the backchannel.  This function can be called multiple times
109 * when creating new sessions that use the same rpc_xprt.  The
110 * preallocated buffers are added to the pool of resources used by
111 * the rpc_xprt.  Any one of these resources may be used by an
112 * incoming callback request.  It's up to the higher levels in the
113 * stack to enforce that the maximum number of session slots is not
114 * being exceeded.
115 *
116 * Some callback arguments can be large.  For example, a pNFS server
 117 * using multiple deviceids.  The list can be unbounded, but the client
118 * has the ability to tell the server the maximum size of the callback
119 * requests.  Each deviceID is 16 bytes, so allocate one page
120 * for the arguments to have enough room to receive a number of these
121 * deviceIDs.  The NFS client indicates to the pNFS server that its
122 * callback requests can be up to 4096 bytes in size.
123 */
124int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
125{
126	if (!xprt->ops->bc_setup)
127		return 0;
128	return xprt->ops->bc_setup(xprt, min_reqs);
129}
130EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
131
132int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
133{
134	struct rpc_rqst *req;
135	struct list_head tmp_list;
136	int i;
137
138	dprintk("RPC:       setup backchannel transport\n");
139
140	if (min_reqs > BC_MAX_SLOTS)
141		min_reqs = BC_MAX_SLOTS;
142
143	/*
144	 * We use a temporary list to keep track of the preallocated
145	 * buffers.  Once we're done building the list we splice it
146	 * into the backchannel preallocation list off of the rpc_xprt
147	 * struct.  This helps minimize the amount of time the list
148	 * lock is held on the rpc_xprt struct.  It also makes cleanup
149	 * easier in case of memory allocation errors.
150	 */
151	INIT_LIST_HEAD(&tmp_list);
152	for (i = 0; i < min_reqs; i++) {
153		/* Pre-allocate one backchannel rpc_rqst */
154		req = xprt_alloc_bc_req(xprt);
155		if (req == NULL) {
156			printk(KERN_ERR "Failed to create bc rpc_rqst\n");
157			goto out_free;
158		}
159
160		/* Add the allocated buffer to the tmp list */
161		dprintk("RPC:       adding req= %p\n", req);
162		list_add(&req->rq_bc_pa_list, &tmp_list);
163	}
164
165	/*
166	 * Add the temporary list to the backchannel preallocation list
167	 */
168	spin_lock(&xprt->bc_pa_lock);
169	list_splice(&tmp_list, &xprt->bc_pa_list);
170	xprt->bc_alloc_count += min_reqs;
171	xprt->bc_alloc_max += min_reqs;
172	atomic_add(min_reqs, &xprt->bc_slot_count);
173	spin_unlock(&xprt->bc_pa_lock);
174
175	dprintk("RPC:       setup backchannel transport done\n");
176	return 0;
177
178out_free:
179	/*
180	 * Memory allocation failed, free the temporary list
181	 */
182	while (!list_empty(&tmp_list)) {
183		req = list_first_entry(&tmp_list,
184				struct rpc_rqst,
185				rq_bc_pa_list);
186		list_del(&req->rq_bc_pa_list);
187		xprt_free_allocation(req);
188	}
189
190	dprintk("RPC:       setup backchannel transport failed\n");
191	return -ENOMEM;
192}
193
194/**
195 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
 196 * @xprt:	the transport holding the preallocated structures
197 * @max_reqs:	the maximum number of preallocated structures to destroy
198 *
199 * Since these structures may have been allocated by multiple calls
200 * to xprt_setup_backchannel, we only destroy up to the maximum number
201 * of reqs specified by the caller.
202 */
203void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
204{
205	if (xprt->ops->bc_destroy)
206		xprt->ops->bc_destroy(xprt, max_reqs);
207}
208EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
209
210void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
211{
212	struct rpc_rqst *req = NULL, *tmp = NULL;
213
214	dprintk("RPC:        destroy backchannel transport\n");
215
216	if (max_reqs == 0)
217		goto out;
218
219	spin_lock_bh(&xprt->bc_pa_lock);
220	xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max);
221	list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
222		dprintk("RPC:        req=%p\n", req);
223		list_del(&req->rq_bc_pa_list);
224		xprt_free_allocation(req);
225		xprt->bc_alloc_count--;
226		atomic_dec(&xprt->bc_slot_count);
227		if (--max_reqs == 0)
228			break;
229	}
230	spin_unlock_bh(&xprt->bc_pa_lock);
231
232out:
233	dprintk("RPC:        backchannel list empty= %s\n",
234		list_empty(&xprt->bc_pa_list) ? "true" : "false");
235}
236
237static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
238		struct rpc_rqst *new)
239{
240	struct rpc_rqst *req = NULL;
241
242	dprintk("RPC:       allocate a backchannel request\n");
243	if (list_empty(&xprt->bc_pa_list)) {
244		if (!new)
245			goto not_found;
246		if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS)
247			goto not_found;
248		list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
249		xprt->bc_alloc_count++;
250		atomic_inc(&xprt->bc_slot_count);
251	}
252	req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
253				rq_bc_pa_list);
254	req->rq_reply_bytes_recvd = 0;
255	memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
256			sizeof(req->rq_private_buf));
257	req->rq_xid = xid;
258	req->rq_connect_cookie = xprt->connect_cookie;
259	dprintk("RPC:       backchannel req=%p\n", req);
260not_found:
261	return req;
262}
263
264/*
265 * Return the preallocated rpc_rqst structure and XDR buffers
266 * associated with this rpc_task.
267 */
268void xprt_free_bc_request(struct rpc_rqst *req)
269{
270	struct rpc_xprt *xprt = req->rq_xprt;
271
272	xprt->ops->bc_free_rqst(req);
273}
274
275void xprt_free_bc_rqst(struct rpc_rqst *req)
276{
277	struct rpc_xprt *xprt = req->rq_xprt;
278
279	dprintk("RPC:       free backchannel req=%p\n", req);
280
281	req->rq_connect_cookie = xprt->connect_cookie - 1;
282	smp_mb__before_atomic();
283	clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
284	smp_mb__after_atomic();
285
286	/*
287	 * Return it to the list of preallocations so that it
288	 * may be reused by a new callback request.
289	 */
290	spin_lock_bh(&xprt->bc_pa_lock);
291	if (xprt_need_to_requeue(xprt)) {
292		xprt_bc_reinit_xdr_buf(&req->rq_snd_buf);
293		xprt_bc_reinit_xdr_buf(&req->rq_rcv_buf);
294		req->rq_rcv_buf.len = PAGE_SIZE;
295		list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
296		xprt->bc_alloc_count++;
297		atomic_inc(&xprt->bc_slot_count);
298		req = NULL;
299	}
300	spin_unlock_bh(&xprt->bc_pa_lock);
301	if (req != NULL) {
302		/*
303		 * The last remaining session was destroyed while this
304		 * entry was in use.  Free the entry and don't attempt
305		 * to add back to the list because there is no need to
 306 * have any more preallocated entries.
307		 */
308		dprintk("RPC:       Last session removed req=%p\n", req);
309		xprt_free_allocation(req);
310	}
311	xprt_put(xprt);
312}
313
314/*
 315 * One or more rpc_rqst structures have been preallocated during the
316 * backchannel setup.  Buffer space for the send and private XDR buffers
317 * has been preallocated as well.  Use xprt_alloc_bc_request to allocate
318 * to this request.  Use xprt_free_bc_request to return it.
319 *
 320 * We know that we're called in soft interrupt context, so grab the plain
 321 * spin_lock since there is no need for the bottom-half spin_lock.
322 *
 323 * Return an available rpc_rqst, otherwise NULL if none are available.
324 */
325struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid)
326{
327	struct rpc_rqst *req, *new = NULL;
328
329	do {
330		spin_lock(&xprt->bc_pa_lock);
331		list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
332			if (req->rq_connect_cookie != xprt->connect_cookie)
333				continue;
334			if (req->rq_xid == xid)
335				goto found;
336		}
337		req = xprt_get_bc_request(xprt, xid, new);
338found:
339		spin_unlock(&xprt->bc_pa_lock);
340		if (new) {
341			if (req != new)
342				xprt_free_allocation(new);
343			break;
344		} else if (req)
345			break;
346		new = xprt_alloc_bc_req(xprt);
347	} while (new);
348	return req;
349}
350
351/*
352 * Add callback request to callback list.  The callback
353 * service sleeps on the sv_cb_waitq waiting for new
 354 * requests.  Wake it up after enqueuing the
 355 * request.
356 */
357void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
358{
359	struct rpc_xprt *xprt = req->rq_xprt;
360	struct svc_serv *bc_serv = xprt->bc_serv;
361
362	spin_lock(&xprt->bc_pa_lock);
363	list_del(&req->rq_bc_pa_list);
364	xprt->bc_alloc_count--;
365	spin_unlock(&xprt->bc_pa_lock);
366
367	req->rq_private_buf.len = copied;
368	set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
369
370	dprintk("RPC:       add callback request to list\n");
371	xprt_get(xprt);
372	spin_lock(&bc_serv->sv_cb_lock);
373	list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
374	wake_up(&bc_serv->sv_cb_waitq);
375	spin_unlock(&bc_serv->sv_cb_lock);
376}