/*
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

/* Encode an XDR as an array of IB SGE
 *
 * Assumptions:
 * - head[0] is physically contiguous.
 * - tail[0] is physically contiguous.
 * - pages[] is not physically or virtually contiguous and consists of
 *   PAGE_SIZE elements.
 *
 * Output:
 * SGE[0]              reserved for RPCRDMA header
 * SGE[1]              data from xdr->head[]
 * SGE[2..sge_count-2] data from xdr->pages[]
 * SGE[sge_count-1]    data from xdr->tail.
 *
 * The max SGE we need is the length of the XDR / pagesize + one for
 * head + one for tail + one for RPCRDMA header. Since RPCSVC_MAXPAGES
 * reserves a page for both the request and the reply header, and this
 * array is only concerned with the reply, we are assured that we have
 * one extra page for the RPCRDMA header.
 */
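/*
 * Illustrative example (not in the original source): for a reply with a
 * 120-byte head, 6000 bytes of page data and a 4-byte tail, assuming
 * PAGE_SIZE == 4096, the vector built here would look like:
 *
 *   sge[0]  reserved for the RPCRDMA header
 *   sge[1]  head[0].iov_base, 120 bytes
 *   sge[2]  6000 bytes of page data (the fast-register path below maps
 *           the pages behind one virtually contiguous FRMR region, so a
 *           single entry covers them; map_xdr() instead uses one entry
 *           per page)
 *   sge[3]  tail[0].iov_base, 4 bytes
 */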
static int fast_reg_xdr(struct svcxprt_rdma *xprt,
			struct xdr_buf *xdr,
			struct svc_rdma_req_map *vec)
{
	int sge_no;
	u32 sge_bytes;
	u32 page_bytes;
	u32 page_off;
	int page_no = 0;
	u8 *frva;
	struct svc_rdma_fastreg_mr *frmr;

	frmr = svc_rdma_get_frmr(xprt);
	if (IS_ERR(frmr))
		return -ENOMEM;
	vec->frmr = frmr;

	/* Skip the RPCRDMA header */
	sge_no = 1;

	/* Map the head. */
	frva = (void *)((unsigned long)(xdr->head[0].iov_base) & PAGE_MASK);
	vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
	vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
	vec->count = 2;
	sge_no++;

	/* Map the XDR head */
	frmr->kva = frva;
	frmr->direction = DMA_TO_DEVICE;
	frmr->access_flags = 0;
	frmr->map_len = PAGE_SIZE;
	frmr->page_list_len = 1;
	page_off = (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
	frmr->page_list->page_list[page_no] =
		ib_dma_map_page(xprt->sc_cm_id->device,
				virt_to_page(xdr->head[0].iov_base),
				page_off,
				PAGE_SIZE - page_off,
				DMA_TO_DEVICE);
	if (ib_dma_mapping_error(xprt->sc_cm_id->device,
				 frmr->page_list->page_list[page_no]))
		goto fatal_err;
	atomic_inc(&xprt->sc_dma_used);

	/* Map the XDR page list */
	page_off = xdr->page_base;
	page_bytes = xdr->page_len + page_off;
	if (!page_bytes)
		goto encode_tail;

	/* Map the pages */
	vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
	vec->sge[sge_no].iov_len = page_bytes;
	sge_no++;
	while (page_bytes) {
		struct page *page;

		page = xdr->pages[page_no++];
		sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
		page_bytes -= sge_bytes;

		frmr->page_list->page_list[page_no] =
			ib_dma_map_page(xprt->sc_cm_id->device,
					page, page_off,
					sge_bytes, DMA_TO_DEVICE);
		if (ib_dma_mapping_error(xprt->sc_cm_id->device,
					 frmr->page_list->page_list[page_no]))
			goto fatal_err;

		atomic_inc(&xprt->sc_dma_used);
		page_off = 0; /* reset for next time through loop */
		frmr->map_len += PAGE_SIZE;
		frmr->page_list_len++;
	}
	vec->count++;

 encode_tail:
	/* Map tail */
	if (0 == xdr->tail[0].iov_len)
		goto done;

	vec->count++;
	vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;

	if (((unsigned long)xdr->tail[0].iov_base & PAGE_MASK) ==
	    ((unsigned long)xdr->head[0].iov_base & PAGE_MASK)) {
		/*
		 * If head and tail use the same page, we don't need
		 * to map it again.
		 */
		vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
	} else {
		void *va;

		/* Map another page for the tail */
		page_off = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
		va = (void *)((unsigned long)xdr->tail[0].iov_base & PAGE_MASK);
		vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;

		frmr->page_list->page_list[page_no] =
		    ib_dma_map_page(xprt->sc_cm_id->device, virt_to_page(va),
				    page_off,
				    PAGE_SIZE,
				    DMA_TO_DEVICE);
		if (ib_dma_mapping_error(xprt->sc_cm_id->device,
					 frmr->page_list->page_list[page_no]))
			goto fatal_err;
		atomic_inc(&xprt->sc_dma_used);
		frmr->map_len += PAGE_SIZE;
		frmr->page_list_len++;
	}

 done:
	if (svc_rdma_fastreg(xprt, frmr))
		goto fatal_err;

	return 0;

 fatal_err:
	printk(KERN_ERR "svcrdma: Error fast registering memory for xprt %p\n",
	       xprt);
	vec->frmr = NULL;
	svc_rdma_put_frmr(xprt, frmr);
	return -EIO;
}

static int map_xdr(struct svcxprt_rdma *xprt,
		   struct xdr_buf *xdr,
		   struct svc_rdma_req_map *vec)
{
	int sge_no;
	u32 sge_bytes;
	u32 page_bytes;
	u32 page_off;
	int page_no;

	BUG_ON(xdr->len !=
	       (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len));

	if (xprt->sc_frmr_pg_list_len)
		return fast_reg_xdr(xprt, xdr, vec);

	/* Skip the first sge, this is for the RPCRDMA header */
	sge_no = 1;

	/* Head SGE */
	vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
	vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
	sge_no++;

	/* pages SGE */
	page_no = 0;
	page_bytes = xdr->page_len;
	page_off = xdr->page_base;
	while (page_bytes) {
		vec->sge[sge_no].iov_base =
			page_address(xdr->pages[page_no]) + page_off;
		sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
		page_bytes -= sge_bytes;
		vec->sge[sge_no].iov_len = sge_bytes;

		sge_no++;
		page_no++;
		page_off = 0; /* reset for next time through loop */
	}

	/* Tail SGE */
	if (xdr->tail[0].iov_len) {
		vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
		vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;
		sge_no++;
	}

	dprintk("svcrdma: map_xdr: sge_no %d page_no %d "
		"page_base %u page_len %u head_len %zu tail_len %zu\n",
		sge_no, page_no, xdr->page_base, xdr->page_len,
		xdr->head[0].iov_len, xdr->tail[0].iov_len);

	vec->count = sge_no;
	return 0;
}

static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
			      struct xdr_buf *xdr,
			      u32 xdr_off, size_t len, int dir)
{
	struct page *page;
	dma_addr_t dma_addr;
	if (xdr_off < xdr->head[0].iov_len) {
		/* This offset is in the head */
		xdr_off += (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
		page = virt_to_page(xdr->head[0].iov_base);
	} else {
		xdr_off -= xdr->head[0].iov_len;
		if (xdr_off < xdr->page_len) {
			/* This offset is in the page list */
			page = xdr->pages[xdr_off >> PAGE_SHIFT];
			xdr_off &= ~PAGE_MASK;
		} else {
			/* This offset is in the tail */
			xdr_off -= xdr->page_len;
			xdr_off += (unsigned long)
				xdr->tail[0].iov_base & ~PAGE_MASK;
			page = virt_to_page(xdr->tail[0].iov_base);
		}
	}
	dma_addr = ib_dma_map_page(xprt->sc_cm_id->device, page, xdr_off,
				   min_t(size_t, PAGE_SIZE, len), dir);
	return dma_addr;
}
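/*
 * Illustrative sketch, not part of the original file: how dma_map_xdr()
 * resolves an xdr_off against the three regions of an xdr_buf. Assuming
 * PAGE_SIZE == 4096 and an xdr_buf with head_len = 120, page_len = 8192
 * and tail_len = 8:
 *
 *   xdr_off =   60  ->  head page, at head[0].iov_base's page offset + 60
 *   xdr_off =  120  ->  pages[0], page offset 0
 *   xdr_off = 5000  ->  pages[(5000 - 120) >> PAGE_SHIFT] = pages[1],
 *                       page offset (5000 - 120) & ~PAGE_MASK = 784
 *   xdr_off = 8315  ->  tail page, at tail[0].iov_base's page offset +
 *                       (8315 - 120 - 8192) = 3
 *
 * The mapped length is additionally clamped to PAGE_SIZE.
 */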

/* Assumptions:
 * - We are using FRMR
 *     - or -
 * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
 */
static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
		      u32 rmr, u64 to,
		      u32 xdr_off, int write_len,
		      struct svc_rdma_req_map *vec)
{
	struct ib_send_wr write_wr;
	struct ib_sge *sge;
	int xdr_sge_no;
	int sge_no;
	int sge_bytes;
	int sge_off;
	int bc;
	struct svc_rdma_op_ctxt *ctxt;

	BUG_ON(vec->count > RPCSVC_MAXPAGES);
	dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
		"write_len=%d, vec->sge=%p, vec->count=%lu\n",
		rmr, (unsigned long long)to, xdr_off,
		write_len, vec->sge, vec->count);

	ctxt = svc_rdma_get_context(xprt);
	ctxt->direction = DMA_TO_DEVICE;
	sge = ctxt->sge;

	/* Find the SGE associated with xdr_off */
	for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < vec->count;
	     xdr_sge_no++) {
		if (vec->sge[xdr_sge_no].iov_len > bc)
			break;
		bc -= vec->sge[xdr_sge_no].iov_len;
	}

	sge_off = bc;
	bc = write_len;
	sge_no = 0;

	/* Copy the remaining SGE */
	while (bc != 0) {
		sge_bytes = min_t(size_t,
			  bc, vec->sge[xdr_sge_no].iov_len-sge_off);
		sge[sge_no].length = sge_bytes;
		if (!vec->frmr) {
			sge[sge_no].addr =
				dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
					    sge_bytes, DMA_TO_DEVICE);
			xdr_off += sge_bytes;
			if (ib_dma_mapping_error(xprt->sc_cm_id->device,
						 sge[sge_no].addr))
				goto err;
			atomic_inc(&xprt->sc_dma_used);
			sge[sge_no].lkey = xprt->sc_dma_lkey;
		} else {
			sge[sge_no].addr = (unsigned long)
				vec->sge[xdr_sge_no].iov_base + sge_off;
			sge[sge_no].lkey = vec->frmr->mr->lkey;
		}
		ctxt->count++;
		ctxt->frmr = vec->frmr;
		sge_off = 0;
		sge_no++;
		xdr_sge_no++;
		BUG_ON(xdr_sge_no > vec->count);
		bc -= sge_bytes;
	}

	/* Prepare WRITE WR */
	memset(&write_wr, 0, sizeof write_wr);
	ctxt->wr_op = IB_WR_RDMA_WRITE;
	write_wr.wr_id = (unsigned long)ctxt;
	write_wr.sg_list = &sge[0];
	write_wr.num_sge = sge_no;
	write_wr.opcode = IB_WR_RDMA_WRITE;
	write_wr.send_flags = IB_SEND_SIGNALED;
	write_wr.wr.rdma.rkey = rmr;
	write_wr.wr.rdma.remote_addr = to;

	/* Post It */
	atomic_inc(&rdma_stat_write);
	if (svc_rdma_send(xprt, &write_wr))
		goto err;
	return 0;
 err:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_frmr(xprt, vec->frmr);
	svc_rdma_put_context(ctxt, 0);
	/* Fatal error, close transport */
	return -EIO;
}
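/*
 * Illustrative sketch, not part of the original file: with a vector of
 *   sge[1] = head, 120 bytes
 *   sge[2] = page data, 4096 bytes
 *   sge[3] = tail, 8 bytes
 * a call such as send_write(xprt, rqstp, rmr, to, 120, 4104, vec) skips
 * past sge[1] (xdr_off lands at the start of sge[2]) and builds an
 * ib_sge list of two entries, 4096 and 8 bytes, posted as one RDMA_WRITE
 * to the peer address 'to'.
 */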

static int send_write_chunks(struct svcxprt_rdma *xprt,
			     struct rpcrdma_msg *rdma_argp,
			     struct rpcrdma_msg *rdma_resp,
			     struct svc_rqst *rqstp,
			     struct svc_rdma_req_map *vec)
{
	u32 xfer_len = rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
	int write_len;
	int max_write;
	u32 xdr_off;
	int chunk_off;
	int chunk_no;
	struct rpcrdma_write_array *arg_ary;
	struct rpcrdma_write_array *res_ary;
	int ret;

	arg_ary = svc_rdma_get_write_array(rdma_argp);
	if (!arg_ary)
		return 0;
	res_ary = (struct rpcrdma_write_array *)
		&rdma_resp->rm_body.rm_chunks[1];

	if (vec->frmr)
		max_write = vec->frmr->map_len;
	else
		max_write = xprt->sc_max_sge * PAGE_SIZE;

	/* Write chunks start at the pagelist */
	for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
	     xfer_len && chunk_no < arg_ary->wc_nchunks;
	     chunk_no++) {
		struct rpcrdma_segment *arg_ch;
		u64 rs_offset;

		arg_ch = &arg_ary->wc_array[chunk_no].wc_target;
		write_len = min(xfer_len, ntohl(arg_ch->rs_length));

		/* Prepare the response chunk given the length actually
		 * written */
		xdr_decode_hyper((__be32 *)&arg_ch->rs_offset, &rs_offset);
		svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
						arg_ch->rs_handle,
						arg_ch->rs_offset,
						write_len);
		chunk_off = 0;
		while (write_len) {
			int this_write;
			this_write = min(write_len, max_write);
			ret = send_write(xprt, rqstp,
					 ntohl(arg_ch->rs_handle),
					 rs_offset + chunk_off,
					 xdr_off,
					 this_write,
					 vec);
			if (ret) {
				dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
					ret);
				return -EIO;
			}
			chunk_off += this_write;
			xdr_off += this_write;
			xfer_len -= this_write;
			write_len -= this_write;
		}
	}
	/* Update the req with the number of chunks actually used */
	svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);

	return rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
}
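/*
 * Illustrative sketch, not part of the original file: if the client
 * advertised a single 32 KB write chunk and max_write is 16 KB (for
 * example sc_max_sge == 4 with 4 KB pages), a 20 KB payload is pushed
 * as two RDMA_WRITEs of 16 KB and 4 KB, both targeting the same
 * rs_handle at rs_offset + 0 and rs_offset + 16384, and the response
 * write list records a single chunk of length 20480.
 */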

static int send_reply_chunks(struct svcxprt_rdma *xprt,
			     struct rpcrdma_msg *rdma_argp,
			     struct rpcrdma_msg *rdma_resp,
			     struct svc_rqst *rqstp,
			     struct svc_rdma_req_map *vec)
{
	u32 xfer_len = rqstp->rq_res.len;
	int write_len;
	int max_write;
	u32 xdr_off;
	int chunk_no;
	int chunk_off;
	int nchunks;
	struct rpcrdma_segment *ch;
	struct rpcrdma_write_array *arg_ary;
	struct rpcrdma_write_array *res_ary;
	int ret;

	arg_ary = svc_rdma_get_reply_array(rdma_argp);
	if (!arg_ary)
		return 0;
	/* XXX: need to fix when reply lists occur with read-list and/or
	 * write-list */
	res_ary = (struct rpcrdma_write_array *)
		&rdma_resp->rm_body.rm_chunks[2];

	if (vec->frmr)
		max_write = vec->frmr->map_len;
	else
		max_write = xprt->sc_max_sge * PAGE_SIZE;

	/* xdr offset starts at RPC message */
	nchunks = ntohl(arg_ary->wc_nchunks);
	for (xdr_off = 0, chunk_no = 0;
	     xfer_len && chunk_no < nchunks;
	     chunk_no++) {
		u64 rs_offset;
		ch = &arg_ary->wc_array[chunk_no].wc_target;
		write_len = min(xfer_len, htonl(ch->rs_length));

		/* Prepare the reply chunk given the length actually
		 * written */
		xdr_decode_hyper((__be32 *)&ch->rs_offset, &rs_offset);
		svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
						ch->rs_handle, ch->rs_offset,
						write_len);
		chunk_off = 0;
		while (write_len) {
			int this_write;

			this_write = min(write_len, max_write);
			ret = send_write(xprt, rqstp,
					 ntohl(ch->rs_handle),
					 rs_offset + chunk_off,
					 xdr_off,
					 this_write,
					 vec);
			if (ret) {
				dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
					ret);
				return -EIO;
			}
			chunk_off += this_write;
			xdr_off += this_write;
			xfer_len -= this_write;
			write_len -= this_write;
		}
	}
	/* Update the req with the number of chunks actually used */
	svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);

	return rqstp->rq_res.len;
}

/* This function prepares the portion of the RPCRDMA message to be
 * sent in the RDMA_SEND. This function is called after data sent via
 * RDMA has already been transmitted. There are three cases:
 * - The RPCRDMA header, RPC header, and payload are all sent in a
 *   single RDMA_SEND. This is the "inline" case.
 * - The RPCRDMA header and some portion of the RPC header and data
 *   are sent via this RDMA_SEND and another portion of the data is
 *   sent via RDMA.
 * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC
 *   header and data are all transmitted via RDMA.
 * In all three cases, this function prepares the RPCRDMA header in
 * sge[0], the 'type' parameter indicates the type to place in the
 * RPCRDMA header, and the 'byte_count' field indicates how much of
 * the XDR to include in this RDMA_SEND. NB: The offset of the payload
 * to send is zero in the XDR.
 */
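/*
 * Illustrative summary, not part of the original file, of how the three
 * cases above play out for a hypothetical reply with a small RPC header
 * and 32 KB of payload:
 *
 *   inline:       RDMA_MSG;  everything fits, so one RDMA_SEND carries
 *                 the RPCRDMA header, RPC header and payload.
 *   write chunks: RDMA_MSG;  the 32 KB payload was already pushed with
 *                 RDMA_WRITEs, so the RDMA_SEND carries the RPCRDMA
 *                 header plus the remaining inline bytes.
 *   reply chunk:  RDMA_NOMSG; the whole RPC message was written to the
 *                 client-provided reply chunk, so the RDMA_SEND carries
 *                 only the RPCRDMA header.
 */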
static int send_reply(struct svcxprt_rdma *rdma,
		      struct svc_rqst *rqstp,
		      struct page *page,
		      struct rpcrdma_msg *rdma_resp,
		      struct svc_rdma_op_ctxt *ctxt,
		      struct svc_rdma_req_map *vec,
		      int byte_count)
{
	struct ib_send_wr send_wr;
	struct ib_send_wr inv_wr;
	int sge_no;
	int sge_bytes;
	int page_no;
	int ret;

	/* Post a recv buffer to handle another request. */
	ret = svc_rdma_post_recv(rdma);
	if (ret) {
		printk(KERN_INFO
		       "svcrdma: could not post a receive buffer, err=%d. "
		       "Closing transport %p.\n", ret, rdma);
		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
		svc_rdma_put_frmr(rdma, vec->frmr);
		svc_rdma_put_context(ctxt, 0);
		return -ENOTCONN;
	}

	/* Prepare the context */
	ctxt->pages[0] = page;
	ctxt->count = 1;
	ctxt->frmr = vec->frmr;
	if (vec->frmr)
		set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
	else
		clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);

	/* Prepare the SGE for the RPCRDMA Header */
	ctxt->sge[0].lkey = rdma->sc_dma_lkey;
	ctxt->sge[0].length = svc_rdma_xdr_get_reply_hdr_len(rdma_resp);
	ctxt->sge[0].addr =
	    ib_dma_map_page(rdma->sc_cm_id->device, page, 0,
			    ctxt->sge[0].length, DMA_TO_DEVICE);
	if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr))
		goto err;
	atomic_inc(&rdma->sc_dma_used);

	ctxt->direction = DMA_TO_DEVICE;

	/* Map the payload indicated by 'byte_count' */
	for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) {
		int xdr_off = 0;
		sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
		byte_count -= sge_bytes;
		if (!vec->frmr) {
			ctxt->sge[sge_no].addr =
				dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
					    sge_bytes, DMA_TO_DEVICE);
			xdr_off += sge_bytes;
			if (ib_dma_mapping_error(rdma->sc_cm_id->device,
						 ctxt->sge[sge_no].addr))
				goto err;
			atomic_inc(&rdma->sc_dma_used);
			ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
		} else {
			ctxt->sge[sge_no].addr = (unsigned long)
				vec->sge[sge_no].iov_base;
			ctxt->sge[sge_no].lkey = vec->frmr->mr->lkey;
		}
		ctxt->sge[sge_no].length = sge_bytes;
	}
	BUG_ON(byte_count != 0);

	/* Save all respages in the ctxt and remove them from the
	 * respages array. They are our pages until the I/O
	 * completes.
	 */
	for (page_no = 0; page_no < rqstp->rq_resused; page_no++) {
		ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
		ctxt->count++;
		rqstp->rq_respages[page_no] = NULL;
		/*
		 * If there are more pages than SGE, terminate SGE
		 * list so that svc_rdma_unmap_dma doesn't attempt to
		 * unmap garbage.
		 */
		if (page_no+1 >= sge_no)
			ctxt->sge[page_no+1].length = 0;
	}
	BUG_ON(sge_no > rdma->sc_max_sge);
	memset(&send_wr, 0, sizeof send_wr);
	ctxt->wr_op = IB_WR_SEND;
	send_wr.wr_id = (unsigned long)ctxt;
	send_wr.sg_list = ctxt->sge;
	send_wr.num_sge = sge_no;
	send_wr.opcode = IB_WR_SEND;
	send_wr.send_flags = IB_SEND_SIGNALED;
	if (vec->frmr) {
		/* Prepare INVALIDATE WR */
		memset(&inv_wr, 0, sizeof inv_wr);
		inv_wr.opcode = IB_WR_LOCAL_INV;
		inv_wr.send_flags = IB_SEND_SIGNALED;
		inv_wr.ex.invalidate_rkey =
			vec->frmr->mr->lkey;
		send_wr.next = &inv_wr;
	}

	ret = svc_rdma_send(rdma, &send_wr);
	if (ret)
		goto err;

	return 0;

 err:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_frmr(rdma, vec->frmr);
	svc_rdma_put_context(ctxt, 1);
	return -EIO;
}

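/*
 * Comment added for clarity (not in the original file): unlike the TCP
 * transport, which reserves space for a record marker in its
 * prep_reply_hdr callback, RPC/RDMA builds its transport header in a
 * separate buffer in svc_rdma_sendto(), so there is nothing to prepare
 * here and this callback is intentionally empty.
 */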
void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
{
}

/*
 * Return the start of an xdr buffer.
 */
static void *xdr_start(struct xdr_buf *xdr)
{
	return xdr->head[0].iov_base -
		(xdr->len -
		 xdr->page_len -
		 xdr->tail[0].iov_len -
		 xdr->head[0].iov_len);
}
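/*
 * Illustrative note, not part of the original file and based on how the
 * receive path adjusts rq_arg: when the request was received, the
 * RPCRDMA transport header was parsed and head[0].iov_base was advanced
 * past it, while rq_arg.len still counts those bytes. So
 * len - (head_len + page_len + tail_len) equals the transport header
 * size, and subtracting it from head[0].iov_base points back at the
 * start of the RPCRDMA header. For example, with a 28-byte transport
 * header, a 400-byte RPC call and no page or tail data:
 *   len = 428, head_len = 400, page_len = 0, tail_len = 0
 *   xdr_start() == head[0].iov_base - 28
 */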

int svc_rdma_sendto(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt = rqstp->rq_xprt;
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	struct rpcrdma_msg *rdma_argp;
	struct rpcrdma_msg *rdma_resp;
	struct rpcrdma_write_array *reply_ary;
	enum rpcrdma_proc reply_type;
	int ret;
	int inline_bytes;
	struct page *res_page;
	struct svc_rdma_op_ctxt *ctxt;
	struct svc_rdma_req_map *vec;

	dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);

	/* Get the RDMA request header. */
	rdma_argp = xdr_start(&rqstp->rq_arg);

	/* Build a req vec for the XDR */
	ctxt = svc_rdma_get_context(rdma);
	ctxt->direction = DMA_TO_DEVICE;
	vec = svc_rdma_get_req_map();
	ret = map_xdr(rdma, &rqstp->rq_res, vec);
	if (ret)
		goto err0;
	inline_bytes = rqstp->rq_res.len;

	/* Create the RDMA response header */
	res_page = svc_rdma_get_page();
	rdma_resp = page_address(res_page);
	reply_ary = svc_rdma_get_reply_array(rdma_argp);
	if (reply_ary)
		reply_type = RDMA_NOMSG;
	else
		reply_type = RDMA_MSG;
	svc_rdma_xdr_encode_reply_header(rdma, rdma_argp,
					 rdma_resp, reply_type);

	/* Send any write-chunk data and build resp write-list */
	ret = send_write_chunks(rdma, rdma_argp, rdma_resp,
				rqstp, vec);
	if (ret < 0) {
		printk(KERN_ERR "svcrdma: failed to send write chunks, rc=%d\n",
		       ret);
		goto err1;
	}
	inline_bytes -= ret;

	/* Send any reply-list data and update resp reply-list */
	ret = send_reply_chunks(rdma, rdma_argp, rdma_resp,
				rqstp, vec);
	if (ret < 0) {
		printk(KERN_ERR "svcrdma: failed to send reply chunks, rc=%d\n",
		       ret);
		goto err1;
	}
	inline_bytes -= ret;

	ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, vec,
			 inline_bytes);
	svc_rdma_put_req_map(vec);
	dprintk("svcrdma: send_reply returns %d\n", ret);
	return ret;

 err1:
	put_page(res_page);
 err0:
	svc_rdma_put_req_map(vec);
	svc_rdma_put_context(ctxt, 0);
	return ret;
}