// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2015-2020, Oracle and/or its affiliates.
 *
 * Support for reverse-direction RPCs on RPC/RDMA.
 */

#include <linux/sunrpc/xprt.h>
#include <linux/sunrpc/svc.h>
#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#undef RPCRDMA_BACKCHANNEL_DEBUG

/**
 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of concurrent incoming requests to expect
 *
 * Returns 0 on success; otherwise a negative errno
 */
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	r_xprt->rx_buf.rb_bc_srv_max_requests = RPCRDMA_BACKWARD_WRS >> 1;
	trace_xprtrdma_cb_setup(r_xprt, reqs);
	return 0;
}

/**
 * xprt_rdma_bc_maxpayload - Return maximum backchannel message size
 * @xprt: transport
 *
 * Returns maximum size, in bytes, of a backchannel message
 */
size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	size_t maxmsg;

	maxmsg = min_t(unsigned int, ep->re_inline_send, ep->re_inline_recv);
	maxmsg = min_t(unsigned int, maxmsg, PAGE_SIZE);
	return maxmsg - RPCRDMA_HDRLEN_MIN;
}

unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt)
{
	return RPCRDMA_BACKWARD_WRS >> 1;
}
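
/* Marshal the transport header for a backchannel Reply: the original
 * Call's XID, the RPC/RDMA version, the client's advertised credit
 * limit, and an RDMA_MSG procedure with three empty chunk lists. The
 * Reply payload in rq_snd_buf is then pulled up inline behind the
 * header and the Send buffer is prepared for transmission.
 *
 * Returns 0 on success, or -EIO if the header cannot be encoded or
 * the Send buffer cannot be prepared.
 */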
static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	__be32 *p;

	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
	xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf,
			rdmab_data(req->rl_rdmabuf), rqst);

	p = xdr_reserve_space(&req->rl_stream, 28);
	if (unlikely(!p))
		return -EIO;
	*p++ = rqst->rq_xid;
	*p++ = rpcrdma_version;
	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
	*p++ = rdma_msg;
	*p++ = xdr_zero;
	*p++ = xdr_zero;
	*p = xdr_zero;

	if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN,
				      &rqst->rq_snd_buf, rpcrdma_noch_pullup))
		return -EIO;

	trace_xprtrdma_cb_reply(r_xprt, rqst);
	return 0;
}

/**
 * xprt_rdma_bc_send_reply - marshal and send a backchannel reply
 * @rqst: RPC rqst with a backchannel RPC reply in rq_snd_buf
 *
 * Caller holds the transport's write lock.
 *
 * Returns:
 *	%0 if the RPC message has been sent
 *	%-ENOTCONN if the caller should reconnect and call again
 *	%-EIO if a permanent error occurred and the request was not
 *		sent. Do not try to send this message again.
 */
int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	int rc;

	if (!xprt_connected(xprt))
		return -ENOTCONN;

	if (!xprt_request_get_cong(xprt, rqst))
		return -EBADSLT;

	rc = rpcrdma_bc_marshal_reply(rqst);
	if (rc < 0)
		goto failed_marshal;

	if (frwr_send(r_xprt, req))
		goto drop_connection;
	return 0;

failed_marshal:
	if (rc != -ENOTCONN)
		return rc;
drop_connection:
	xprt_rdma_close(xprt);
	return -ENOTCONN;
}

/**
 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of incoming requests to destroy; ignored
 */
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpc_rqst *rqst, *tmp;

	spin_lock(&xprt->bc_pa_lock);
	list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
		list_del(&rqst->rq_bc_pa_list);
		spin_unlock(&xprt->bc_pa_lock);

		rpcrdma_req_destroy(rpcr_to_rdmar(rqst));

		spin_lock(&xprt->bc_pa_lock);
	}
	spin_unlock(&xprt->bc_pa_lock);
}

/**
 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
 * @rqst: request to release
 */
void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
{
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct rpcrdma_rep *rep = req->rl_reply;
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	rpcrdma_rep_put(&r_xprt->rx_buf, rep);
	req->rl_reply = NULL;

	spin_lock(&xprt->bc_pa_lock);
	list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
	spin_unlock(&xprt->bc_pa_lock);
	xprt_put(xprt);
}
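
/* Get a free backchannel rqst from the transport's pre-allocated list,
 * or create a fresh one if the list is empty. New allocations are
 * capped at RPCRDMA_BACKWARD_WRS so that a misbehaving server cannot
 * pin an unbounded amount of client memory.
 *
 * Returns an rpc_rqst ready to receive a Call, or NULL if no rqst is
 * available.
 */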
static struct rpc_rqst *rpcrdma_bc_rqst_get(struct rpcrdma_xprt *r_xprt)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	size_t size;

	spin_lock(&xprt->bc_pa_lock);
	rqst = list_first_entry_or_null(&xprt->bc_pa_list, struct rpc_rqst,
					rq_bc_pa_list);
	if (!rqst)
		goto create_req;
	list_del(&rqst->rq_bc_pa_list);
	spin_unlock(&xprt->bc_pa_lock);
	return rqst;

create_req:
	spin_unlock(&xprt->bc_pa_lock);

	/* Set a limit to prevent a remote from overrunning our resources.
	 */
	if (xprt->bc_alloc_count >= RPCRDMA_BACKWARD_WRS)
		return NULL;

	size = min_t(size_t, r_xprt->rx_ep->re_inline_recv, PAGE_SIZE);
	req = rpcrdma_req_create(r_xprt, size);
	if (!req)
		return NULL;
	if (rpcrdma_req_setup(r_xprt, req)) {
		rpcrdma_req_destroy(req);
		return NULL;
	}

	xprt->bc_alloc_count++;
	rqst = &req->rl_slot;
	rqst->rq_xprt = xprt;
	__set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
	xdr_buf_init(&rqst->rq_snd_buf, rdmab_data(req->rl_sendbuf), size);
	return rqst;
}

/**
 * rpcrdma_bc_receive_call - Handle a reverse-direction Call
 * @r_xprt: transport receiving the call
 * @rep: receive buffer containing the call
 *
 * Operational assumptions:
 *    o Backchannel credits are ignored, just as the NFS server
 *      forechannel currently does
 *    o The ULP manages a replay cache (eg, NFSv4.1 sessions).
 *      No replay detection is done at the transport level
 */
void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
			     struct rpcrdma_rep *rep)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct svc_serv *bc_serv;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	struct xdr_buf *buf;
	size_t size;
	__be32 *p;

	p = xdr_inline_decode(&rep->rr_stream, 0);
	size = xdr_stream_remaining(&rep->rr_stream);

#ifdef RPCRDMA_BACKCHANNEL_DEBUG
	pr_info("RPC: %s: callback XID %08x, length=%u\n",
		__func__, be32_to_cpup(p), size);
	pr_info("RPC: %s: %*ph\n", __func__, size, p);
#endif

	rqst = rpcrdma_bc_rqst_get(r_xprt);
	if (!rqst)
		goto out_overflow;

	rqst->rq_reply_bytes_recvd = 0;
	rqst->rq_xid = *p;

	rqst->rq_private_buf.len = size;

	buf = &rqst->rq_rcv_buf;
	memset(buf, 0, sizeof(*buf));
	buf->head[0].iov_base = p;
	buf->head[0].iov_len = size;
	buf->len = size;

	/* The receive buffer has to be hooked to the rpcrdma_req
	 * so that it is not released while the req is pointing
	 * to its buffer, and so that it can be reposted after
	 * the Upper Layer is done decoding it.
	 */
	req = rpcr_to_rdmar(rqst);
	req->rl_reply = rep;
	trace_xprtrdma_cb_call(r_xprt, rqst);

	/* Queue rqst for ULP's callback service */
	bc_serv = xprt->bc_serv;
	xprt_get(xprt);
	lwq_enqueue(&rqst->rq_bc_list, &bc_serv->sv_cb_list);

	svc_pool_wake_idle_thread(&bc_serv->sv_pools[0]);

	r_xprt->rx_stats.bcall_count++;
	return;

out_overflow:
	pr_warn("RPC/RDMA backchannel overflow\n");
	xprt_force_disconnect(xprt);
	/* This receive buffer gets reposted automatically
	 * when the connection is re-established.
	 */
	return;
}