1// SPDX-License-Identifier: GPL-2.0-or-later
2/*
3 * RDMA Transport Layer
4 *
5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
8 */
9
10#undef pr_fmt
11#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
12
13#include <linux/module.h>
14
15#include "rtrs-srv.h"
16#include "rtrs-log.h"
17#include <rdma/ib_cm.h>
18#include <rdma/ib_verbs.h>
19#include "rtrs-srv-trace.h"
20
21MODULE_DESCRIPTION("RDMA Transport Server");
22MODULE_LICENSE("GPL");
23
24/* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
25#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
26#define DEFAULT_SESS_QUEUE_DEPTH 512
27#define MAX_HDR_SIZE PAGE_SIZE
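/*
 * Illustrative sizing note (not part of the driver logic): with the default
 * max_chunk_size of 128 KiB and MAX_HDR_SIZE equal to PAGE_SIZE, the payload
 * advertised to the client in rtrs_msg_conn_rsp.max_io_size is
 * max_chunk_size - MAX_HDR_SIZE, e.g. assuming a 4 KiB PAGE_SIZE:
 *
 *	128 * 1024 - 4096 = 126976 bytes (124 KiB) per IO request
 *
 * The actual value follows the architecture's page size.
 */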
28
29static struct rtrs_rdma_dev_pd dev_pd;
30const struct class rtrs_dev_class = {
31 .name = "rtrs-server",
32};
33static struct rtrs_srv_ib_ctx ib_ctx;
34
35static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
36static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;
37
38static bool always_invalidate = true;
39module_param(always_invalidate, bool, 0444);
40MODULE_PARM_DESC(always_invalidate,
41 "Invalidate memory registration for contiguous memory regions before accessing.");
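/*
 * Note (illustration only): with always_invalidate enabled each chunk gets
 * its own MR (see map_cont_bufs()), and rtrs_srv_resp_rdma() rotates the
 * rkey before every response, roughly:
 *
 *	ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey));
 *
 * so a client can never reuse a stale rkey after the buffer is handed back.
 */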
42
43module_param_named(max_chunk_size, max_chunk_size, int, 0444);
MODULE_PARM_DESC(max_chunk_size,
		 "Max size for each IO request, in bytes (default: "
		 __stringify(DEFAULT_MAX_CHUNK_SIZE) " bytes)");
47
48module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
49MODULE_PARM_DESC(sess_queue_depth,
50 "Number of buffers for pending I/O requests to allocate per session. Maximum: "
51 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
52 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");
53
54static cpumask_t cq_affinity_mask = { CPU_BITS_ALL };
55
56static struct workqueue_struct *rtrs_wq;
57
58static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c)
59{
60 return container_of(c, struct rtrs_srv_con, c);
61}
62
63static bool rtrs_srv_change_state(struct rtrs_srv_path *srv_path,
64 enum rtrs_srv_state new_state)
65{
66 enum rtrs_srv_state old_state;
67 bool changed = false;
68 unsigned long flags;
69
70 spin_lock_irqsave(&srv_path->state_lock, flags);
71 old_state = srv_path->state;
72 switch (new_state) {
73 case RTRS_SRV_CONNECTED:
74 if (old_state == RTRS_SRV_CONNECTING)
75 changed = true;
76 break;
77 case RTRS_SRV_CLOSING:
78 if (old_state == RTRS_SRV_CONNECTING ||
79 old_state == RTRS_SRV_CONNECTED)
80 changed = true;
81 break;
82 case RTRS_SRV_CLOSED:
83 if (old_state == RTRS_SRV_CLOSING)
84 changed = true;
85 break;
86 default:
87 break;
88 }
89 if (changed)
90 srv_path->state = new_state;
91 spin_unlock_irqrestore(&srv_path->state_lock, flags);
92
93 return changed;
94}
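/*
 * State transitions accepted above (sketch derived from the switch):
 *
 *	CONNECTING -> CONNECTED
 *	CONNECTING -> CLOSING
 *	CONNECTED  -> CLOSING
 *	CLOSING    -> CLOSED
 *
 * Any other transition is rejected and the state stays unchanged.
 */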
95
96static void free_id(struct rtrs_srv_op *id)
97{
98 if (!id)
99 return;
100 kfree(id);
101}
102
103static void rtrs_srv_free_ops_ids(struct rtrs_srv_path *srv_path)
104{
105 struct rtrs_srv_sess *srv = srv_path->srv;
106 int i;
107
108 if (srv_path->ops_ids) {
109 for (i = 0; i < srv->queue_depth; i++)
110 free_id(srv_path->ops_ids[i]);
111 kfree(srv_path->ops_ids);
112 srv_path->ops_ids = NULL;
113 }
114}
115
116static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc);
117
118static struct ib_cqe io_comp_cqe = {
119 .done = rtrs_srv_rdma_done
120};
121
122static inline void rtrs_srv_inflight_ref_release(struct percpu_ref *ref)
123{
124 struct rtrs_srv_path *srv_path = container_of(ref,
125 struct rtrs_srv_path,
126 ids_inflight_ref);
127
128 percpu_ref_exit(&srv_path->ids_inflight_ref);
129 complete(&srv_path->complete_done);
130}
131
132static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_path *srv_path)
133{
134 struct rtrs_srv_sess *srv = srv_path->srv;
135 struct rtrs_srv_op *id;
136 int i, ret;
137
138 srv_path->ops_ids = kcalloc(srv->queue_depth,
139 sizeof(*srv_path->ops_ids),
140 GFP_KERNEL);
141 if (!srv_path->ops_ids)
142 goto err;
143
144 for (i = 0; i < srv->queue_depth; ++i) {
145 id = kzalloc(sizeof(*id), GFP_KERNEL);
146 if (!id)
147 goto err;
148
149 srv_path->ops_ids[i] = id;
150 }
151
152 ret = percpu_ref_init(&srv_path->ids_inflight_ref,
153 rtrs_srv_inflight_ref_release, 0, GFP_KERNEL);
154 if (ret) {
155 pr_err("Percpu reference init failed\n");
156 goto err;
157 }
158 init_completion(&srv_path->complete_done);
159
160 return 0;
161
162err:
163 rtrs_srv_free_ops_ids(srv_path);
164 return -ENOMEM;
165}
166
167static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_path *srv_path)
168{
169 percpu_ref_get(&srv_path->ids_inflight_ref);
170}
171
172static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_path *srv_path)
173{
174 percpu_ref_put(&srv_path->ids_inflight_ref);
175}
176
177static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc)
178{
179 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
180 struct rtrs_path *s = con->c.path;
181 struct rtrs_srv_path *srv_path = to_srv_path(s);
182
183 if (wc->status != IB_WC_SUCCESS) {
184 rtrs_err(s, "REG MR failed: %s\n",
185 ib_wc_status_msg(wc->status));
186 close_path(srv_path);
187 return;
188 }
189}
190
191static struct ib_cqe local_reg_cqe = {
192 .done = rtrs_srv_reg_mr_done
193};
194
195static int rdma_write_sg(struct rtrs_srv_op *id)
196{
197 struct rtrs_path *s = id->con->c.path;
198 struct rtrs_srv_path *srv_path = to_srv_path(s);
199 dma_addr_t dma_addr = srv_path->dma_addr[id->msg_id];
200 struct rtrs_srv_mr *srv_mr;
201 struct ib_send_wr inv_wr;
202 struct ib_rdma_wr imm_wr;
203 struct ib_rdma_wr *wr = NULL;
204 enum ib_send_flags flags;
205 size_t sg_cnt;
206 int err, offset;
207 bool need_inval;
208 u32 rkey = 0;
209 struct ib_reg_wr rwr;
210 struct ib_sge *plist;
211 struct ib_sge list;
212
213 sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt);
214 need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F;
215 if (sg_cnt != 1)
216 return -EINVAL;
217
218 offset = 0;
219
220 wr = &id->tx_wr;
221 plist = &id->tx_sg;
222 plist->addr = dma_addr + offset;
223 plist->length = le32_to_cpu(id->rd_msg->desc[0].len);
224
225 /* WR will fail with length error
226 * if this is 0
227 */
228 if (plist->length == 0) {
229 rtrs_err(s, "Invalid RDMA-Write sg list length 0\n");
230 return -EINVAL;
231 }
232
233 plist->lkey = srv_path->s.dev->ib_pd->local_dma_lkey;
234 offset += plist->length;
235
236 wr->wr.sg_list = plist;
237 wr->wr.num_sge = 1;
238 wr->remote_addr = le64_to_cpu(id->rd_msg->desc[0].addr);
239 wr->rkey = le32_to_cpu(id->rd_msg->desc[0].key);
240 if (rkey == 0)
241 rkey = wr->rkey;
242 else
243 /* Only one key is actually used */
244 WARN_ON_ONCE(rkey != wr->rkey);
245
246 wr->wr.opcode = IB_WR_RDMA_WRITE;
247 wr->wr.wr_cqe = &io_comp_cqe;
248 wr->wr.ex.imm_data = 0;
249 wr->wr.send_flags = 0;
250
251 if (need_inval && always_invalidate) {
252 wr->wr.next = &rwr.wr;
253 rwr.wr.next = &inv_wr;
254 inv_wr.next = &imm_wr.wr;
255 } else if (always_invalidate) {
256 wr->wr.next = &rwr.wr;
257 rwr.wr.next = &imm_wr.wr;
258 } else if (need_inval) {
259 wr->wr.next = &inv_wr;
260 inv_wr.next = &imm_wr.wr;
261 } else {
262 wr->wr.next = &imm_wr.wr;
263 }
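	/*
	 * Resulting WR chains for the four cases above (illustration only):
	 *
	 *	need_inval && always_invalidate: wr -> rwr -> inv_wr -> imm_wr
	 *	always_invalidate only:          wr -> rwr -> imm_wr
	 *	need_inval only:                 wr -> inv_wr -> imm_wr
	 *	neither:                         wr -> imm_wr
	 *
	 * The whole chain is posted with a single ib_post_send() below.
	 */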
264 /*
265 * From time to time we have to post signaled sends,
266 * or send queue will fill up and only QP reset can help.
267 */
268 flags = (atomic_inc_return(&id->con->c.wr_cnt) % s->signal_interval) ?
269 0 : IB_SEND_SIGNALED;
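	/*
	 * Example (assuming signal_interval == 32): every 32nd WR on this
	 * connection is posted with IB_SEND_SIGNALED; its completion in
	 * rtrs_srv_rdma_done() then returns signal_interval credits to
	 * con->c.sq_wr_avail, which keeps the send queue from filling up.
	 */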
270
271 if (need_inval) {
272 inv_wr.sg_list = NULL;
273 inv_wr.num_sge = 0;
274 inv_wr.opcode = IB_WR_SEND_WITH_INV;
275 inv_wr.wr_cqe = &io_comp_cqe;
276 inv_wr.send_flags = 0;
277 inv_wr.ex.invalidate_rkey = rkey;
278 }
279
280 imm_wr.wr.next = NULL;
281 if (always_invalidate) {
282 struct rtrs_msg_rkey_rsp *msg;
283
284 srv_mr = &srv_path->mrs[id->msg_id];
285 rwr.wr.opcode = IB_WR_REG_MR;
286 rwr.wr.wr_cqe = &local_reg_cqe;
287 rwr.wr.num_sge = 0;
288 rwr.mr = srv_mr->mr;
289 rwr.wr.send_flags = 0;
290 rwr.key = srv_mr->mr->rkey;
291 rwr.access = (IB_ACCESS_LOCAL_WRITE |
292 IB_ACCESS_REMOTE_WRITE);
293 msg = srv_mr->iu->buf;
294 msg->buf_id = cpu_to_le16(id->msg_id);
295 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
296 msg->rkey = cpu_to_le32(srv_mr->mr->rkey);
297
298 list.addr = srv_mr->iu->dma_addr;
299 list.length = sizeof(*msg);
300 list.lkey = srv_path->s.dev->ib_pd->local_dma_lkey;
301 imm_wr.wr.sg_list = &list;
302 imm_wr.wr.num_sge = 1;
303 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM;
304 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev,
305 srv_mr->iu->dma_addr,
306 srv_mr->iu->size, DMA_TO_DEVICE);
307 } else {
308 imm_wr.wr.sg_list = NULL;
309 imm_wr.wr.num_sge = 0;
310 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
311 }
312 imm_wr.wr.send_flags = flags;
313 imm_wr.wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id,
314 0, need_inval));
315
316 imm_wr.wr.wr_cqe = &io_comp_cqe;
317 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, dma_addr,
318 offset, DMA_BIDIRECTIONAL);
319
320 err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL);
321 if (err)
322 rtrs_err(s,
323 "Posting RDMA-Write-Request to QP failed, err: %d\n",
324 err);
325
326 return err;
327}
328
329/**
330 * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE
331 * requests or on successful WRITE request.
332 * @con: the connection to send back result
333 * @id: the id associated with the IO
334 * @errno: the error number of the IO.
335 *
336 * Return 0 on success, errno otherwise.
337 */
338static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id,
339 int errno)
340{
341 struct rtrs_path *s = con->c.path;
342 struct rtrs_srv_path *srv_path = to_srv_path(s);
343 struct ib_send_wr inv_wr, *wr = NULL;
344 struct ib_rdma_wr imm_wr;
345 struct ib_reg_wr rwr;
346 struct rtrs_srv_mr *srv_mr;
347 bool need_inval = false;
348 enum ib_send_flags flags;
349 u32 imm;
350 int err;
351
352 if (id->dir == READ) {
353 struct rtrs_msg_rdma_read *rd_msg = id->rd_msg;
354 size_t sg_cnt;
355
356 need_inval = le16_to_cpu(rd_msg->flags) &
357 RTRS_MSG_NEED_INVAL_F;
358 sg_cnt = le16_to_cpu(rd_msg->sg_cnt);
359
360 if (need_inval) {
361 if (sg_cnt) {
362 inv_wr.wr_cqe = &io_comp_cqe;
363 inv_wr.sg_list = NULL;
364 inv_wr.num_sge = 0;
365 inv_wr.opcode = IB_WR_SEND_WITH_INV;
366 inv_wr.send_flags = 0;
367 /* Only one key is actually used */
368 inv_wr.ex.invalidate_rkey =
369 le32_to_cpu(rd_msg->desc[0].key);
370 } else {
371 WARN_ON_ONCE(1);
372 need_inval = false;
373 }
374 }
375 }
376
377 trace_send_io_resp_imm(id, need_inval, always_invalidate, errno);
378
379 if (need_inval && always_invalidate) {
380 wr = &inv_wr;
381 inv_wr.next = &rwr.wr;
382 rwr.wr.next = &imm_wr.wr;
383 } else if (always_invalidate) {
384 wr = &rwr.wr;
385 rwr.wr.next = &imm_wr.wr;
386 } else if (need_inval) {
387 wr = &inv_wr;
388 inv_wr.next = &imm_wr.wr;
389 } else {
390 wr = &imm_wr.wr;
391 }
392 /*
393 * From time to time we have to post signalled sends,
394 * or send queue will fill up and only QP reset can help.
395 */
396 flags = (atomic_inc_return(&con->c.wr_cnt) % s->signal_interval) ?
397 0 : IB_SEND_SIGNALED;
398 imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval);
399 imm_wr.wr.next = NULL;
400 if (always_invalidate) {
401 struct ib_sge list;
402 struct rtrs_msg_rkey_rsp *msg;
403
404 srv_mr = &srv_path->mrs[id->msg_id];
405 rwr.wr.next = &imm_wr.wr;
406 rwr.wr.opcode = IB_WR_REG_MR;
407 rwr.wr.wr_cqe = &local_reg_cqe;
408 rwr.wr.num_sge = 0;
409 rwr.wr.send_flags = 0;
410 rwr.mr = srv_mr->mr;
411 rwr.key = srv_mr->mr->rkey;
412 rwr.access = (IB_ACCESS_LOCAL_WRITE |
413 IB_ACCESS_REMOTE_WRITE);
414 msg = srv_mr->iu->buf;
415 msg->buf_id = cpu_to_le16(id->msg_id);
416 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
417 msg->rkey = cpu_to_le32(srv_mr->mr->rkey);
418
419 list.addr = srv_mr->iu->dma_addr;
420 list.length = sizeof(*msg);
421 list.lkey = srv_path->s.dev->ib_pd->local_dma_lkey;
422 imm_wr.wr.sg_list = &list;
423 imm_wr.wr.num_sge = 1;
424 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM;
425 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev,
426 srv_mr->iu->dma_addr,
427 srv_mr->iu->size, DMA_TO_DEVICE);
428 } else {
429 imm_wr.wr.sg_list = NULL;
430 imm_wr.wr.num_sge = 0;
431 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
432 }
433 imm_wr.wr.send_flags = flags;
434 imm_wr.wr.wr_cqe = &io_comp_cqe;
435
436 imm_wr.wr.ex.imm_data = cpu_to_be32(imm);
437
438 err = ib_post_send(id->con->c.qp, wr, NULL);
439 if (err)
440 rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n",
441 err);
442
443 return err;
444}
445
446void close_path(struct rtrs_srv_path *srv_path)
447{
448 if (rtrs_srv_change_state(srv_path, RTRS_SRV_CLOSING))
449 queue_work(rtrs_wq, &srv_path->close_work);
450 WARN_ON(srv_path->state != RTRS_SRV_CLOSING);
451}
452
453static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state)
454{
455 switch (state) {
456 case RTRS_SRV_CONNECTING:
457 return "RTRS_SRV_CONNECTING";
458 case RTRS_SRV_CONNECTED:
459 return "RTRS_SRV_CONNECTED";
460 case RTRS_SRV_CLOSING:
461 return "RTRS_SRV_CLOSING";
462 case RTRS_SRV_CLOSED:
463 return "RTRS_SRV_CLOSED";
464 default:
465 return "UNKNOWN";
466 }
467}
468
469/**
470 * rtrs_srv_resp_rdma() - Finish an RDMA request
471 *
472 * @id: Internal RTRS operation identifier
 * @status: Response code sent to the other side for this operation.
 *          0 = success, < 0 = error
 * Context: any
 *
 * Finish an RDMA operation. A message is sent to the client and the
 * corresponding memory areas will be released.
479 */
480bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status)
481{
482 struct rtrs_srv_path *srv_path;
483 struct rtrs_srv_con *con;
484 struct rtrs_path *s;
485 int err;
486
487 if (WARN_ON(!id))
488 return true;
489
490 con = id->con;
491 s = con->c.path;
492 srv_path = to_srv_path(s);
493
494 id->status = status;
495
496 if (srv_path->state != RTRS_SRV_CONNECTED) {
497 rtrs_err_rl(s,
498 "Sending I/O response failed, server path %s is disconnected, path state %s\n",
499 kobject_name(&srv_path->kobj),
500 rtrs_srv_state_str(srv_path->state));
501 goto out;
502 }
503 if (always_invalidate) {
504 struct rtrs_srv_mr *mr = &srv_path->mrs[id->msg_id];
505
506 ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey));
507 }
508 if (atomic_sub_return(1, &con->c.sq_wr_avail) < 0) {
509 rtrs_err(s, "IB send queue full: srv_path=%s cid=%d\n",
510 kobject_name(&srv_path->kobj),
511 con->c.cid);
512 atomic_add(1, &con->c.sq_wr_avail);
513 spin_lock(&con->rsp_wr_wait_lock);
514 list_add_tail(&id->wait_list, &con->rsp_wr_wait_list);
515 spin_unlock(&con->rsp_wr_wait_lock);
516 return false;
517 }
518
519 if (status || id->dir == WRITE || !id->rd_msg->sg_cnt)
520 err = send_io_resp_imm(con, id, status);
521 else
522 err = rdma_write_sg(id);
523
524 if (err) {
525 rtrs_err_rl(s, "IO response failed: %d: srv_path=%s\n", err,
526 kobject_name(&srv_path->kobj));
527 close_path(srv_path);
528 }
529out:
530 rtrs_srv_put_ops_ids(srv_path);
531 return true;
532}
533EXPORT_SYMBOL(rtrs_srv_resp_rdma);
534
535/**
536 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv.
537 * @srv: Session pointer
538 * @priv: The private pointer that is associated with the session.
539 */
540void rtrs_srv_set_sess_priv(struct rtrs_srv_sess *srv, void *priv)
541{
542 srv->priv = priv;
543}
544EXPORT_SYMBOL(rtrs_srv_set_sess_priv);
545
546static void unmap_cont_bufs(struct rtrs_srv_path *srv_path)
547{
548 int i;
549
550 for (i = 0; i < srv_path->mrs_num; i++) {
551 struct rtrs_srv_mr *srv_mr;
552
553 srv_mr = &srv_path->mrs[i];
554
555 if (always_invalidate)
556 rtrs_iu_free(srv_mr->iu, srv_path->s.dev->ib_dev, 1);
557
558 ib_dereg_mr(srv_mr->mr);
559 ib_dma_unmap_sg(srv_path->s.dev->ib_dev, srv_mr->sgt.sgl,
560 srv_mr->sgt.nents, DMA_BIDIRECTIONAL);
561 sg_free_table(&srv_mr->sgt);
562 }
563 kfree(srv_path->mrs);
564}
565
566static int map_cont_bufs(struct rtrs_srv_path *srv_path)
567{
568 struct rtrs_srv_sess *srv = srv_path->srv;
569 struct rtrs_path *ss = &srv_path->s;
570 int i, err, mrs_num;
571 unsigned int chunk_bits;
572 int chunks_per_mr = 1;
573 struct ib_mr *mr;
574 struct sg_table *sgt;
575
	/*
	 * Here we map queue_depth chunks to MRs. First we have to
	 * figure out how many chunks we can map per MR.
	 */
580 if (always_invalidate) {
		/*
		 * In order to invalidate each chunk of memory individually,
		 * we need one memory region per chunk.
		 */
585 mrs_num = srv->queue_depth;
586 } else {
587 chunks_per_mr =
588 srv_path->s.dev->ib_dev->attrs.max_fast_reg_page_list_len;
589 mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr);
590 chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num);
591 }
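	/*
	 * Worked example (numbers are illustrative): with queue_depth == 512
	 * and max_fast_reg_page_list_len == 256 this gives
	 *
	 *	mrs_num       = DIV_ROUND_UP(512, 256) = 2
	 *	chunks_per_mr = DIV_ROUND_UP(512, 2)   = 256
	 *
	 * With always_invalidate every chunk needs its own MR, so
	 * mrs_num == queue_depth and chunks_per_mr stays 1.
	 */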
592
593 srv_path->mrs = kcalloc(mrs_num, sizeof(*srv_path->mrs), GFP_KERNEL);
594 if (!srv_path->mrs)
595 return -ENOMEM;
596
597 for (srv_path->mrs_num = 0; srv_path->mrs_num < mrs_num;
598 srv_path->mrs_num++) {
599 struct rtrs_srv_mr *srv_mr = &srv_path->mrs[srv_path->mrs_num];
600 struct scatterlist *s;
601 int nr, nr_sgt, chunks;
602
603 sgt = &srv_mr->sgt;
604 chunks = chunks_per_mr * srv_path->mrs_num;
605 if (!always_invalidate)
606 chunks_per_mr = min_t(int, chunks_per_mr,
607 srv->queue_depth - chunks);
608
609 err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL);
610 if (err)
611 goto err;
612
613 for_each_sg(sgt->sgl, s, chunks_per_mr, i)
614 sg_set_page(s, srv->chunks[chunks + i],
615 max_chunk_size, 0);
616
617 nr_sgt = ib_dma_map_sg(srv_path->s.dev->ib_dev, sgt->sgl,
618 sgt->nents, DMA_BIDIRECTIONAL);
619 if (!nr_sgt) {
620 err = -EINVAL;
621 goto free_sg;
622 }
623 mr = ib_alloc_mr(srv_path->s.dev->ib_pd, IB_MR_TYPE_MEM_REG,
624 nr_sgt);
625 if (IS_ERR(mr)) {
626 err = PTR_ERR(mr);
627 goto unmap_sg;
628 }
629 nr = ib_map_mr_sg(mr, sgt->sgl, nr_sgt,
630 NULL, max_chunk_size);
631 if (nr != nr_sgt) {
632 err = nr < 0 ? nr : -EINVAL;
633 goto dereg_mr;
634 }
635
636 if (always_invalidate) {
637 srv_mr->iu = rtrs_iu_alloc(1,
638 sizeof(struct rtrs_msg_rkey_rsp),
639 GFP_KERNEL, srv_path->s.dev->ib_dev,
640 DMA_TO_DEVICE, rtrs_srv_rdma_done);
641 if (!srv_mr->iu) {
642 err = -ENOMEM;
643 rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n", err);
644 goto dereg_mr;
645 }
646 }
647 /* Eventually dma addr for each chunk can be cached */
648 for_each_sg(sgt->sgl, s, nr_sgt, i)
649 srv_path->dma_addr[chunks + i] = sg_dma_address(s);
650
651 ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey));
652 srv_mr->mr = mr;
653 }
654
655 chunk_bits = ilog2(srv->queue_depth - 1) + 1;
656 srv_path->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits);
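	/*
	 * Example of the immediate-data layout this sets up (assuming
	 * MAX_IMM_PAYL_BITS == 28): queue_depth == 512 gives chunk_bits == 9
	 * and mem_bits == 19, so the client encodes
	 *
	 *	imm_payload = (msg_id << 19) | offset
	 *
	 * and rtrs_srv_rdma_done() decodes it with
	 *
	 *	msg_id = imm_payload >> srv_path->mem_bits;
	 *	off    = imm_payload & ((1 << srv_path->mem_bits) - 1);
	 */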
657
658 return 0;
659
660dereg_mr:
661 ib_dereg_mr(mr);
662unmap_sg:
663 ib_dma_unmap_sg(srv_path->s.dev->ib_dev, sgt->sgl,
664 sgt->nents, DMA_BIDIRECTIONAL);
665free_sg:
666 sg_free_table(sgt);
667err:
668 unmap_cont_bufs(srv_path);
669
670 return err;
671}
672
673static void rtrs_srv_hb_err_handler(struct rtrs_con *c)
674{
675 close_path(to_srv_path(c->path));
676}
677
678static void rtrs_srv_init_hb(struct rtrs_srv_path *srv_path)
679{
680 rtrs_init_hb(&srv_path->s, &io_comp_cqe,
681 RTRS_HB_INTERVAL_MS,
682 RTRS_HB_MISSED_MAX,
683 rtrs_srv_hb_err_handler,
684 rtrs_wq);
685}
686
687static void rtrs_srv_start_hb(struct rtrs_srv_path *srv_path)
688{
689 rtrs_start_hb(&srv_path->s);
690}
691
692static void rtrs_srv_stop_hb(struct rtrs_srv_path *srv_path)
693{
694 rtrs_stop_hb(&srv_path->s);
695}
696
697static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
698{
699 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
700 struct rtrs_path *s = con->c.path;
701 struct rtrs_srv_path *srv_path = to_srv_path(s);
702 struct rtrs_iu *iu;
703
704 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
705 rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1);
706
707 if (wc->status != IB_WC_SUCCESS) {
708 rtrs_err(s, "Sess info response send failed: %s\n",
709 ib_wc_status_msg(wc->status));
710 close_path(srv_path);
711 return;
712 }
713 WARN_ON(wc->opcode != IB_WC_SEND);
714}
715
716static int rtrs_srv_path_up(struct rtrs_srv_path *srv_path)
717{
718 struct rtrs_srv_sess *srv = srv_path->srv;
719 struct rtrs_srv_ctx *ctx = srv->ctx;
720 int up, ret = 0;
721
722 mutex_lock(&srv->paths_ev_mutex);
723 up = ++srv->paths_up;
724 if (up == 1)
725 ret = ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL);
726 mutex_unlock(&srv->paths_ev_mutex);
727
728 /* Mark session as established */
729 if (!ret)
730 srv_path->established = true;
731
732 return ret;
733}
734
735static void rtrs_srv_path_down(struct rtrs_srv_path *srv_path)
736{
737 struct rtrs_srv_sess *srv = srv_path->srv;
738 struct rtrs_srv_ctx *ctx = srv->ctx;
739
740 if (!srv_path->established)
741 return;
742
743 srv_path->established = false;
744 mutex_lock(&srv->paths_ev_mutex);
745 WARN_ON(!srv->paths_up);
746 if (--srv->paths_up == 0)
747 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv);
748 mutex_unlock(&srv->paths_ev_mutex);
749}
750
751static bool exist_pathname(struct rtrs_srv_ctx *ctx,
752 const char *pathname, const uuid_t *path_uuid)
753{
754 struct rtrs_srv_sess *srv;
755 struct rtrs_srv_path *srv_path;
756 bool found = false;
757
758 mutex_lock(&ctx->srv_mutex);
759 list_for_each_entry(srv, &ctx->srv_list, ctx_list) {
760 mutex_lock(&srv->paths_mutex);
761
		/*
		 * Skip the requester's own session (same uuid): a client may
		 * add another path under its own session name.
		 */
763 if (uuid_equal(&srv->paths_uuid, path_uuid)) {
764 mutex_unlock(&srv->paths_mutex);
765 continue;
766 }
767
768 list_for_each_entry(srv_path, &srv->paths_list, s.entry) {
769 if (strlen(srv_path->s.sessname) == strlen(pathname) &&
770 !strcmp(srv_path->s.sessname, pathname)) {
771 found = true;
772 break;
773 }
774 }
775 mutex_unlock(&srv->paths_mutex);
776 if (found)
777 break;
778 }
779 mutex_unlock(&ctx->srv_mutex);
780 return found;
781}
782
783static int post_recv_path(struct rtrs_srv_path *srv_path);
784static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno);
785
786static int process_info_req(struct rtrs_srv_con *con,
787 struct rtrs_msg_info_req *msg)
788{
789 struct rtrs_path *s = con->c.path;
790 struct rtrs_srv_path *srv_path = to_srv_path(s);
791 struct ib_send_wr *reg_wr = NULL;
792 struct rtrs_msg_info_rsp *rsp;
793 struct rtrs_iu *tx_iu;
794 struct ib_reg_wr *rwr;
795 int mri, err;
796 size_t tx_sz;
797
798 err = post_recv_path(srv_path);
799 if (err) {
800 rtrs_err(s, "post_recv_path(), err: %d\n", err);
801 return err;
802 }
803
804 if (strchr(msg->pathname, '/') || strchr(msg->pathname, '.')) {
805 rtrs_err(s, "pathname cannot contain / and .\n");
806 return -EINVAL;
807 }
808
809 if (exist_pathname(srv_path->srv->ctx,
810 msg->pathname, &srv_path->srv->paths_uuid)) {
811 rtrs_err(s, "pathname is duplicated: %s\n", msg->pathname);
812 return -EPERM;
813 }
814 strscpy(srv_path->s.sessname, msg->pathname,
815 sizeof(srv_path->s.sessname));
816
817 rwr = kcalloc(srv_path->mrs_num, sizeof(*rwr), GFP_KERNEL);
818 if (!rwr)
819 return -ENOMEM;
820
821 tx_sz = sizeof(*rsp);
822 tx_sz += sizeof(rsp->desc[0]) * srv_path->mrs_num;
823 tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, srv_path->s.dev->ib_dev,
824 DMA_TO_DEVICE, rtrs_srv_info_rsp_done);
825 if (!tx_iu) {
826 err = -ENOMEM;
827 goto rwr_free;
828 }
829
830 rsp = tx_iu->buf;
831 rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP);
832 rsp->sg_cnt = cpu_to_le16(srv_path->mrs_num);
833
834 for (mri = 0; mri < srv_path->mrs_num; mri++) {
835 struct ib_mr *mr = srv_path->mrs[mri].mr;
836
837 rsp->desc[mri].addr = cpu_to_le64(mr->iova);
838 rsp->desc[mri].key = cpu_to_le32(mr->rkey);
839 rsp->desc[mri].len = cpu_to_le32(mr->length);
840
841 /*
842 * Fill in reg MR request and chain them *backwards*
843 */
844 rwr[mri].wr.next = mri ? &rwr[mri - 1].wr : NULL;
845 rwr[mri].wr.opcode = IB_WR_REG_MR;
846 rwr[mri].wr.wr_cqe = &local_reg_cqe;
847 rwr[mri].wr.num_sge = 0;
848 rwr[mri].wr.send_flags = 0;
849 rwr[mri].mr = mr;
850 rwr[mri].key = mr->rkey;
851 rwr[mri].access = (IB_ACCESS_LOCAL_WRITE |
852 IB_ACCESS_REMOTE_WRITE);
853 reg_wr = &rwr[mri].wr;
854 }
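	/*
	 * Example of the resulting chain (illustration, mrs_num == 3):
	 *
	 *	reg_wr = &rwr[2].wr -> &rwr[1].wr -> &rwr[0].wr -> NULL
	 *
	 * reg_wr is handed to rtrs_iu_post_send() below as the head of the
	 * chain, so the REG_MR WRs and the info response go out in one post.
	 */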
855
856 err = rtrs_srv_create_path_files(srv_path);
857 if (err)
858 goto iu_free;
859 kobject_get(&srv_path->kobj);
860 get_device(&srv_path->srv->dev);
861 err = rtrs_srv_change_state(srv_path, RTRS_SRV_CONNECTED);
862 if (!err) {
863 rtrs_err(s, "rtrs_srv_change_state(), err: %d\n", err);
864 goto iu_free;
865 }
866
867 rtrs_srv_start_hb(srv_path);
868
869 /*
870 * We do not account number of established connections at the current
871 * moment, we rely on the client, which should send info request when
872 * all connections are successfully established. Thus, simply notify
873 * listener with a proper event if we are the first path.
874 */
875 err = rtrs_srv_path_up(srv_path);
876 if (err) {
877 rtrs_err(s, "rtrs_srv_path_up(), err: %d\n", err);
878 goto iu_free;
879 }
880
881 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev,
882 tx_iu->dma_addr,
883 tx_iu->size, DMA_TO_DEVICE);
884
885 /* Send info response */
886 err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr);
887 if (err) {
888 rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err);
889iu_free:
890 rtrs_iu_free(tx_iu, srv_path->s.dev->ib_dev, 1);
891 }
892rwr_free:
893 kfree(rwr);
894
895 return err;
896}
897
898static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
899{
900 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
901 struct rtrs_path *s = con->c.path;
902 struct rtrs_srv_path *srv_path = to_srv_path(s);
903 struct rtrs_msg_info_req *msg;
904 struct rtrs_iu *iu;
905 int err;
906
907 WARN_ON(con->c.cid);
908
909 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
910 if (wc->status != IB_WC_SUCCESS) {
911 rtrs_err(s, "Sess info request receive failed: %s\n",
912 ib_wc_status_msg(wc->status));
913 goto close;
914 }
915 WARN_ON(wc->opcode != IB_WC_RECV);
916
917 if (wc->byte_len < sizeof(*msg)) {
918 rtrs_err(s, "Sess info request is malformed: size %d\n",
919 wc->byte_len);
920 goto close;
921 }
922 ib_dma_sync_single_for_cpu(srv_path->s.dev->ib_dev, iu->dma_addr,
923 iu->size, DMA_FROM_DEVICE);
924 msg = iu->buf;
925 if (le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ) {
926 rtrs_err(s, "Sess info request is malformed: type %d\n",
927 le16_to_cpu(msg->type));
928 goto close;
929 }
930 err = process_info_req(con, msg);
931 if (err)
932 goto close;
933
934out:
935 rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1);
936 return;
937close:
938 close_path(srv_path);
939 goto out;
940}
941
942static int post_recv_info_req(struct rtrs_srv_con *con)
943{
944 struct rtrs_path *s = con->c.path;
945 struct rtrs_srv_path *srv_path = to_srv_path(s);
946 struct rtrs_iu *rx_iu;
947 int err;
948
949 rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req),
950 GFP_KERNEL, srv_path->s.dev->ib_dev,
951 DMA_FROM_DEVICE, rtrs_srv_info_req_done);
952 if (!rx_iu)
953 return -ENOMEM;
954 /* Prepare for getting info response */
955 err = rtrs_iu_post_recv(&con->c, rx_iu);
956 if (err) {
957 rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err);
958 rtrs_iu_free(rx_iu, srv_path->s.dev->ib_dev, 1);
959 return err;
960 }
961
962 return 0;
963}
964
965static int post_recv_io(struct rtrs_srv_con *con, size_t q_size)
966{
967 int i, err;
968
969 for (i = 0; i < q_size; i++) {
970 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
971 if (err)
972 return err;
973 }
974
975 return 0;
976}
977
978static int post_recv_path(struct rtrs_srv_path *srv_path)
979{
980 struct rtrs_srv_sess *srv = srv_path->srv;
981 struct rtrs_path *s = &srv_path->s;
982 size_t q_size;
983 int err, cid;
984
985 for (cid = 0; cid < srv_path->s.con_num; cid++) {
986 if (cid == 0)
987 q_size = SERVICE_CON_QUEUE_DEPTH;
988 else
989 q_size = srv->queue_depth;
990
991 err = post_recv_io(to_srv_con(srv_path->s.con[cid]), q_size);
992 if (err) {
993 rtrs_err(s, "post_recv_io(), err: %d\n", err);
994 return err;
995 }
996 }
997
998 return 0;
999}
1000
1001static void process_read(struct rtrs_srv_con *con,
1002 struct rtrs_msg_rdma_read *msg,
1003 u32 buf_id, u32 off)
1004{
1005 struct rtrs_path *s = con->c.path;
1006 struct rtrs_srv_path *srv_path = to_srv_path(s);
1007 struct rtrs_srv_sess *srv = srv_path->srv;
1008 struct rtrs_srv_ctx *ctx = srv->ctx;
1009 struct rtrs_srv_op *id;
1010
1011 size_t usr_len, data_len;
1012 void *data;
1013 int ret;
1014
1015 if (srv_path->state != RTRS_SRV_CONNECTED) {
1016 rtrs_err_rl(s,
1017 "Processing read request failed, session is disconnected, sess state %s\n",
1018 rtrs_srv_state_str(srv_path->state));
1019 return;
1020 }
1021 if (msg->sg_cnt != 1 && msg->sg_cnt != 0) {
1022 rtrs_err_rl(s,
1023 "Processing read request failed, invalid message\n");
1024 return;
1025 }
1026 rtrs_srv_get_ops_ids(srv_path);
1027 rtrs_srv_update_rdma_stats(srv_path->stats, off, READ);
1028 id = srv_path->ops_ids[buf_id];
1029 id->con = con;
1030 id->dir = READ;
1031 id->msg_id = buf_id;
1032 id->rd_msg = msg;
1033 usr_len = le16_to_cpu(msg->usr_len);
1034 data_len = off - usr_len;
1035 data = page_address(srv->chunks[buf_id]);
1036 ret = ctx->ops.rdma_ev(srv->priv, id, data, data_len,
1037 data + data_len, usr_len);
1038
1039 if (ret) {
1040 rtrs_err_rl(s,
1041 "Processing read request failed, user module cb reported for msg_id %d, err: %d\n",
1042 buf_id, ret);
1043 goto send_err_msg;
1044 }
1045
1046 return;
1047
1048send_err_msg:
1049 ret = send_io_resp_imm(con, id, ret);
1050 if (ret < 0) {
1051 rtrs_err_rl(s,
1052 "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %d\n",
1053 buf_id, ret);
1054 close_path(srv_path);
1055 }
1056 rtrs_srv_put_ops_ids(srv_path);
1057}
1058
1059static void process_write(struct rtrs_srv_con *con,
1060 struct rtrs_msg_rdma_write *req,
1061 u32 buf_id, u32 off)
1062{
1063 struct rtrs_path *s = con->c.path;
1064 struct rtrs_srv_path *srv_path = to_srv_path(s);
1065 struct rtrs_srv_sess *srv = srv_path->srv;
1066 struct rtrs_srv_ctx *ctx = srv->ctx;
1067 struct rtrs_srv_op *id;
1068
1069 size_t data_len, usr_len;
1070 void *data;
1071 int ret;
1072
1073 if (srv_path->state != RTRS_SRV_CONNECTED) {
1074 rtrs_err_rl(s,
1075 "Processing write request failed, session is disconnected, sess state %s\n",
1076 rtrs_srv_state_str(srv_path->state));
1077 return;
1078 }
1079 rtrs_srv_get_ops_ids(srv_path);
1080 rtrs_srv_update_rdma_stats(srv_path->stats, off, WRITE);
1081 id = srv_path->ops_ids[buf_id];
1082 id->con = con;
1083 id->dir = WRITE;
1084 id->msg_id = buf_id;
1085
1086 usr_len = le16_to_cpu(req->usr_len);
1087 data_len = off - usr_len;
1088 data = page_address(srv->chunks[buf_id]);
1089 ret = ctx->ops.rdma_ev(srv->priv, id, data, data_len,
1090 data + data_len, usr_len);
1091 if (ret) {
1092 rtrs_err_rl(s,
1093 "Processing write request failed, user module callback reports err: %d\n",
1094 ret);
1095 goto send_err_msg;
1096 }
1097
1098 return;
1099
1100send_err_msg:
1101 ret = send_io_resp_imm(con, id, ret);
1102 if (ret < 0) {
1103 rtrs_err_rl(s,
1104 "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n",
1105 buf_id, ret);
1106 close_path(srv_path);
1107 }
1108 rtrs_srv_put_ops_ids(srv_path);
1109}
1110
1111static void process_io_req(struct rtrs_srv_con *con, void *msg,
1112 u32 id, u32 off)
1113{
1114 struct rtrs_path *s = con->c.path;
1115 struct rtrs_srv_path *srv_path = to_srv_path(s);
1116 struct rtrs_msg_rdma_hdr *hdr;
1117 unsigned int type;
1118
1119 ib_dma_sync_single_for_cpu(srv_path->s.dev->ib_dev,
1120 srv_path->dma_addr[id],
1121 max_chunk_size, DMA_BIDIRECTIONAL);
1122 hdr = msg;
1123 type = le16_to_cpu(hdr->type);
1124
1125 switch (type) {
1126 case RTRS_MSG_WRITE:
1127 process_write(con, msg, id, off);
1128 break;
1129 case RTRS_MSG_READ:
1130 process_read(con, msg, id, off);
1131 break;
1132 default:
1133 rtrs_err(s,
1134 "Processing I/O request failed, unknown message type received: 0x%02x\n",
1135 type);
1136 goto err;
1137 }
1138
1139 return;
1140
1141err:
1142 close_path(srv_path);
1143}
1144
1145static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
1146{
1147 struct rtrs_srv_mr *mr =
1148 container_of(wc->wr_cqe, typeof(*mr), inv_cqe);
1149 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
1150 struct rtrs_path *s = con->c.path;
1151 struct rtrs_srv_path *srv_path = to_srv_path(s);
1152 struct rtrs_srv_sess *srv = srv_path->srv;
1153 u32 msg_id, off;
1154 void *data;
1155
1156 if (wc->status != IB_WC_SUCCESS) {
1157 rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n",
1158 ib_wc_status_msg(wc->status));
1159 close_path(srv_path);
1160 }
1161 msg_id = mr->msg_id;
1162 off = mr->msg_off;
1163 data = page_address(srv->chunks[msg_id]) + off;
1164 process_io_req(con, data, msg_id, off);
1165}
1166
1167static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con,
1168 struct rtrs_srv_mr *mr)
1169{
1170 struct ib_send_wr wr = {
1171 .opcode = IB_WR_LOCAL_INV,
1172 .wr_cqe = &mr->inv_cqe,
1173 .send_flags = IB_SEND_SIGNALED,
1174 .ex.invalidate_rkey = mr->mr->rkey,
1175 };
1176 mr->inv_cqe.done = rtrs_srv_inv_rkey_done;
1177
1178 return ib_post_send(con->c.qp, &wr, NULL);
1179}
1180
1181static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con)
1182{
1183 spin_lock(&con->rsp_wr_wait_lock);
1184 while (!list_empty(&con->rsp_wr_wait_list)) {
1185 struct rtrs_srv_op *id;
1186 int ret;
1187
1188 id = list_entry(con->rsp_wr_wait_list.next,
1189 struct rtrs_srv_op, wait_list);
1190 list_del(&id->wait_list);
1191
1192 spin_unlock(&con->rsp_wr_wait_lock);
1193 ret = rtrs_srv_resp_rdma(id, id->status);
1194 spin_lock(&con->rsp_wr_wait_lock);
1195
1196 if (!ret) {
1197 list_add(&id->wait_list, &con->rsp_wr_wait_list);
1198 break;
1199 }
1200 }
1201 spin_unlock(&con->rsp_wr_wait_lock);
1202}
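/*
 * Flow sketch (illustration only): when rtrs_srv_resp_rdma() finds
 * sq_wr_avail exhausted it parks the request on rsp_wr_wait_list and returns
 * false; a later signaled completion (IB_WC_RDMA_WRITE/IB_WC_SEND in
 * rtrs_srv_rdma_done()) returns credits and calls this function to retry the
 * parked responses in FIFO order.
 */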
1203
1204static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
1205{
1206 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
1207 struct rtrs_path *s = con->c.path;
1208 struct rtrs_srv_path *srv_path = to_srv_path(s);
1209 struct rtrs_srv_sess *srv = srv_path->srv;
1210 u32 imm_type, imm_payload;
1211 int err;
1212
1213 if (wc->status != IB_WC_SUCCESS) {
1214 if (wc->status != IB_WC_WR_FLUSH_ERR) {
1215 rtrs_err(s,
1216 "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n",
1217 ib_wc_status_msg(wc->status), wc->wr_cqe,
1218 wc->opcode, wc->vendor_err, wc->byte_len);
1219 close_path(srv_path);
1220 }
1221 return;
1222 }
1223
1224 switch (wc->opcode) {
1225 case IB_WC_RECV_RDMA_WITH_IMM:
1226 /*
1227 * post_recv() RDMA write completions of IO reqs (read/write)
1228 * and hb
1229 */
1230 if (WARN_ON(wc->wr_cqe != &io_comp_cqe))
1231 return;
1232 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
1233 if (err) {
1234 rtrs_err(s, "rtrs_post_recv(), err: %d\n", err);
1235 close_path(srv_path);
1236 break;
1237 }
1238 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
1239 &imm_type, &imm_payload);
1240 if (imm_type == RTRS_IO_REQ_IMM) {
1241 u32 msg_id, off;
1242 void *data;
1243
1244 msg_id = imm_payload >> srv_path->mem_bits;
1245 off = imm_payload & ((1 << srv_path->mem_bits) - 1);
1246 if (msg_id >= srv->queue_depth || off >= max_chunk_size) {
1247 rtrs_err(s, "Wrong msg_id %u, off %u\n",
1248 msg_id, off);
1249 close_path(srv_path);
1250 return;
1251 }
1252 if (always_invalidate) {
1253 struct rtrs_srv_mr *mr = &srv_path->mrs[msg_id];
1254
1255 mr->msg_off = off;
1256 mr->msg_id = msg_id;
1257 err = rtrs_srv_inv_rkey(con, mr);
1258 if (err) {
1259 rtrs_err(s, "rtrs_post_recv(), err: %d\n",
1260 err);
1261 close_path(srv_path);
1262 break;
1263 }
1264 } else {
1265 data = page_address(srv->chunks[msg_id]) + off;
1266 process_io_req(con, data, msg_id, off);
1267 }
1268 } else if (imm_type == RTRS_HB_MSG_IMM) {
1269 WARN_ON(con->c.cid);
1270 rtrs_send_hb_ack(&srv_path->s);
1271 } else if (imm_type == RTRS_HB_ACK_IMM) {
1272 WARN_ON(con->c.cid);
1273 srv_path->s.hb_missed_cnt = 0;
1274 } else {
1275 rtrs_wrn(s, "Unknown IMM type %u\n", imm_type);
1276 }
1277 break;
1278 case IB_WC_RDMA_WRITE:
1279 case IB_WC_SEND:
1280 /*
1281 * post_send() RDMA write completions of IO reqs (read/write)
1282 * and hb.
1283 */
1284 atomic_add(s->signal_interval, &con->c.sq_wr_avail);
1285
1286 if (!list_empty_careful(&con->rsp_wr_wait_list))
1287 rtrs_rdma_process_wr_wait_list(con);
1288
1289 break;
1290 default:
1291 rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode);
1292 return;
1293 }
1294}
1295
1296/**
 * rtrs_srv_get_path_name() - Get the name of a connected rtrs_srv path.
 * @srv: Session
 * @pathname: Pathname buffer
 * @len: Length of the pathname buffer
1301 */
1302int rtrs_srv_get_path_name(struct rtrs_srv_sess *srv, char *pathname,
1303 size_t len)
1304{
1305 struct rtrs_srv_path *srv_path;
1306 int err = -ENOTCONN;
1307
1308 mutex_lock(&srv->paths_mutex);
1309 list_for_each_entry(srv_path, &srv->paths_list, s.entry) {
1310 if (srv_path->state != RTRS_SRV_CONNECTED)
1311 continue;
1312 strscpy(pathname, srv_path->s.sessname,
1313 min_t(size_t, sizeof(srv_path->s.sessname), len));
1314 err = 0;
1315 break;
1316 }
1317 mutex_unlock(&srv->paths_mutex);
1318
1319 return err;
1320}
1321EXPORT_SYMBOL(rtrs_srv_get_path_name);
1322
1323/**
1324 * rtrs_srv_get_queue_depth() - Get rtrs_srv qdepth.
1325 * @srv: Session
1326 */
1327int rtrs_srv_get_queue_depth(struct rtrs_srv_sess *srv)
1328{
1329 return srv->queue_depth;
1330}
1331EXPORT_SYMBOL(rtrs_srv_get_queue_depth);
1332
1333static int find_next_bit_ring(struct rtrs_srv_path *srv_path)
1334{
1335 struct ib_device *ib_dev = srv_path->s.dev->ib_dev;
1336 int v;
1337
1338 v = cpumask_next(srv_path->cur_cq_vector, &cq_affinity_mask);
1339 if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors)
1340 v = cpumask_first(&cq_affinity_mask);
1341 return v;
1342}
1343
1344static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_path *srv_path)
1345{
1346 srv_path->cur_cq_vector = find_next_bit_ring(srv_path);
1347
1348 return srv_path->cur_cq_vector;
1349}
1350
1351static void rtrs_srv_dev_release(struct device *dev)
1352{
1353 struct rtrs_srv_sess *srv = container_of(dev, struct rtrs_srv_sess,
1354 dev);
1355
1356 kfree(srv);
1357}
1358
1359static void free_srv(struct rtrs_srv_sess *srv)
1360{
1361 int i;
1362
1363 WARN_ON(refcount_read(&srv->refcount));
1364 for (i = 0; i < srv->queue_depth; i++)
1365 __free_pages(srv->chunks[i], get_order(max_chunk_size));
1366 kfree(srv->chunks);
1367 mutex_destroy(&srv->paths_mutex);
1368 mutex_destroy(&srv->paths_ev_mutex);
1369 /* last put to release the srv structure */
1370 put_device(&srv->dev);
1371}
1372
1373static struct rtrs_srv_sess *get_or_create_srv(struct rtrs_srv_ctx *ctx,
1374 const uuid_t *paths_uuid,
1375 bool first_conn)
1376{
1377 struct rtrs_srv_sess *srv;
1378 int i;
1379
1380 mutex_lock(&ctx->srv_mutex);
1381 list_for_each_entry(srv, &ctx->srv_list, ctx_list) {
1382 if (uuid_equal(&srv->paths_uuid, paths_uuid) &&
1383 refcount_inc_not_zero(&srv->refcount)) {
1384 mutex_unlock(&ctx->srv_mutex);
1385 return srv;
1386 }
1387 }
1388 mutex_unlock(&ctx->srv_mutex);
1389 /*
1390 * If this request is not the first connection request from the
1391 * client for this session then fail and return error.
1392 */
1393 if (!first_conn) {
1394 pr_err_ratelimited("Error: Not the first connection request for this session\n");
1395 return ERR_PTR(-ENXIO);
1396 }
1397
1398 /* need to allocate a new srv */
1399 srv = kzalloc(sizeof(*srv), GFP_KERNEL);
1400 if (!srv)
1401 return ERR_PTR(-ENOMEM);
1402
1403 INIT_LIST_HEAD(&srv->paths_list);
1404 mutex_init(&srv->paths_mutex);
1405 mutex_init(&srv->paths_ev_mutex);
1406 uuid_copy(&srv->paths_uuid, paths_uuid);
1407 srv->queue_depth = sess_queue_depth;
1408 srv->ctx = ctx;
1409 device_initialize(&srv->dev);
1410 srv->dev.release = rtrs_srv_dev_release;
1411
1412 srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks),
1413 GFP_KERNEL);
1414 if (!srv->chunks)
1415 goto err_free_srv;
1416
1417 for (i = 0; i < srv->queue_depth; i++) {
1418 srv->chunks[i] = alloc_pages(GFP_KERNEL,
1419 get_order(max_chunk_size));
1420 if (!srv->chunks[i])
1421 goto err_free_chunks;
1422 }
1423 refcount_set(&srv->refcount, 1);
1424 mutex_lock(&ctx->srv_mutex);
1425 list_add(&srv->ctx_list, &ctx->srv_list);
1426 mutex_unlock(&ctx->srv_mutex);
1427
1428 return srv;
1429
1430err_free_chunks:
1431 while (i--)
1432 __free_pages(srv->chunks[i], get_order(max_chunk_size));
1433 kfree(srv->chunks);
1434
1435err_free_srv:
1436 kfree(srv);
1437 return ERR_PTR(-ENOMEM);
1438}
1439
1440static void put_srv(struct rtrs_srv_sess *srv)
1441{
1442 if (refcount_dec_and_test(&srv->refcount)) {
1443 struct rtrs_srv_ctx *ctx = srv->ctx;
1444
1445 WARN_ON(srv->dev.kobj.state_in_sysfs);
1446
1447 mutex_lock(&ctx->srv_mutex);
1448 list_del(&srv->ctx_list);
1449 mutex_unlock(&ctx->srv_mutex);
1450 free_srv(srv);
1451 }
1452}
1453
1454static void __add_path_to_srv(struct rtrs_srv_sess *srv,
1455 struct rtrs_srv_path *srv_path)
1456{
1457 list_add_tail(&srv_path->s.entry, &srv->paths_list);
1458 srv->paths_num++;
1459 WARN_ON(srv->paths_num >= MAX_PATHS_NUM);
1460}
1461
1462static void del_path_from_srv(struct rtrs_srv_path *srv_path)
1463{
1464 struct rtrs_srv_sess *srv = srv_path->srv;
1465
1466 if (WARN_ON(!srv))
1467 return;
1468
1469 mutex_lock(&srv->paths_mutex);
1470 list_del(&srv_path->s.entry);
1471 WARN_ON(!srv->paths_num);
1472 srv->paths_num--;
1473 mutex_unlock(&srv->paths_mutex);
1474}
1475
/* Return 0 if the addresses are the same, non-zero or -ENOENT otherwise */
1477static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b)
1478{
1479 switch (a->sa_family) {
1480 case AF_IB:
1481 return memcmp(&((struct sockaddr_ib *)a)->sib_addr,
1482 &((struct sockaddr_ib *)b)->sib_addr,
1483 sizeof(struct ib_addr)) &&
1484 (b->sa_family == AF_IB);
1485 case AF_INET:
1486 return memcmp(&((struct sockaddr_in *)a)->sin_addr,
1487 &((struct sockaddr_in *)b)->sin_addr,
1488 sizeof(struct in_addr)) &&
1489 (b->sa_family == AF_INET);
1490 case AF_INET6:
1491 return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr,
1492 &((struct sockaddr_in6 *)b)->sin6_addr,
1493 sizeof(struct in6_addr)) &&
1494 (b->sa_family == AF_INET6);
1495 default:
1496 return -ENOENT;
1497 }
1498}
1499
1500static bool __is_path_w_addr_exists(struct rtrs_srv_sess *srv,
1501 struct rdma_addr *addr)
1502{
1503 struct rtrs_srv_path *srv_path;
1504
1505 list_for_each_entry(srv_path, &srv->paths_list, s.entry)
1506 if (!sockaddr_cmp((struct sockaddr *)&srv_path->s.dst_addr,
1507 (struct sockaddr *)&addr->dst_addr) &&
1508 !sockaddr_cmp((struct sockaddr *)&srv_path->s.src_addr,
1509 (struct sockaddr *)&addr->src_addr))
1510 return true;
1511
1512 return false;
1513}
1514
1515static void free_path(struct rtrs_srv_path *srv_path)
1516{
1517 if (srv_path->kobj.state_in_sysfs) {
1518 kobject_del(&srv_path->kobj);
1519 kobject_put(&srv_path->kobj);
1520 } else {
1521 free_percpu(srv_path->stats->rdma_stats);
1522 kfree(srv_path->stats);
1523 kfree(srv_path);
1524 }
1525}
1526
1527static void rtrs_srv_close_work(struct work_struct *work)
1528{
1529 struct rtrs_srv_path *srv_path;
1530 struct rtrs_srv_con *con;
1531 int i;
1532
1533 srv_path = container_of(work, typeof(*srv_path), close_work);
1534
1535 rtrs_srv_stop_hb(srv_path);
1536
1537 for (i = 0; i < srv_path->s.con_num; i++) {
1538 if (!srv_path->s.con[i])
1539 continue;
1540 con = to_srv_con(srv_path->s.con[i]);
1541 rdma_disconnect(con->c.cm_id);
1542 ib_drain_qp(con->c.qp);
1543 }
1544
1545 /*
1546 * Degrade ref count to the usual model with a single shared
1547 * atomic_t counter
1548 */
1549 percpu_ref_kill(&srv_path->ids_inflight_ref);
1550
	/* Wait for all inflight requests to complete */
1552 wait_for_completion(&srv_path->complete_done);
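	/*
	 * Shutdown ordering (sketch): percpu_ref_kill() switches the ref to
	 * atomic mode and drops the initial reference; once every
	 * rtrs_srv_put_ops_ids() has dropped its reference,
	 * rtrs_srv_inflight_ref_release() runs and completes complete_done,
	 * so the wait above only returns when no IO is in flight any more.
	 */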
1553
1554 rtrs_srv_destroy_path_files(srv_path);
1555
1556 /* Notify upper layer if we are the last path */
1557 rtrs_srv_path_down(srv_path);
1558
1559 unmap_cont_bufs(srv_path);
1560 rtrs_srv_free_ops_ids(srv_path);
1561
1562 for (i = 0; i < srv_path->s.con_num; i++) {
1563 if (!srv_path->s.con[i])
1564 continue;
1565 con = to_srv_con(srv_path->s.con[i]);
1566 rtrs_cq_qp_destroy(&con->c);
1567 rdma_destroy_id(con->c.cm_id);
1568 kfree(con);
1569 }
1570 rtrs_ib_dev_put(srv_path->s.dev);
1571
1572 del_path_from_srv(srv_path);
1573 put_srv(srv_path->srv);
1574 srv_path->srv = NULL;
1575 rtrs_srv_change_state(srv_path, RTRS_SRV_CLOSED);
1576
1577 kfree(srv_path->dma_addr);
1578 kfree(srv_path->s.con);
1579 free_path(srv_path);
1580}
1581
1582static int rtrs_rdma_do_accept(struct rtrs_srv_path *srv_path,
1583 struct rdma_cm_id *cm_id)
1584{
1585 struct rtrs_srv_sess *srv = srv_path->srv;
1586 struct rtrs_msg_conn_rsp msg;
1587 struct rdma_conn_param param;
1588 int err;
1589
1590 param = (struct rdma_conn_param) {
1591 .rnr_retry_count = 7,
1592 .private_data = &msg,
1593 .private_data_len = sizeof(msg),
1594 };
1595
1596 msg = (struct rtrs_msg_conn_rsp) {
1597 .magic = cpu_to_le16(RTRS_MAGIC),
1598 .version = cpu_to_le16(RTRS_PROTO_VER),
1599 .queue_depth = cpu_to_le16(srv->queue_depth),
1600 .max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE),
1601 .max_hdr_size = cpu_to_le32(MAX_HDR_SIZE),
1602 };
1603
1604 if (always_invalidate)
1605 msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F);
1606
	err = rdma_accept(cm_id, &param);
1608 if (err)
1609 pr_err("rdma_accept(), err: %d\n", err);
1610
1611 return err;
1612}
1613
1614static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno)
1615{
1616 struct rtrs_msg_conn_rsp msg;
1617 int err;
1618
1619 msg = (struct rtrs_msg_conn_rsp) {
1620 .magic = cpu_to_le16(RTRS_MAGIC),
1621 .version = cpu_to_le16(RTRS_PROTO_VER),
1622 .errno = cpu_to_le16(errno),
1623 };
1624
1625 err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED);
1626 if (err)
1627 pr_err("rdma_reject(), err: %d\n", err);
1628
1629 /* Bounce errno back */
1630 return errno;
1631}
1632
1633static struct rtrs_srv_path *
1634__find_path(struct rtrs_srv_sess *srv, const uuid_t *sess_uuid)
1635{
1636 struct rtrs_srv_path *srv_path;
1637
1638 list_for_each_entry(srv_path, &srv->paths_list, s.entry) {
1639 if (uuid_equal(&srv_path->s.uuid, sess_uuid))
1640 return srv_path;
1641 }
1642
1643 return NULL;
1644}
1645
1646static int create_con(struct rtrs_srv_path *srv_path,
1647 struct rdma_cm_id *cm_id,
1648 unsigned int cid)
1649{
1650 struct rtrs_srv_sess *srv = srv_path->srv;
1651 struct rtrs_path *s = &srv_path->s;
1652 struct rtrs_srv_con *con;
1653
1654 u32 cq_num, max_send_wr, max_recv_wr, wr_limit;
1655 int err, cq_vector;
1656
1657 con = kzalloc(sizeof(*con), GFP_KERNEL);
1658 if (!con) {
1659 err = -ENOMEM;
1660 goto err;
1661 }
1662
1663 spin_lock_init(&con->rsp_wr_wait_lock);
1664 INIT_LIST_HEAD(&con->rsp_wr_wait_list);
1665 con->c.cm_id = cm_id;
1666 con->c.path = &srv_path->s;
1667 con->c.cid = cid;
1668 atomic_set(&con->c.wr_cnt, 1);
1669 wr_limit = srv_path->s.dev->ib_dev->attrs.max_qp_wr;
1670
1671 if (con->c.cid == 0) {
1672 /*
1673 * All receive and all send (each requiring invalidate)
1674 * + 2 for drain and heartbeat
1675 */
1676 max_send_wr = min_t(int, wr_limit,
1677 SERVICE_CON_QUEUE_DEPTH * 2 + 2);
1678 max_recv_wr = max_send_wr;
1679 s->signal_interval = min_not_zero(srv->queue_depth,
1680 (size_t)SERVICE_CON_QUEUE_DEPTH);
1681 } else {
		/* when always_invalidate is enabled, we need linv+rinv+mr+imm */
1683 if (always_invalidate)
1684 max_send_wr =
1685 min_t(int, wr_limit,
1686 srv->queue_depth * (1 + 4) + 1);
1687 else
1688 max_send_wr =
1689 min_t(int, wr_limit,
1690 srv->queue_depth * (1 + 2) + 1);
1691
1692 max_recv_wr = srv->queue_depth + 1;
1693 }
1694 cq_num = max_send_wr + max_recv_wr;
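	/*
	 * Example budget (illustrative, queue_depth == 512, wr_limit large
	 * enough): with always_invalidate each IO needs linv+rinv+mr+imm, so
	 *
	 *	max_send_wr = 512 * (1 + 4) + 1 = 2561
	 *	max_recv_wr = 512 + 1           = 513
	 *	cq_num      = 2561 + 513        = 3074
	 *
	 * Without always_invalidate, max_send_wr = 512 * (1 + 2) + 1 = 1537.
	 */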
1695 atomic_set(&con->c.sq_wr_avail, max_send_wr);
1696 cq_vector = rtrs_srv_get_next_cq_vector(srv_path);
1697
1698 /* TODO: SOFTIRQ can be faster, but be careful with softirq context */
1699 err = rtrs_cq_qp_create(&srv_path->s, &con->c, 1, cq_vector, cq_num,
1700 max_send_wr, max_recv_wr,
1701 IB_POLL_WORKQUEUE);
1702 if (err) {
1703 rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err);
1704 goto free_con;
1705 }
1706 if (con->c.cid == 0) {
1707 err = post_recv_info_req(con);
1708 if (err)
1709 goto free_cqqp;
1710 }
1711 WARN_ON(srv_path->s.con[cid]);
1712 srv_path->s.con[cid] = &con->c;
1713
1714 /*
1715 * Change context from server to current connection. The other
1716 * way is to use cm_id->qp->qp_context, which does not work on OFED.
1717 */
1718 cm_id->context = &con->c;
1719
1720 return 0;
1721
1722free_cqqp:
1723 rtrs_cq_qp_destroy(&con->c);
1724free_con:
1725 kfree(con);
1726
1727err:
1728 return err;
1729}
1730
1731static struct rtrs_srv_path *__alloc_path(struct rtrs_srv_sess *srv,
1732 struct rdma_cm_id *cm_id,
1733 unsigned int con_num,
1734 unsigned int recon_cnt,
1735 const uuid_t *uuid)
1736{
1737 struct rtrs_srv_path *srv_path;
1738 int err = -ENOMEM;
1739 char str[NAME_MAX];
1740 struct rtrs_addr path;
1741
1742 if (srv->paths_num >= MAX_PATHS_NUM) {
1743 err = -ECONNRESET;
1744 goto err;
1745 }
1746 if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) {
1747 err = -EEXIST;
1748 pr_err("Path with same addr exists\n");
1749 goto err;
1750 }
1751 srv_path = kzalloc(sizeof(*srv_path), GFP_KERNEL);
1752 if (!srv_path)
1753 goto err;
1754
1755 srv_path->stats = kzalloc(sizeof(*srv_path->stats), GFP_KERNEL);
1756 if (!srv_path->stats)
1757 goto err_free_sess;
1758
1759 srv_path->stats->rdma_stats = alloc_percpu(struct rtrs_srv_stats_rdma_stats);
1760 if (!srv_path->stats->rdma_stats)
1761 goto err_free_stats;
1762
1763 srv_path->stats->srv_path = srv_path;
1764
1765 srv_path->dma_addr = kcalloc(srv->queue_depth,
1766 sizeof(*srv_path->dma_addr),
1767 GFP_KERNEL);
1768 if (!srv_path->dma_addr)
1769 goto err_free_percpu;
1770
1771 srv_path->s.con = kcalloc(con_num, sizeof(*srv_path->s.con),
1772 GFP_KERNEL);
1773 if (!srv_path->s.con)
1774 goto err_free_dma_addr;
1775
1776 srv_path->state = RTRS_SRV_CONNECTING;
1777 srv_path->srv = srv;
1778 srv_path->cur_cq_vector = -1;
1779 srv_path->s.dst_addr = cm_id->route.addr.dst_addr;
1780 srv_path->s.src_addr = cm_id->route.addr.src_addr;
1781
1782 /* temporary until receiving session-name from client */
1783 path.src = &srv_path->s.src_addr;
1784 path.dst = &srv_path->s.dst_addr;
1785 rtrs_addr_to_str(&path, str, sizeof(str));
1786 strscpy(srv_path->s.sessname, str, sizeof(srv_path->s.sessname));
1787
1788 srv_path->s.con_num = con_num;
1789 srv_path->s.irq_con_num = con_num;
1790 srv_path->s.recon_cnt = recon_cnt;
1791 uuid_copy(&srv_path->s.uuid, uuid);
1792 spin_lock_init(&srv_path->state_lock);
1793 INIT_WORK(&srv_path->close_work, rtrs_srv_close_work);
1794 rtrs_srv_init_hb(srv_path);
1795
1796 srv_path->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd);
1797 if (!srv_path->s.dev) {
1798 err = -ENOMEM;
1799 goto err_free_con;
1800 }
1801 err = map_cont_bufs(srv_path);
1802 if (err)
1803 goto err_put_dev;
1804
1805 err = rtrs_srv_alloc_ops_ids(srv_path);
1806 if (err)
1807 goto err_unmap_bufs;
1808
1809 __add_path_to_srv(srv, srv_path);
1810
1811 return srv_path;
1812
1813err_unmap_bufs:
1814 unmap_cont_bufs(srv_path);
1815err_put_dev:
1816 rtrs_ib_dev_put(srv_path->s.dev);
1817err_free_con:
1818 kfree(srv_path->s.con);
1819err_free_dma_addr:
1820 kfree(srv_path->dma_addr);
1821err_free_percpu:
1822 free_percpu(srv_path->stats->rdma_stats);
1823err_free_stats:
1824 kfree(srv_path->stats);
1825err_free_sess:
1826 kfree(srv_path);
1827err:
1828 return ERR_PTR(err);
1829}
1830
1831static int rtrs_rdma_connect(struct rdma_cm_id *cm_id,
1832 const struct rtrs_msg_conn_req *msg,
1833 size_t len)
1834{
1835 struct rtrs_srv_ctx *ctx = cm_id->context;
1836 struct rtrs_srv_path *srv_path;
1837 struct rtrs_srv_sess *srv;
1838
1839 u16 version, con_num, cid;
1840 u16 recon_cnt;
1841 int err = -ECONNRESET;
1842
1843 if (len < sizeof(*msg)) {
1844 pr_err("Invalid RTRS connection request\n");
1845 goto reject_w_err;
1846 }
1847 if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
1848 pr_err("Invalid RTRS magic\n");
1849 goto reject_w_err;
1850 }
1851 version = le16_to_cpu(msg->version);
1852 if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
1853 pr_err("Unsupported major RTRS version: %d, expected %d\n",
1854 version >> 8, RTRS_PROTO_VER_MAJOR);
1855 goto reject_w_err;
1856 }
1857 con_num = le16_to_cpu(msg->cid_num);
1858 if (con_num > 4096) {
1859 /* Sanity check */
1860 pr_err("Too many connections requested: %d\n", con_num);
1861 goto reject_w_err;
1862 }
1863 cid = le16_to_cpu(msg->cid);
1864 if (cid >= con_num) {
1865 /* Sanity check */
1866 pr_err("Incorrect cid: %d >= %d\n", cid, con_num);
1867 goto reject_w_err;
1868 }
1869 recon_cnt = le16_to_cpu(msg->recon_cnt);
1870 srv = get_or_create_srv(ctx, &msg->paths_uuid, msg->first_conn);
1871 if (IS_ERR(srv)) {
1872 err = PTR_ERR(srv);
1873 pr_err("get_or_create_srv(), error %d\n", err);
1874 goto reject_w_err;
1875 }
1876 mutex_lock(&srv->paths_mutex);
1877 srv_path = __find_path(srv, &msg->sess_uuid);
1878 if (srv_path) {
1879 struct rtrs_path *s = &srv_path->s;
1880
1881 /* Session already holds a reference */
1882 put_srv(srv);
1883
1884 if (srv_path->state != RTRS_SRV_CONNECTING) {
1885 rtrs_err(s, "Session in wrong state: %s\n",
1886 rtrs_srv_state_str(srv_path->state));
1887 mutex_unlock(&srv->paths_mutex);
1888 goto reject_w_err;
1889 }
1890 /*
1891 * Sanity checks
1892 */
1893 if (con_num != s->con_num || cid >= s->con_num) {
1894 rtrs_err(s, "Incorrect request: %d, %d\n",
1895 cid, con_num);
1896 mutex_unlock(&srv->paths_mutex);
1897 goto reject_w_err;
1898 }
1899 if (s->con[cid]) {
1900 rtrs_err(s, "Connection already exists: %d\n",
1901 cid);
1902 mutex_unlock(&srv->paths_mutex);
1903 goto reject_w_err;
1904 }
1905 } else {
1906 srv_path = __alloc_path(srv, cm_id, con_num, recon_cnt,
1907 &msg->sess_uuid);
1908 if (IS_ERR(srv_path)) {
1909 mutex_unlock(&srv->paths_mutex);
1910 put_srv(srv);
1911 err = PTR_ERR(srv_path);
1912 pr_err("RTRS server session allocation failed: %d\n", err);
1913 goto reject_w_err;
1914 }
1915 }
1916 err = create_con(srv_path, cm_id, cid);
1917 if (err) {
1918 rtrs_err((&srv_path->s), "create_con(), error %d\n", err);
1919 rtrs_rdma_do_reject(cm_id, err);
1920 /*
1921 * Since session has other connections we follow normal way
1922 * through workqueue, but still return an error to tell cma.c
1923 * to call rdma_destroy_id() for current connection.
1924 */
1925 goto close_and_return_err;
1926 }
1927 err = rtrs_rdma_do_accept(srv_path, cm_id);
1928 if (err) {
1929 rtrs_err((&srv_path->s), "rtrs_rdma_do_accept(), error %d\n", err);
1930 rtrs_rdma_do_reject(cm_id, err);
1931 /*
1932 * Since current connection was successfully added to the
1933 * session we follow normal way through workqueue to close the
1934 * session, thus return 0 to tell cma.c we call
1935 * rdma_destroy_id() ourselves.
1936 */
1937 err = 0;
1938 goto close_and_return_err;
1939 }
1940 mutex_unlock(&srv->paths_mutex);
1941
1942 return 0;
1943
1944reject_w_err:
1945 return rtrs_rdma_do_reject(cm_id, err);
1946
1947close_and_return_err:
1948 mutex_unlock(&srv->paths_mutex);
1949 close_path(srv_path);
1950
1951 return err;
1952}
1953
1954static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id,
1955 struct rdma_cm_event *ev)
1956{
1957 struct rtrs_srv_path *srv_path = NULL;
1958 struct rtrs_path *s = NULL;
1959 struct rtrs_con *c = NULL;
1960
1961 if (ev->event == RDMA_CM_EVENT_CONNECT_REQUEST)
1962 /*
1963 * In case of error cma.c will destroy cm_id,
1964 * see cma_process_remove()
1965 */
1966 return rtrs_rdma_connect(cm_id, ev->param.conn.private_data,
1967 ev->param.conn.private_data_len);
1968
1969 c = cm_id->context;
1970 s = c->path;
1971 srv_path = to_srv_path(s);
1972
1973 switch (ev->event) {
1974 case RDMA_CM_EVENT_ESTABLISHED:
1975 /* Nothing here */
1976 break;
1977 case RDMA_CM_EVENT_REJECTED:
1978 case RDMA_CM_EVENT_CONNECT_ERROR:
1979 case RDMA_CM_EVENT_UNREACHABLE:
1980 rtrs_err(s, "CM error (CM event: %s, err: %d)\n",
1981 rdma_event_msg(ev->event), ev->status);
1982 fallthrough;
1983 case RDMA_CM_EVENT_DISCONNECTED:
1984 case RDMA_CM_EVENT_ADDR_CHANGE:
1985 case RDMA_CM_EVENT_TIMEWAIT_EXIT:
1986 case RDMA_CM_EVENT_DEVICE_REMOVAL:
1987 close_path(srv_path);
1988 break;
1989 default:
1990 pr_err("Ignoring unexpected CM event %s, err %d\n",
1991 rdma_event_msg(ev->event), ev->status);
1992 break;
1993 }
1994
1995 return 0;
1996}
1997
1998static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx,
1999 struct sockaddr *addr,
2000 enum rdma_ucm_port_space ps)
2001{
2002 struct rdma_cm_id *cm_id;
2003 int ret;
2004
2005 cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler,
2006 ctx, ps, IB_QPT_RC);
2007 if (IS_ERR(cm_id)) {
2008 ret = PTR_ERR(cm_id);
2009 pr_err("Creating id for RDMA connection failed, err: %d\n",
2010 ret);
2011 goto err_out;
2012 }
2013 ret = rdma_bind_addr(cm_id, addr);
2014 if (ret) {
2015 pr_err("Binding RDMA address failed, err: %d\n", ret);
2016 goto err_cm;
2017 }
2018 ret = rdma_listen(cm_id, 64);
2019 if (ret) {
2020 pr_err("Listening on RDMA connection failed, err: %d\n",
2021 ret);
2022 goto err_cm;
2023 }
2024
2025 return cm_id;
2026
2027err_cm:
2028 rdma_destroy_id(cm_id);
2029err_out:
2030
2031 return ERR_PTR(ret);
2032}
2033
2034static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port)
2035{
2036 struct sockaddr_in6 sin = {
2037 .sin6_family = AF_INET6,
2038 .sin6_addr = IN6ADDR_ANY_INIT,
2039 .sin6_port = htons(port),
2040 };
2041 struct sockaddr_ib sib = {
2042 .sib_family = AF_IB,
2043 .sib_sid = cpu_to_be64(RDMA_IB_IP_PS_IB | port),
2044 .sib_sid_mask = cpu_to_be64(0xffffffffffffffffULL),
2045 .sib_pkey = cpu_to_be16(0xffff),
2046 };
2047 struct rdma_cm_id *cm_ip, *cm_ib;
2048 int ret;
2049
2050 /*
2051 * We accept both IPoIB and IB connections, so we need to keep
2052	 * two CM IDs, one for each socket type and port space.
2053	 * If the CM initialization of either ID fails, we abort
2054 * everything.
2055 */
2056 cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP);
2057 if (IS_ERR(cm_ip))
2058 return PTR_ERR(cm_ip);
2059
2060 cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB);
2061 if (IS_ERR(cm_ib)) {
2062 ret = PTR_ERR(cm_ib);
2063 goto free_cm_ip;
2064 }
2065
2066 ctx->cm_id_ip = cm_ip;
2067 ctx->cm_id_ib = cm_ib;
2068
2069 return 0;
2070
2071free_cm_ip:
2072 rdma_destroy_id(cm_ip);
2073
2074 return ret;
2075}
2076
2077static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops)
2078{
2079 struct rtrs_srv_ctx *ctx;
2080
2081 ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
2082 if (!ctx)
2083 return NULL;
2084
2085 ctx->ops = *ops;
2086 mutex_init(&ctx->srv_mutex);
2087 INIT_LIST_HEAD(&ctx->srv_list);
2088
2089 return ctx;
2090}
2091
2092static void free_srv_ctx(struct rtrs_srv_ctx *ctx)
2093{
2094 WARN_ON(!list_empty(&ctx->srv_list));
2095 mutex_destroy(&ctx->srv_mutex);
2096 kfree(ctx);
2097}
2098
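/*
 * ib_client .add callback: the listening CM IDs are not bound to a
 * particular device, so they are created only when the first IB device
 * is added; later additions just increment the device count.
 */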
2099static int rtrs_srv_add_one(struct ib_device *device)
2100{
2101 struct rtrs_srv_ctx *ctx;
2102 int ret = 0;
2103
2104 mutex_lock(&ib_ctx.ib_dev_mutex);
2105 if (ib_ctx.ib_dev_count)
2106 goto out;
2107
2108 /*
2109	 * Since our CM IDs are NOT bound to any IB device, we create them
2110	 * only once.
2111 */
2112 ctx = ib_ctx.srv_ctx;
2113 ret = rtrs_srv_rdma_init(ctx, ib_ctx.port);
2114 if (ret) {
2115 /*
2116		 * We errored out here.
2117		 * According to the IB core, the error code is ignored in this
2118		 * case and no more calls to our ops are made.
2119		 */
2120		pr_err("Failed to initialize RDMA connection, err: %d\n", ret);
2121 goto err_out;
2122 }
2123
2124out:
2125 /*
2126	 * Keep track of the number of IB devices added.
2127 */
2128 ib_ctx.ib_dev_count++;
2129
2130err_out:
2131 mutex_unlock(&ib_ctx.ib_dev_mutex);
2132 return ret;
2133}
2134
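/*
 * ib_client .remove callback: drop the device count and destroy the
 * listening CM IDs when the last IB device goes away.
 */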
2135static void rtrs_srv_remove_one(struct ib_device *device, void *client_data)
2136{
2137 struct rtrs_srv_ctx *ctx;
2138
2139 mutex_lock(&ib_ctx.ib_dev_mutex);
2140 ib_ctx.ib_dev_count--;
2141
2142 if (ib_ctx.ib_dev_count)
2143 goto out;
2144
2145 /*
2146	 * Since our CM IDs are NOT bound to any IB device, we destroy them
2147	 * only once, when the last device is removed.
2148 */
2149 ctx = ib_ctx.srv_ctx;
2150 rdma_destroy_id(ctx->cm_id_ip);
2151 rdma_destroy_id(ctx->cm_id_ib);
2152
2153out:
2154 mutex_unlock(&ib_ctx.ib_dev_mutex);
2155}
2156
2157static struct ib_client rtrs_srv_client = {
2158 .name = "rtrs_server",
2159 .add = rtrs_srv_add_one,
2160 .remove = rtrs_srv_remove_one
2161};
2162
2163/**
2164 * rtrs_srv_open() - open RTRS server context
2165 * @ops: callback functions
2166 * @port: port to listen on
2167 *
2168 * Creates server context with specified callbacks.
2169 *
2170 * Return a valid pointer on success, otherwise an ERR_PTR() value.
2171 */
2172struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port)
2173{
2174 struct rtrs_srv_ctx *ctx;
2175 int err;
2176
2177 ctx = alloc_srv_ctx(ops);
2178 if (!ctx)
2179 return ERR_PTR(-ENOMEM);
2180
2181 mutex_init(&ib_ctx.ib_dev_mutex);
2182 ib_ctx.srv_ctx = ctx;
2183 ib_ctx.port = port;
2184
2185 err = ib_register_client(&rtrs_srv_client);
2186 if (err) {
2187 free_srv_ctx(ctx);
2188 return ERR_PTR(err);
2189 }
2190
2191 return ctx;
2192}
2193EXPORT_SYMBOL(rtrs_srv_open);
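
/*
 * A minimal usage sketch for a ULP (the ops callbacks and variable names
 * below are hypothetical, shown only for illustration):
 *
 *	static struct rtrs_srv_ops my_ops = {
 *		.rdma_ev	= my_rdma_ev,
 *		.link_ev	= my_link_ev,
 *	};
 *
 *	ctx = rtrs_srv_open(&my_ops, port);
 *	if (IS_ERR(ctx))
 *		return PTR_ERR(ctx);
 *	...
 *	rtrs_srv_close(ctx);
 */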
2194
2195static void close_paths(struct rtrs_srv_sess *srv)
2196{
2197 struct rtrs_srv_path *srv_path;
2198
2199 mutex_lock(&srv->paths_mutex);
2200 list_for_each_entry(srv_path, &srv->paths_list, s.entry)
2201 close_path(srv_path);
2202 mutex_unlock(&srv->paths_mutex);
2203}
2204
2205static void close_ctx(struct rtrs_srv_ctx *ctx)
2206{
2207 struct rtrs_srv_sess *srv;
2208
2209 mutex_lock(&ctx->srv_mutex);
2210 list_for_each_entry(srv, &ctx->srv_list, ctx_list)
2211 close_paths(srv);
2212 mutex_unlock(&ctx->srv_mutex);
2213 flush_workqueue(rtrs_wq);
2214}
2215
2216/**
2217 * rtrs_srv_close() - close RTRS server context
2218 * @ctx: pointer to server context
2219 *
2220 * Closes RTRS server context with all client sessions.
2221 */
2222void rtrs_srv_close(struct rtrs_srv_ctx *ctx)
2223{
2224 ib_unregister_client(&rtrs_srv_client);
2225 mutex_destroy(&ib_ctx.ib_dev_mutex);
2226 close_ctx(ctx);
2227 free_srv_ctx(ctx);
2228}
2229EXPORT_SYMBOL(rtrs_srv_close);
2230
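/*
 * Validate the module parameters once at load time; an invalid
 * combination prevents the module from loading.
 */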
2231static int check_module_params(void)
2232{
2233 if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) {
2234		pr_err("Invalid sess_queue_depth value %d, must be between %d and %d.\n",
2235 sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH);
2236 return -EINVAL;
2237 }
2238 if (max_chunk_size < MIN_CHUNK_SIZE || !is_power_of_2(max_chunk_size)) {
2239		pr_err("Invalid max_chunk_size value %d, must be >= %d and a power of two.\n",
2240 max_chunk_size, MIN_CHUNK_SIZE);
2241 return -EINVAL;
2242 }
2243
2244 /*
2245 * Check if IB immediate data size is enough to hold the mem_id and the
2246 * offset inside the memory chunk
2247 */
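	/*
	 * For example, with the default parameters (sess_queue_depth = 512,
	 * max_chunk_size = 128 KiB) this needs ilog2(511) + 1 = 9 bits for
	 * the buffer id plus ilog2(131071) + 1 = 17 bits for the offset,
	 * i.e. 26 bits of immediate payload.
	 */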
2248 if ((ilog2(sess_queue_depth - 1) + 1) +
2249 (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) {
2250 pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n",
2251 MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size);
2252 return -EINVAL;
2253 }
2254
2255 return 0;
2256}
2257
2258static int __init rtrs_server_init(void)
2259{
2260 int err;
2261
2262	pr_info("Loading module %s, proto %s: (max_chunk_size: %d (pure IO %ld, headers %ld), sess_queue_depth: %d, always_invalidate: %d)\n",
2263 KBUILD_MODNAME, RTRS_PROTO_VER_STRING,
2264 max_chunk_size, max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE,
2265 sess_queue_depth, always_invalidate);
2266
2267 rtrs_rdma_dev_pd_init(0, &dev_pd);
2268
2269 err = check_module_params();
2270 if (err) {
2271 pr_err("Failed to load module, invalid module parameters, err: %d\n",
2272 err);
2273 return err;
2274 }
2275 err = class_register(&rtrs_dev_class);
2276 if (err)
2277 goto out_err;
2278
2279 rtrs_wq = alloc_workqueue("rtrs_server_wq", 0, 0);
2280 if (!rtrs_wq) {
2281 err = -ENOMEM;
2282 goto out_dev_class;
2283 }
2284
2285 return 0;
2286
2287out_dev_class:
2288 class_unregister(&rtrs_dev_class);
2289out_err:
2290 return err;
2291}
2292
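/*
 * Module unload: tear down in the reverse order of rtrs_server_init().
 */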
2293static void __exit rtrs_server_exit(void)
2294{
2295 destroy_workqueue(rtrs_wq);
2296 class_unregister(&rtrs_dev_class);
2297 rtrs_rdma_dev_pd_deinit(&dev_pd);
2298}
2299
2300module_init(rtrs_server_init);
2301module_exit(rtrs_server_exit);
63static inline struct rtrs_srv_sess *to_srv_sess(struct rtrs_sess *s)
64{
65 return container_of(s, struct rtrs_srv_sess, s);
66}
67
68static bool __rtrs_srv_change_state(struct rtrs_srv_sess *sess,
69 enum rtrs_srv_state new_state)
70{
71 enum rtrs_srv_state old_state;
72 bool changed = false;
73
74 lockdep_assert_held(&sess->state_lock);
75 old_state = sess->state;
76 switch (new_state) {
77 case RTRS_SRV_CONNECTED:
78 switch (old_state) {
79 case RTRS_SRV_CONNECTING:
80 changed = true;
81 fallthrough;
82 default:
83 break;
84 }
85 break;
86 case RTRS_SRV_CLOSING:
87 switch (old_state) {
88 case RTRS_SRV_CONNECTING:
89 case RTRS_SRV_CONNECTED:
90 changed = true;
91 fallthrough;
92 default:
93 break;
94 }
95 break;
96 case RTRS_SRV_CLOSED:
97 switch (old_state) {
98 case RTRS_SRV_CLOSING:
99 changed = true;
100 fallthrough;
101 default:
102 break;
103 }
104 break;
105 default:
106 break;
107 }
108 if (changed)
109 sess->state = new_state;
110
111 return changed;
112}
113
114static bool rtrs_srv_change_state_get_old(struct rtrs_srv_sess *sess,
115 enum rtrs_srv_state new_state,
116 enum rtrs_srv_state *old_state)
117{
118 bool changed;
119
120 spin_lock_irq(&sess->state_lock);
121 *old_state = sess->state;
122 changed = __rtrs_srv_change_state(sess, new_state);
123 spin_unlock_irq(&sess->state_lock);
124
125 return changed;
126}
127
128static bool rtrs_srv_change_state(struct rtrs_srv_sess *sess,
129 enum rtrs_srv_state new_state)
130{
131 enum rtrs_srv_state old_state;
132
133 return rtrs_srv_change_state_get_old(sess, new_state, &old_state);
134}
135
136static void free_id(struct rtrs_srv_op *id)
137{
138 if (!id)
139 return;
140 kfree(id);
141}
142
143static void rtrs_srv_free_ops_ids(struct rtrs_srv_sess *sess)
144{
145 struct rtrs_srv *srv = sess->srv;
146 int i;
147
148 WARN_ON(atomic_read(&sess->ids_inflight));
149 if (sess->ops_ids) {
150 for (i = 0; i < srv->queue_depth; i++)
151 free_id(sess->ops_ids[i]);
152 kfree(sess->ops_ids);
153 sess->ops_ids = NULL;
154 }
155}
156
157static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc);
158
159static struct ib_cqe io_comp_cqe = {
160 .done = rtrs_srv_rdma_done
161};
162
163static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_sess *sess)
164{
165 struct rtrs_srv *srv = sess->srv;
166 struct rtrs_srv_op *id;
167 int i;
168
169 sess->ops_ids = kcalloc(srv->queue_depth, sizeof(*sess->ops_ids),
170 GFP_KERNEL);
171 if (!sess->ops_ids)
172 goto err;
173
174 for (i = 0; i < srv->queue_depth; ++i) {
175 id = kzalloc(sizeof(*id), GFP_KERNEL);
176 if (!id)
177 goto err;
178
179 sess->ops_ids[i] = id;
180 }
181 init_waitqueue_head(&sess->ids_waitq);
182 atomic_set(&sess->ids_inflight, 0);
183
184 return 0;
185
186err:
187 rtrs_srv_free_ops_ids(sess);
188 return -ENOMEM;
189}
190
191static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_sess *sess)
192{
193 atomic_inc(&sess->ids_inflight);
194}
195
196static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_sess *sess)
197{
198 if (atomic_dec_and_test(&sess->ids_inflight))
199 wake_up(&sess->ids_waitq);
200}
201
202static void rtrs_srv_wait_ops_ids(struct rtrs_srv_sess *sess)
203{
204 wait_event(sess->ids_waitq, !atomic_read(&sess->ids_inflight));
205}
206
207
208static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc)
209{
210 struct rtrs_srv_con *con = cq->cq_context;
211 struct rtrs_sess *s = con->c.sess;
212 struct rtrs_srv_sess *sess = to_srv_sess(s);
213
214 if (unlikely(wc->status != IB_WC_SUCCESS)) {
215 rtrs_err(s, "REG MR failed: %s\n",
216 ib_wc_status_msg(wc->status));
217 close_sess(sess);
218 return;
219 }
220}
221
222static struct ib_cqe local_reg_cqe = {
223 .done = rtrs_srv_reg_mr_done
224};
225
226static int rdma_write_sg(struct rtrs_srv_op *id)
227{
228 struct rtrs_sess *s = id->con->c.sess;
229 struct rtrs_srv_sess *sess = to_srv_sess(s);
230 dma_addr_t dma_addr = sess->dma_addr[id->msg_id];
231 struct rtrs_srv_mr *srv_mr;
232 struct rtrs_srv *srv = sess->srv;
233 struct ib_send_wr inv_wr, imm_wr;
234 struct ib_rdma_wr *wr = NULL;
235 enum ib_send_flags flags;
236 size_t sg_cnt;
237 int err, offset;
238 bool need_inval;
239 u32 rkey = 0;
240 struct ib_reg_wr rwr;
241 struct ib_sge *plist;
242 struct ib_sge list;
243
244 sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt);
245 need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F;
246 if (unlikely(sg_cnt != 1))
247 return -EINVAL;
248
249 offset = 0;
250
251 wr = &id->tx_wr;
252 plist = &id->tx_sg;
253 plist->addr = dma_addr + offset;
254 plist->length = le32_to_cpu(id->rd_msg->desc[0].len);
255
256 /* WR will fail with length error
257 * if this is 0
258 */
259 if (unlikely(plist->length == 0)) {
260 rtrs_err(s, "Invalid RDMA-Write sg list length 0\n");
261 return -EINVAL;
262 }
263
264 plist->lkey = sess->s.dev->ib_pd->local_dma_lkey;
265 offset += plist->length;
266
267 wr->wr.sg_list = plist;
268 wr->wr.num_sge = 1;
269 wr->remote_addr = le64_to_cpu(id->rd_msg->desc[0].addr);
270 wr->rkey = le32_to_cpu(id->rd_msg->desc[0].key);
271 if (rkey == 0)
272 rkey = wr->rkey;
273 else
274 /* Only one key is actually used */
275 WARN_ON_ONCE(rkey != wr->rkey);
276
277 wr->wr.opcode = IB_WR_RDMA_WRITE;
278 wr->wr.ex.imm_data = 0;
279 wr->wr.send_flags = 0;
280
281 if (need_inval && always_invalidate) {
282 wr->wr.next = &rwr.wr;
283 rwr.wr.next = &inv_wr;
284 inv_wr.next = &imm_wr;
285 } else if (always_invalidate) {
286 wr->wr.next = &rwr.wr;
287 rwr.wr.next = &imm_wr;
288 } else if (need_inval) {
289 wr->wr.next = &inv_wr;
290 inv_wr.next = &imm_wr;
291 } else {
292 wr->wr.next = &imm_wr;
293 }
294 /*
295 * From time to time we have to post signaled sends,
296 * or send queue will fill up and only QP reset can help.
297 */
298 flags = (atomic_inc_return(&id->con->wr_cnt) % srv->queue_depth) ?
299 0 : IB_SEND_SIGNALED;
300
301 if (need_inval) {
302 inv_wr.sg_list = NULL;
303 inv_wr.num_sge = 0;
304 inv_wr.opcode = IB_WR_SEND_WITH_INV;
305 inv_wr.send_flags = 0;
306 inv_wr.ex.invalidate_rkey = rkey;
307 }
308
309 imm_wr.next = NULL;
310 if (always_invalidate) {
311 struct rtrs_msg_rkey_rsp *msg;
312
313 srv_mr = &sess->mrs[id->msg_id];
314 rwr.wr.opcode = IB_WR_REG_MR;
315 rwr.wr.num_sge = 0;
316 rwr.mr = srv_mr->mr;
317 rwr.wr.send_flags = 0;
318 rwr.key = srv_mr->mr->rkey;
319 rwr.access = (IB_ACCESS_LOCAL_WRITE |
320 IB_ACCESS_REMOTE_WRITE);
321 msg = srv_mr->iu->buf;
322 msg->buf_id = cpu_to_le16(id->msg_id);
323 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
324 msg->rkey = cpu_to_le32(srv_mr->mr->rkey);
325
326 list.addr = srv_mr->iu->dma_addr;
327 list.length = sizeof(*msg);
328 list.lkey = sess->s.dev->ib_pd->local_dma_lkey;
329 imm_wr.sg_list = &list;
330 imm_wr.num_sge = 1;
331 imm_wr.opcode = IB_WR_SEND_WITH_IMM;
332 ib_dma_sync_single_for_device(sess->s.dev->ib_dev,
333 srv_mr->iu->dma_addr,
334 srv_mr->iu->size, DMA_TO_DEVICE);
335 } else {
336 imm_wr.sg_list = NULL;
337 imm_wr.num_sge = 0;
338 imm_wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
339 }
340 imm_wr.send_flags = flags;
341 imm_wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id,
342 0, need_inval));
343
344 imm_wr.wr_cqe = &io_comp_cqe;
345 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, dma_addr,
346 offset, DMA_BIDIRECTIONAL);
347
348 err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL);
349 if (unlikely(err))
350 rtrs_err(s,
351 "Posting RDMA-Write-Request to QP failed, err: %d\n",
352 err);
353
354 return err;
355}
356
357/**
358 * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE
359 * requests or on successful WRITE request.
360 * @con: the connection to send back result
361 * @id: the id associated with the IO
362 * @errno: the error number of the IO.
363 *
364 * Return 0 on success, errno otherwise.
365 */
366static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id,
367 int errno)
368{
369 struct rtrs_sess *s = con->c.sess;
370 struct rtrs_srv_sess *sess = to_srv_sess(s);
371 struct ib_send_wr inv_wr, imm_wr, *wr = NULL;
372 struct ib_reg_wr rwr;
373 struct rtrs_srv *srv = sess->srv;
374 struct rtrs_srv_mr *srv_mr;
375 bool need_inval = false;
376 enum ib_send_flags flags;
377 u32 imm;
378 int err;
379
380 if (id->dir == READ) {
381 struct rtrs_msg_rdma_read *rd_msg = id->rd_msg;
382 size_t sg_cnt;
383
384 need_inval = le16_to_cpu(rd_msg->flags) &
385 RTRS_MSG_NEED_INVAL_F;
386 sg_cnt = le16_to_cpu(rd_msg->sg_cnt);
387
388 if (need_inval) {
389 if (likely(sg_cnt)) {
390 inv_wr.sg_list = NULL;
391 inv_wr.num_sge = 0;
392 inv_wr.opcode = IB_WR_SEND_WITH_INV;
393 inv_wr.send_flags = 0;
394 /* Only one key is actually used */
395 inv_wr.ex.invalidate_rkey =
396 le32_to_cpu(rd_msg->desc[0].key);
397 } else {
398 WARN_ON_ONCE(1);
399 need_inval = false;
400 }
401 }
402 }
403
404 if (need_inval && always_invalidate) {
405 wr = &inv_wr;
406 inv_wr.next = &rwr.wr;
407 rwr.wr.next = &imm_wr;
408 } else if (always_invalidate) {
409 wr = &rwr.wr;
410 rwr.wr.next = &imm_wr;
411 } else if (need_inval) {
412 wr = &inv_wr;
413 inv_wr.next = &imm_wr;
414 } else {
415 wr = &imm_wr;
416 }
417 /*
418 * From time to time we have to post signalled sends,
419 * or send queue will fill up and only QP reset can help.
420 */
421 flags = (atomic_inc_return(&con->wr_cnt) % srv->queue_depth) ?
422 0 : IB_SEND_SIGNALED;
423 imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval);
424 imm_wr.next = NULL;
425 if (always_invalidate) {
426 struct ib_sge list;
427 struct rtrs_msg_rkey_rsp *msg;
428
429 srv_mr = &sess->mrs[id->msg_id];
430 rwr.wr.next = &imm_wr;
431 rwr.wr.opcode = IB_WR_REG_MR;
432 rwr.wr.num_sge = 0;
433 rwr.wr.send_flags = 0;
434 rwr.mr = srv_mr->mr;
435 rwr.key = srv_mr->mr->rkey;
436 rwr.access = (IB_ACCESS_LOCAL_WRITE |
437 IB_ACCESS_REMOTE_WRITE);
438 msg = srv_mr->iu->buf;
439 msg->buf_id = cpu_to_le16(id->msg_id);
440 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
441 msg->rkey = cpu_to_le32(srv_mr->mr->rkey);
442
443 list.addr = srv_mr->iu->dma_addr;
444 list.length = sizeof(*msg);
445 list.lkey = sess->s.dev->ib_pd->local_dma_lkey;
446 imm_wr.sg_list = &list;
447 imm_wr.num_sge = 1;
448 imm_wr.opcode = IB_WR_SEND_WITH_IMM;
449 ib_dma_sync_single_for_device(sess->s.dev->ib_dev,
450 srv_mr->iu->dma_addr,
451 srv_mr->iu->size, DMA_TO_DEVICE);
452 } else {
453 imm_wr.sg_list = NULL;
454 imm_wr.num_sge = 0;
455 imm_wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
456 }
457 imm_wr.send_flags = flags;
458 imm_wr.wr_cqe = &io_comp_cqe;
459
460 imm_wr.ex.imm_data = cpu_to_be32(imm);
461
462 err = ib_post_send(id->con->c.qp, wr, NULL);
463 if (unlikely(err))
464 rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n",
465 err);
466
467 return err;
468}
469
470void close_sess(struct rtrs_srv_sess *sess)
471{
472 enum rtrs_srv_state old_state;
473
474 if (rtrs_srv_change_state_get_old(sess, RTRS_SRV_CLOSING,
475 &old_state))
476 queue_work(rtrs_wq, &sess->close_work);
477 WARN_ON(sess->state != RTRS_SRV_CLOSING);
478}
479
480static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state)
481{
482 switch (state) {
483 case RTRS_SRV_CONNECTING:
484 return "RTRS_SRV_CONNECTING";
485 case RTRS_SRV_CONNECTED:
486 return "RTRS_SRV_CONNECTED";
487 case RTRS_SRV_CLOSING:
488 return "RTRS_SRV_CLOSING";
489 case RTRS_SRV_CLOSED:
490 return "RTRS_SRV_CLOSED";
491 default:
492 return "UNKNOWN";
493 }
494}
495
496/**
497 * rtrs_srv_resp_rdma() - Finish an RDMA request
498 *
499 * @id: Internal RTRS operation identifier
500 * @status: Response Code sent to the other side for this operation.
501 * 0 = success, <=0 error
502 * Context: any
503 *
504 * Finish a RDMA operation. A message is sent to the client and the
505 * corresponding memory areas will be released.
506 */
507bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status)
508{
509 struct rtrs_srv_sess *sess;
510 struct rtrs_srv_con *con;
511 struct rtrs_sess *s;
512 int err;
513
514 if (WARN_ON(!id))
515 return true;
516
517 con = id->con;
518 s = con->c.sess;
519 sess = to_srv_sess(s);
520
521 id->status = status;
522
523 if (unlikely(sess->state != RTRS_SRV_CONNECTED)) {
524 rtrs_err_rl(s,
525 "Sending I/O response failed, session is disconnected, sess state %s\n",
526 rtrs_srv_state_str(sess->state));
527 goto out;
528 }
529 if (always_invalidate) {
530 struct rtrs_srv_mr *mr = &sess->mrs[id->msg_id];
531
532 ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey));
533 }
534 if (unlikely(atomic_sub_return(1,
535 &con->sq_wr_avail) < 0)) {
536 pr_err("IB send queue full\n");
537 atomic_add(1, &con->sq_wr_avail);
538 spin_lock(&con->rsp_wr_wait_lock);
539 list_add_tail(&id->wait_list, &con->rsp_wr_wait_list);
540 spin_unlock(&con->rsp_wr_wait_lock);
541 return false;
542 }
543
544 if (status || id->dir == WRITE || !id->rd_msg->sg_cnt)
545 err = send_io_resp_imm(con, id, status);
546 else
547 err = rdma_write_sg(id);
548
549 if (unlikely(err)) {
550 rtrs_err_rl(s, "IO response failed: %d\n", err);
551 close_sess(sess);
552 }
553out:
554 rtrs_srv_put_ops_ids(sess);
555 return true;
556}
557EXPORT_SYMBOL(rtrs_srv_resp_rdma);
558
559/**
560 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv.
561 * @srv: Session pointer
562 * @priv: The private pointer that is associated with the session.
563 */
564void rtrs_srv_set_sess_priv(struct rtrs_srv *srv, void *priv)
565{
566 srv->priv = priv;
567}
568EXPORT_SYMBOL(rtrs_srv_set_sess_priv);
569
570static void unmap_cont_bufs(struct rtrs_srv_sess *sess)
571{
572 int i;
573
574 for (i = 0; i < sess->mrs_num; i++) {
575 struct rtrs_srv_mr *srv_mr;
576
577 srv_mr = &sess->mrs[i];
578 rtrs_iu_free(srv_mr->iu, DMA_TO_DEVICE,
579 sess->s.dev->ib_dev, 1);
580 ib_dereg_mr(srv_mr->mr);
581 ib_dma_unmap_sg(sess->s.dev->ib_dev, srv_mr->sgt.sgl,
582 srv_mr->sgt.nents, DMA_BIDIRECTIONAL);
583 sg_free_table(&srv_mr->sgt);
584 }
585 kfree(sess->mrs);
586}
587
588static int map_cont_bufs(struct rtrs_srv_sess *sess)
589{
590 struct rtrs_srv *srv = sess->srv;
591 struct rtrs_sess *ss = &sess->s;
592 int i, mri, err, mrs_num;
593 unsigned int chunk_bits;
594 int chunks_per_mr = 1;
595
596 /*
597 * Here we map queue_depth chunks to MR. Firstly we have to
598 * figure out how many chunks can we map per MR.
599 */
600 if (always_invalidate) {
601 /*
602 * in order to do invalidate for each chunks of memory, we needs
603 * more memory regions.
604 */
605 mrs_num = srv->queue_depth;
606 } else {
607 chunks_per_mr =
608 sess->s.dev->ib_dev->attrs.max_fast_reg_page_list_len;
609 mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr);
610 chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num);
611 }
612
613 sess->mrs = kcalloc(mrs_num, sizeof(*sess->mrs), GFP_KERNEL);
614 if (!sess->mrs)
615 return -ENOMEM;
616
617 sess->mrs_num = mrs_num;
618
619 for (mri = 0; mri < mrs_num; mri++) {
620 struct rtrs_srv_mr *srv_mr = &sess->mrs[mri];
621 struct sg_table *sgt = &srv_mr->sgt;
622 struct scatterlist *s;
623 struct ib_mr *mr;
624 int nr, chunks;
625
626 chunks = chunks_per_mr * mri;
627 if (!always_invalidate)
628 chunks_per_mr = min_t(int, chunks_per_mr,
629 srv->queue_depth - chunks);
630
631 err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL);
632 if (err)
633 goto err;
634
635 for_each_sg(sgt->sgl, s, chunks_per_mr, i)
636 sg_set_page(s, srv->chunks[chunks + i],
637 max_chunk_size, 0);
638
639 nr = ib_dma_map_sg(sess->s.dev->ib_dev, sgt->sgl,
640 sgt->nents, DMA_BIDIRECTIONAL);
641 if (nr < sgt->nents) {
642 err = nr < 0 ? nr : -EINVAL;
643 goto free_sg;
644 }
645 mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG,
646 sgt->nents);
647 if (IS_ERR(mr)) {
648 err = PTR_ERR(mr);
649 goto unmap_sg;
650 }
651 nr = ib_map_mr_sg(mr, sgt->sgl, sgt->nents,
652 NULL, max_chunk_size);
653 if (nr < 0 || nr < sgt->nents) {
654 err = nr < 0 ? nr : -EINVAL;
655 goto dereg_mr;
656 }
657
658 if (always_invalidate) {
659 srv_mr->iu = rtrs_iu_alloc(1,
660 sizeof(struct rtrs_msg_rkey_rsp),
661 GFP_KERNEL, sess->s.dev->ib_dev,
662 DMA_TO_DEVICE, rtrs_srv_rdma_done);
663 if (!srv_mr->iu) {
664 err = -ENOMEM;
665 rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n", err);
666 goto free_iu;
667 }
668 }
669 /* Eventually dma addr for each chunk can be cached */
670 for_each_sg(sgt->sgl, s, sgt->orig_nents, i)
671 sess->dma_addr[chunks + i] = sg_dma_address(s);
672
673 ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey));
674 srv_mr->mr = mr;
675
676 continue;
677err:
678 while (mri--) {
679 srv_mr = &sess->mrs[mri];
680 sgt = &srv_mr->sgt;
681 mr = srv_mr->mr;
682free_iu:
683 rtrs_iu_free(srv_mr->iu, DMA_TO_DEVICE,
684 sess->s.dev->ib_dev, 1);
685dereg_mr:
686 ib_dereg_mr(mr);
687unmap_sg:
688 ib_dma_unmap_sg(sess->s.dev->ib_dev, sgt->sgl,
689 sgt->nents, DMA_BIDIRECTIONAL);
690free_sg:
691 sg_free_table(sgt);
692 }
693 kfree(sess->mrs);
694
695 return err;
696 }
697
698 chunk_bits = ilog2(srv->queue_depth - 1) + 1;
699 sess->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits);
700
701 return 0;
702}
703
704static void rtrs_srv_hb_err_handler(struct rtrs_con *c)
705{
706 close_sess(to_srv_sess(c->sess));
707}
708
709static void rtrs_srv_init_hb(struct rtrs_srv_sess *sess)
710{
711 rtrs_init_hb(&sess->s, &io_comp_cqe,
712 RTRS_HB_INTERVAL_MS,
713 RTRS_HB_MISSED_MAX,
714 rtrs_srv_hb_err_handler,
715 rtrs_wq);
716}
717
718static void rtrs_srv_start_hb(struct rtrs_srv_sess *sess)
719{
720 rtrs_start_hb(&sess->s);
721}
722
723static void rtrs_srv_stop_hb(struct rtrs_srv_sess *sess)
724{
725 rtrs_stop_hb(&sess->s);
726}
727
728static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
729{
730 struct rtrs_srv_con *con = cq->cq_context;
731 struct rtrs_sess *s = con->c.sess;
732 struct rtrs_srv_sess *sess = to_srv_sess(s);
733 struct rtrs_iu *iu;
734
735 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
736 rtrs_iu_free(iu, DMA_TO_DEVICE, sess->s.dev->ib_dev, 1);
737
738 if (unlikely(wc->status != IB_WC_SUCCESS)) {
739 rtrs_err(s, "Sess info response send failed: %s\n",
740 ib_wc_status_msg(wc->status));
741 close_sess(sess);
742 return;
743 }
744 WARN_ON(wc->opcode != IB_WC_SEND);
745}
746
747static void rtrs_srv_sess_up(struct rtrs_srv_sess *sess)
748{
749 struct rtrs_srv *srv = sess->srv;
750 struct rtrs_srv_ctx *ctx = srv->ctx;
751 int up;
752
753 mutex_lock(&srv->paths_ev_mutex);
754 up = ++srv->paths_up;
755 if (up == 1)
756 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL);
757 mutex_unlock(&srv->paths_ev_mutex);
758
759 /* Mark session as established */
760 sess->established = true;
761}
762
763static void rtrs_srv_sess_down(struct rtrs_srv_sess *sess)
764{
765 struct rtrs_srv *srv = sess->srv;
766 struct rtrs_srv_ctx *ctx = srv->ctx;
767
768 if (!sess->established)
769 return;
770
771 sess->established = false;
772 mutex_lock(&srv->paths_ev_mutex);
773 WARN_ON(!srv->paths_up);
774 if (--srv->paths_up == 0)
775 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv);
776 mutex_unlock(&srv->paths_ev_mutex);
777}
778
779static int post_recv_sess(struct rtrs_srv_sess *sess);
780
781static int process_info_req(struct rtrs_srv_con *con,
782 struct rtrs_msg_info_req *msg)
783{
784 struct rtrs_sess *s = con->c.sess;
785 struct rtrs_srv_sess *sess = to_srv_sess(s);
786 struct ib_send_wr *reg_wr = NULL;
787 struct rtrs_msg_info_rsp *rsp;
788 struct rtrs_iu *tx_iu;
789 struct ib_reg_wr *rwr;
790 int mri, err;
791 size_t tx_sz;
792
793 err = post_recv_sess(sess);
794 if (unlikely(err)) {
795 rtrs_err(s, "post_recv_sess(), err: %d\n", err);
796 return err;
797 }
798 rwr = kcalloc(sess->mrs_num, sizeof(*rwr), GFP_KERNEL);
799 if (unlikely(!rwr))
800 return -ENOMEM;
801 strlcpy(sess->s.sessname, msg->sessname, sizeof(sess->s.sessname));
802
803 tx_sz = sizeof(*rsp);
804 tx_sz += sizeof(rsp->desc[0]) * sess->mrs_num;
805 tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, sess->s.dev->ib_dev,
806 DMA_TO_DEVICE, rtrs_srv_info_rsp_done);
807 if (unlikely(!tx_iu)) {
808 err = -ENOMEM;
809 goto rwr_free;
810 }
811
812 rsp = tx_iu->buf;
813 rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP);
814 rsp->sg_cnt = cpu_to_le16(sess->mrs_num);
815
816 for (mri = 0; mri < sess->mrs_num; mri++) {
817 struct ib_mr *mr = sess->mrs[mri].mr;
818
819 rsp->desc[mri].addr = cpu_to_le64(mr->iova);
820 rsp->desc[mri].key = cpu_to_le32(mr->rkey);
821 rsp->desc[mri].len = cpu_to_le32(mr->length);
822
823 /*
824 * Fill in reg MR request and chain them *backwards*
825 */
826 rwr[mri].wr.next = mri ? &rwr[mri - 1].wr : NULL;
827 rwr[mri].wr.opcode = IB_WR_REG_MR;
828 rwr[mri].wr.wr_cqe = &local_reg_cqe;
829 rwr[mri].wr.num_sge = 0;
830 rwr[mri].wr.send_flags = mri ? 0 : IB_SEND_SIGNALED;
831 rwr[mri].mr = mr;
832 rwr[mri].key = mr->rkey;
833 rwr[mri].access = (IB_ACCESS_LOCAL_WRITE |
834 IB_ACCESS_REMOTE_WRITE);
835 reg_wr = &rwr[mri].wr;
836 }
837
838 err = rtrs_srv_create_sess_files(sess);
839 if (unlikely(err))
840 goto iu_free;
841 kobject_get(&sess->kobj);
842 get_device(&sess->srv->dev);
843 rtrs_srv_change_state(sess, RTRS_SRV_CONNECTED);
844 rtrs_srv_start_hb(sess);
845
846 /*
847 * We do not account number of established connections at the current
848 * moment, we rely on the client, which should send info request when
849 * all connections are successfully established. Thus, simply notify
850 * listener with a proper event if we are the first path.
851 */
852 rtrs_srv_sess_up(sess);
853
854 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr,
855 tx_iu->size, DMA_TO_DEVICE);
856
857 /* Send info response */
858 err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr);
859 if (unlikely(err)) {
860 rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err);
861iu_free:
862 rtrs_iu_free(tx_iu, DMA_TO_DEVICE, sess->s.dev->ib_dev, 1);
863 }
864rwr_free:
865 kfree(rwr);
866
867 return err;
868}
869
870static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
871{
872 struct rtrs_srv_con *con = cq->cq_context;
873 struct rtrs_sess *s = con->c.sess;
874 struct rtrs_srv_sess *sess = to_srv_sess(s);
875 struct rtrs_msg_info_req *msg;
876 struct rtrs_iu *iu;
877 int err;
878
879 WARN_ON(con->c.cid);
880
881 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
882 if (unlikely(wc->status != IB_WC_SUCCESS)) {
883 rtrs_err(s, "Sess info request receive failed: %s\n",
884 ib_wc_status_msg(wc->status));
885 goto close;
886 }
887 WARN_ON(wc->opcode != IB_WC_RECV);
888
889 if (unlikely(wc->byte_len < sizeof(*msg))) {
890 rtrs_err(s, "Sess info request is malformed: size %d\n",
891 wc->byte_len);
892 goto close;
893 }
894 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
895 iu->size, DMA_FROM_DEVICE);
896 msg = iu->buf;
897 if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ)) {
898 rtrs_err(s, "Sess info request is malformed: type %d\n",
899 le16_to_cpu(msg->type));
900 goto close;
901 }
902 err = process_info_req(con, msg);
903 if (unlikely(err))
904 goto close;
905
906out:
907 rtrs_iu_free(iu, DMA_FROM_DEVICE, sess->s.dev->ib_dev, 1);
908 return;
909close:
910 close_sess(sess);
911 goto out;
912}
913
914static int post_recv_info_req(struct rtrs_srv_con *con)
915{
916 struct rtrs_sess *s = con->c.sess;
917 struct rtrs_srv_sess *sess = to_srv_sess(s);
918 struct rtrs_iu *rx_iu;
919 int err;
920
921 rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req),
922 GFP_KERNEL, sess->s.dev->ib_dev,
923 DMA_FROM_DEVICE, rtrs_srv_info_req_done);
924 if (unlikely(!rx_iu))
925 return -ENOMEM;
926 /* Prepare for getting info response */
927 err = rtrs_iu_post_recv(&con->c, rx_iu);
928 if (unlikely(err)) {
929 rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err);
930 rtrs_iu_free(rx_iu, DMA_FROM_DEVICE, sess->s.dev->ib_dev, 1);
931 return err;
932 }
933
934 return 0;
935}
936
937static int post_recv_io(struct rtrs_srv_con *con, size_t q_size)
938{
939 int i, err;
940
941 for (i = 0; i < q_size; i++) {
942 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
943 if (unlikely(err))
944 return err;
945 }
946
947 return 0;
948}
949
950static int post_recv_sess(struct rtrs_srv_sess *sess)
951{
952 struct rtrs_srv *srv = sess->srv;
953 struct rtrs_sess *s = &sess->s;
954 size_t q_size;
955 int err, cid;
956
957 for (cid = 0; cid < sess->s.con_num; cid++) {
958 if (cid == 0)
959 q_size = SERVICE_CON_QUEUE_DEPTH;
960 else
961 q_size = srv->queue_depth;
962
963 err = post_recv_io(to_srv_con(sess->s.con[cid]), q_size);
964 if (unlikely(err)) {
965 rtrs_err(s, "post_recv_io(), err: %d\n", err);
966 return err;
967 }
968 }
969
970 return 0;
971}
972
973static void process_read(struct rtrs_srv_con *con,
974 struct rtrs_msg_rdma_read *msg,
975 u32 buf_id, u32 off)
976{
977 struct rtrs_sess *s = con->c.sess;
978 struct rtrs_srv_sess *sess = to_srv_sess(s);
979 struct rtrs_srv *srv = sess->srv;
980 struct rtrs_srv_ctx *ctx = srv->ctx;
981 struct rtrs_srv_op *id;
982
983 size_t usr_len, data_len;
984 void *data;
985 int ret;
986
987 if (unlikely(sess->state != RTRS_SRV_CONNECTED)) {
988 rtrs_err_rl(s,
989 "Processing read request failed, session is disconnected, sess state %s\n",
990 rtrs_srv_state_str(sess->state));
991 return;
992 }
993 if (unlikely(msg->sg_cnt != 1 && msg->sg_cnt != 0)) {
994 rtrs_err_rl(s,
995 "Processing read request failed, invalid message\n");
996 return;
997 }
998 rtrs_srv_get_ops_ids(sess);
999 rtrs_srv_update_rdma_stats(sess->stats, off, READ);
1000 id = sess->ops_ids[buf_id];
1001 id->con = con;
1002 id->dir = READ;
1003 id->msg_id = buf_id;
1004 id->rd_msg = msg;
1005 usr_len = le16_to_cpu(msg->usr_len);
1006 data_len = off - usr_len;
1007 data = page_address(srv->chunks[buf_id]);
1008 ret = ctx->ops.rdma_ev(srv, srv->priv, id, READ, data, data_len,
1009 data + data_len, usr_len);
1010
1011 if (unlikely(ret)) {
1012 rtrs_err_rl(s,
1013 "Processing read request failed, user module cb reported for msg_id %d, err: %d\n",
1014 buf_id, ret);
1015 goto send_err_msg;
1016 }
1017
1018 return;
1019
1020send_err_msg:
1021 ret = send_io_resp_imm(con, id, ret);
1022 if (ret < 0) {
1023 rtrs_err_rl(s,
1024 "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %d\n",
1025 buf_id, ret);
1026 close_sess(sess);
1027 }
1028 rtrs_srv_put_ops_ids(sess);
1029}
1030
1031static void process_write(struct rtrs_srv_con *con,
1032 struct rtrs_msg_rdma_write *req,
1033 u32 buf_id, u32 off)
1034{
1035 struct rtrs_sess *s = con->c.sess;
1036 struct rtrs_srv_sess *sess = to_srv_sess(s);
1037 struct rtrs_srv *srv = sess->srv;
1038 struct rtrs_srv_ctx *ctx = srv->ctx;
1039 struct rtrs_srv_op *id;
1040
1041 size_t data_len, usr_len;
1042 void *data;
1043 int ret;
1044
1045 if (unlikely(sess->state != RTRS_SRV_CONNECTED)) {
1046 rtrs_err_rl(s,
1047 "Processing write request failed, session is disconnected, sess state %s\n",
1048 rtrs_srv_state_str(sess->state));
1049 return;
1050 }
1051 rtrs_srv_get_ops_ids(sess);
1052 rtrs_srv_update_rdma_stats(sess->stats, off, WRITE);
1053 id = sess->ops_ids[buf_id];
1054 id->con = con;
1055 id->dir = WRITE;
1056 id->msg_id = buf_id;
1057
1058 usr_len = le16_to_cpu(req->usr_len);
1059 data_len = off - usr_len;
1060 data = page_address(srv->chunks[buf_id]);
1061 ret = ctx->ops.rdma_ev(srv, srv->priv, id, WRITE, data, data_len,
1062 data + data_len, usr_len);
1063 if (unlikely(ret)) {
1064 rtrs_err_rl(s,
1065 "Processing write request failed, user module callback reports err: %d\n",
1066 ret);
1067 goto send_err_msg;
1068 }
1069
1070 return;
1071
1072send_err_msg:
1073 ret = send_io_resp_imm(con, id, ret);
1074 if (ret < 0) {
1075 rtrs_err_rl(s,
1076 "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n",
1077 buf_id, ret);
1078 close_sess(sess);
1079 }
1080 rtrs_srv_put_ops_ids(sess);
1081}
1082
1083static void process_io_req(struct rtrs_srv_con *con, void *msg,
1084 u32 id, u32 off)
1085{
1086 struct rtrs_sess *s = con->c.sess;
1087 struct rtrs_srv_sess *sess = to_srv_sess(s);
1088 struct rtrs_msg_rdma_hdr *hdr;
1089 unsigned int type;
1090
1091 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, sess->dma_addr[id],
1092 max_chunk_size, DMA_BIDIRECTIONAL);
1093 hdr = msg;
1094 type = le16_to_cpu(hdr->type);
1095
1096 switch (type) {
1097 case RTRS_MSG_WRITE:
1098 process_write(con, msg, id, off);
1099 break;
1100 case RTRS_MSG_READ:
1101 process_read(con, msg, id, off);
1102 break;
1103 default:
1104 rtrs_err(s,
1105 "Processing I/O request failed, unknown message type received: 0x%02x\n",
1106 type);
1107 goto err;
1108 }
1109
1110 return;
1111
1112err:
1113 close_sess(sess);
1114}
1115
1116static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
1117{
1118 struct rtrs_srv_mr *mr =
1119 container_of(wc->wr_cqe, typeof(*mr), inv_cqe);
1120 struct rtrs_srv_con *con = cq->cq_context;
1121 struct rtrs_sess *s = con->c.sess;
1122 struct rtrs_srv_sess *sess = to_srv_sess(s);
1123 struct rtrs_srv *srv = sess->srv;
1124 u32 msg_id, off;
1125 void *data;
1126
1127 if (unlikely(wc->status != IB_WC_SUCCESS)) {
1128 rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n",
1129 ib_wc_status_msg(wc->status));
1130 close_sess(sess);
1131 }
1132 msg_id = mr->msg_id;
1133 off = mr->msg_off;
1134 data = page_address(srv->chunks[msg_id]) + off;
1135 process_io_req(con, data, msg_id, off);
1136}
1137
1138static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con,
1139 struct rtrs_srv_mr *mr)
1140{
1141 struct ib_send_wr wr = {
1142 .opcode = IB_WR_LOCAL_INV,
1143 .wr_cqe = &mr->inv_cqe,
1144 .send_flags = IB_SEND_SIGNALED,
1145 .ex.invalidate_rkey = mr->mr->rkey,
1146 };
1147 mr->inv_cqe.done = rtrs_srv_inv_rkey_done;
1148
1149 return ib_post_send(con->c.qp, &wr, NULL);
1150}
1151
1152static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con)
1153{
1154 spin_lock(&con->rsp_wr_wait_lock);
1155 while (!list_empty(&con->rsp_wr_wait_list)) {
1156 struct rtrs_srv_op *id;
1157 int ret;
1158
1159 id = list_entry(con->rsp_wr_wait_list.next,
1160 struct rtrs_srv_op, wait_list);
1161 list_del(&id->wait_list);
1162
1163 spin_unlock(&con->rsp_wr_wait_lock);
1164 ret = rtrs_srv_resp_rdma(id, id->status);
1165 spin_lock(&con->rsp_wr_wait_lock);
1166
1167 if (!ret) {
1168 list_add(&id->wait_list, &con->rsp_wr_wait_list);
1169 break;
1170 }
1171 }
1172 spin_unlock(&con->rsp_wr_wait_lock);
1173}
1174
1175static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
1176{
1177 struct rtrs_srv_con *con = cq->cq_context;
1178 struct rtrs_sess *s = con->c.sess;
1179 struct rtrs_srv_sess *sess = to_srv_sess(s);
1180 struct rtrs_srv *srv = sess->srv;
1181 u32 imm_type, imm_payload;
1182 int err;
1183
1184 if (unlikely(wc->status != IB_WC_SUCCESS)) {
1185 if (wc->status != IB_WC_WR_FLUSH_ERR) {
1186 rtrs_err(s,
1187 "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n",
1188 ib_wc_status_msg(wc->status), wc->wr_cqe,
1189 wc->opcode, wc->vendor_err, wc->byte_len);
1190 close_sess(sess);
1191 }
1192 return;
1193 }
1194
1195 switch (wc->opcode) {
1196 case IB_WC_RECV_RDMA_WITH_IMM:
1197 /*
1198 * post_recv() RDMA write completions of IO reqs (read/write)
1199 * and hb
1200 */
1201 if (WARN_ON(wc->wr_cqe != &io_comp_cqe))
1202 return;
1203 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
1204 if (unlikely(err)) {
1205 rtrs_err(s, "rtrs_post_recv(), err: %d\n", err);
1206 close_sess(sess);
1207 break;
1208 }
1209 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
1210 &imm_type, &imm_payload);
1211 if (likely(imm_type == RTRS_IO_REQ_IMM)) {
1212 u32 msg_id, off;
1213 void *data;
1214
1215 msg_id = imm_payload >> sess->mem_bits;
1216 off = imm_payload & ((1 << sess->mem_bits) - 1);
1217 if (unlikely(msg_id >= srv->queue_depth ||
1218 off >= max_chunk_size)) {
1219 rtrs_err(s, "Wrong msg_id %u, off %u\n",
1220 msg_id, off);
1221 close_sess(sess);
1222 return;
1223 }
1224 if (always_invalidate) {
1225 struct rtrs_srv_mr *mr = &sess->mrs[msg_id];
1226
1227 mr->msg_off = off;
1228 mr->msg_id = msg_id;
1229 err = rtrs_srv_inv_rkey(con, mr);
1230 if (unlikely(err)) {
1231 rtrs_err(s, "rtrs_post_recv(), err: %d\n",
1232 err);
1233 close_sess(sess);
1234 break;
1235 }
1236 } else {
1237 data = page_address(srv->chunks[msg_id]) + off;
1238 process_io_req(con, data, msg_id, off);
1239 }
1240 } else if (imm_type == RTRS_HB_MSG_IMM) {
1241 WARN_ON(con->c.cid);
1242 rtrs_send_hb_ack(&sess->s);
1243 } else if (imm_type == RTRS_HB_ACK_IMM) {
1244 WARN_ON(con->c.cid);
1245 sess->s.hb_missed_cnt = 0;
1246 } else {
1247 rtrs_wrn(s, "Unknown IMM type %u\n", imm_type);
1248 }
1249 break;
1250 case IB_WC_RDMA_WRITE:
1251 case IB_WC_SEND:
1252 /*
1253 * post_send() RDMA write completions of IO reqs (read/write)
1254 * and hb
1255 */
1256 atomic_add(srv->queue_depth, &con->sq_wr_avail);
1257
1258 if (unlikely(!list_empty_careful(&con->rsp_wr_wait_list)))
1259 rtrs_rdma_process_wr_wait_list(con);
1260
1261 break;
1262 default:
1263 rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode);
1264 return;
1265 }
1266}
1267
1268/**
1269 * rtrs_srv_get_sess_name() - Get rtrs_srv peer hostname.
1270 * @srv: Session
1271 * @sessname: Sessname buffer
1272 * @len: Length of sessname buffer
1273 */
1274int rtrs_srv_get_sess_name(struct rtrs_srv *srv, char *sessname, size_t len)
1275{
1276 struct rtrs_srv_sess *sess;
1277 int err = -ENOTCONN;
1278
1279 mutex_lock(&srv->paths_mutex);
1280 list_for_each_entry(sess, &srv->paths_list, s.entry) {
1281 if (sess->state != RTRS_SRV_CONNECTED)
1282 continue;
1283 strlcpy(sessname, sess->s.sessname,
1284 min_t(size_t, sizeof(sess->s.sessname), len));
1285 err = 0;
1286 break;
1287 }
1288 mutex_unlock(&srv->paths_mutex);
1289
1290 return err;
1291}
1292EXPORT_SYMBOL(rtrs_srv_get_sess_name);
1293
1294/**
1295 * rtrs_srv_get_sess_qdepth() - Get rtrs_srv qdepth.
1296 * @srv: Session
1297 */
1298int rtrs_srv_get_queue_depth(struct rtrs_srv *srv)
1299{
1300 return srv->queue_depth;
1301}
1302EXPORT_SYMBOL(rtrs_srv_get_queue_depth);
1303
1304static int find_next_bit_ring(struct rtrs_srv_sess *sess)
1305{
1306 struct ib_device *ib_dev = sess->s.dev->ib_dev;
1307 int v;
1308
1309 v = cpumask_next(sess->cur_cq_vector, &cq_affinity_mask);
1310 if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors)
1311 v = cpumask_first(&cq_affinity_mask);
1312 return v;
1313}
1314
1315static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_sess *sess)
1316{
1317 sess->cur_cq_vector = find_next_bit_ring(sess);
1318
1319 return sess->cur_cq_vector;
1320}
1321
1322static void rtrs_srv_dev_release(struct device *dev)
1323{
1324 struct rtrs_srv *srv = container_of(dev, struct rtrs_srv, dev);
1325
1326 kfree(srv);
1327}
1328
1329static struct rtrs_srv *__alloc_srv(struct rtrs_srv_ctx *ctx,
1330 const uuid_t *paths_uuid)
1331{
1332 struct rtrs_srv *srv;
1333 int i;
1334
1335 srv = kzalloc(sizeof(*srv), GFP_KERNEL);
1336 if (!srv)
1337 return NULL;
1338
1339 refcount_set(&srv->refcount, 1);
1340 INIT_LIST_HEAD(&srv->paths_list);
1341 mutex_init(&srv->paths_mutex);
1342 mutex_init(&srv->paths_ev_mutex);
1343 uuid_copy(&srv->paths_uuid, paths_uuid);
1344 srv->queue_depth = sess_queue_depth;
1345 srv->ctx = ctx;
1346 device_initialize(&srv->dev);
1347 srv->dev.release = rtrs_srv_dev_release;
1348
1349 srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks),
1350 GFP_KERNEL);
1351 if (!srv->chunks)
1352 goto err_free_srv;
1353
1354 for (i = 0; i < srv->queue_depth; i++) {
1355 srv->chunks[i] = mempool_alloc(chunk_pool, GFP_KERNEL);
1356 if (!srv->chunks[i])
1357 goto err_free_chunks;
1358 }
1359 list_add(&srv->ctx_list, &ctx->srv_list);
1360
1361 return srv;
1362
1363err_free_chunks:
1364 while (i--)
1365 mempool_free(srv->chunks[i], chunk_pool);
1366 kfree(srv->chunks);
1367
1368err_free_srv:
1369 kfree(srv);
1370
1371 return NULL;
1372}
1373
1374static void free_srv(struct rtrs_srv *srv)
1375{
1376 int i;
1377
1378 WARN_ON(refcount_read(&srv->refcount));
1379 for (i = 0; i < srv->queue_depth; i++)
1380 mempool_free(srv->chunks[i], chunk_pool);
1381 kfree(srv->chunks);
1382 mutex_destroy(&srv->paths_mutex);
1383 mutex_destroy(&srv->paths_ev_mutex);
1384 /* last put to release the srv structure */
1385 put_device(&srv->dev);
1386}
1387
1388static inline struct rtrs_srv *__find_srv_and_get(struct rtrs_srv_ctx *ctx,
1389 const uuid_t *paths_uuid)
1390{
1391 struct rtrs_srv *srv;
1392
1393 list_for_each_entry(srv, &ctx->srv_list, ctx_list) {
1394 if (uuid_equal(&srv->paths_uuid, paths_uuid) &&
1395 refcount_inc_not_zero(&srv->refcount))
1396 return srv;
1397 }
1398
1399 return NULL;
1400}
1401
1402static struct rtrs_srv *get_or_create_srv(struct rtrs_srv_ctx *ctx,
1403 const uuid_t *paths_uuid)
1404{
1405 struct rtrs_srv *srv;
1406
1407 mutex_lock(&ctx->srv_mutex);
1408 srv = __find_srv_and_get(ctx, paths_uuid);
1409 if (!srv)
1410 srv = __alloc_srv(ctx, paths_uuid);
1411 mutex_unlock(&ctx->srv_mutex);
1412
1413 return srv;
1414}
1415
1416static void put_srv(struct rtrs_srv *srv)
1417{
1418 if (refcount_dec_and_test(&srv->refcount)) {
1419 struct rtrs_srv_ctx *ctx = srv->ctx;
1420
1421 WARN_ON(srv->dev.kobj.state_in_sysfs);
1422
1423 mutex_lock(&ctx->srv_mutex);
1424 list_del(&srv->ctx_list);
1425 mutex_unlock(&ctx->srv_mutex);
1426 free_srv(srv);
1427 }
1428}
1429
1430static void __add_path_to_srv(struct rtrs_srv *srv,
1431 struct rtrs_srv_sess *sess)
1432{
1433 list_add_tail(&sess->s.entry, &srv->paths_list);
1434 srv->paths_num++;
1435 WARN_ON(srv->paths_num >= MAX_PATHS_NUM);
1436}
1437
1438static void del_path_from_srv(struct rtrs_srv_sess *sess)
1439{
1440 struct rtrs_srv *srv = sess->srv;
1441
1442 if (WARN_ON(!srv))
1443 return;
1444
1445 mutex_lock(&srv->paths_mutex);
1446 list_del(&sess->s.entry);
1447 WARN_ON(!srv->paths_num);
1448 srv->paths_num--;
1449 mutex_unlock(&srv->paths_mutex);
1450}
1451
1452/* return true if addresses are the same, error other wise */
1453static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b)
1454{
1455 switch (a->sa_family) {
1456 case AF_IB:
1457 return memcmp(&((struct sockaddr_ib *)a)->sib_addr,
1458 &((struct sockaddr_ib *)b)->sib_addr,
1459 sizeof(struct ib_addr)) &&
1460 (b->sa_family == AF_IB);
1461 case AF_INET:
1462 return memcmp(&((struct sockaddr_in *)a)->sin_addr,
1463 &((struct sockaddr_in *)b)->sin_addr,
1464 sizeof(struct in_addr)) &&
1465 (b->sa_family == AF_INET);
1466 case AF_INET6:
1467 return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr,
1468 &((struct sockaddr_in6 *)b)->sin6_addr,
1469 sizeof(struct in6_addr)) &&
1470 (b->sa_family == AF_INET6);
1471 default:
1472 return -ENOENT;
1473 }
1474}
1475
1476static bool __is_path_w_addr_exists(struct rtrs_srv *srv,
1477 struct rdma_addr *addr)
1478{
1479 struct rtrs_srv_sess *sess;
1480
1481 list_for_each_entry(sess, &srv->paths_list, s.entry)
1482 if (!sockaddr_cmp((struct sockaddr *)&sess->s.dst_addr,
1483 (struct sockaddr *)&addr->dst_addr) &&
1484 !sockaddr_cmp((struct sockaddr *)&sess->s.src_addr,
1485 (struct sockaddr *)&addr->src_addr))
1486 return true;
1487
1488 return false;
1489}
1490
1491static void free_sess(struct rtrs_srv_sess *sess)
1492{
1493 if (sess->kobj.state_in_sysfs)
1494 kobject_put(&sess->kobj);
1495 else
1496 kfree(sess);
1497}
1498
1499static void rtrs_srv_close_work(struct work_struct *work)
1500{
1501 struct rtrs_srv_sess *sess;
1502 struct rtrs_srv_con *con;
1503 int i;
1504
1505 sess = container_of(work, typeof(*sess), close_work);
1506
1507 rtrs_srv_destroy_sess_files(sess);
1508 rtrs_srv_stop_hb(sess);
1509
1510 for (i = 0; i < sess->s.con_num; i++) {
1511 if (!sess->s.con[i])
1512 continue;
1513 con = to_srv_con(sess->s.con[i]);
1514 rdma_disconnect(con->c.cm_id);
1515 ib_drain_qp(con->c.qp);
1516 }
1517 /* Wait for all inflights */
1518 rtrs_srv_wait_ops_ids(sess);
1519
1520 /* Notify upper layer if we are the last path */
1521 rtrs_srv_sess_down(sess);
1522
1523 unmap_cont_bufs(sess);
1524 rtrs_srv_free_ops_ids(sess);
1525
1526 for (i = 0; i < sess->s.con_num; i++) {
1527 if (!sess->s.con[i])
1528 continue;
1529 con = to_srv_con(sess->s.con[i]);
1530 rtrs_cq_qp_destroy(&con->c);
1531 rdma_destroy_id(con->c.cm_id);
1532 kfree(con);
1533 }
1534 rtrs_ib_dev_put(sess->s.dev);
1535
1536 del_path_from_srv(sess);
1537 put_srv(sess->srv);
1538 sess->srv = NULL;
1539 rtrs_srv_change_state(sess, RTRS_SRV_CLOSED);
1540
1541 kfree(sess->dma_addr);
1542 kfree(sess->s.con);
1543 free_sess(sess);
1544}
1545
1546static int rtrs_rdma_do_accept(struct rtrs_srv_sess *sess,
1547 struct rdma_cm_id *cm_id)
1548{
1549 struct rtrs_srv *srv = sess->srv;
1550 struct rtrs_msg_conn_rsp msg;
1551 struct rdma_conn_param param;
1552 int err;
1553
1554 param = (struct rdma_conn_param) {
1555 .rnr_retry_count = 7,
1556 .private_data = &msg,
1557 .private_data_len = sizeof(msg),
1558 };
1559
1560 msg = (struct rtrs_msg_conn_rsp) {
1561 .magic = cpu_to_le16(RTRS_MAGIC),
1562 .version = cpu_to_le16(RTRS_PROTO_VER),
1563 .queue_depth = cpu_to_le16(srv->queue_depth),
1564 .max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE),
1565 .max_hdr_size = cpu_to_le32(MAX_HDR_SIZE),
1566 };
1567
1568 if (always_invalidate)
1569 msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F);
1570
1571 err = rdma_accept(cm_id, ¶m);
1572 if (err)
1573 pr_err("rdma_accept(), err: %d\n", err);
1574
1575 return err;
1576}
1577
1578static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno)
1579{
1580 struct rtrs_msg_conn_rsp msg;
1581 int err;
1582
1583 msg = (struct rtrs_msg_conn_rsp) {
1584 .magic = cpu_to_le16(RTRS_MAGIC),
1585 .version = cpu_to_le16(RTRS_PROTO_VER),
1586 .errno = cpu_to_le16(errno),
1587 };
1588
1589 err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED);
1590 if (err)
1591 pr_err("rdma_reject(), err: %d\n", err);
1592
1593 /* Bounce errno back */
1594 return errno;
1595}
1596
1597static struct rtrs_srv_sess *
1598__find_sess(struct rtrs_srv *srv, const uuid_t *sess_uuid)
1599{
1600 struct rtrs_srv_sess *sess;
1601
1602 list_for_each_entry(sess, &srv->paths_list, s.entry) {
1603 if (uuid_equal(&sess->s.uuid, sess_uuid))
1604 return sess;
1605 }
1606
1607 return NULL;
1608}
1609
1610static int create_con(struct rtrs_srv_sess *sess,
1611 struct rdma_cm_id *cm_id,
1612 unsigned int cid)
1613{
1614 struct rtrs_srv *srv = sess->srv;
1615 struct rtrs_sess *s = &sess->s;
1616 struct rtrs_srv_con *con;
1617
1618 u16 cq_size, wr_queue_size;
1619 int err, cq_vector;
1620
1621 con = kzalloc(sizeof(*con), GFP_KERNEL);
1622 if (!con) {
1623 err = -ENOMEM;
1624 goto err;
1625 }
1626
1627 spin_lock_init(&con->rsp_wr_wait_lock);
1628 INIT_LIST_HEAD(&con->rsp_wr_wait_list);
1629 con->c.cm_id = cm_id;
1630 con->c.sess = &sess->s;
1631 con->c.cid = cid;
1632 atomic_set(&con->wr_cnt, 0);
1633
1634 if (con->c.cid == 0) {
1635 /*
1636 * All receive and all send (each requiring invalidate)
1637 * + 2 for drain and heartbeat
1638 */
1639 wr_queue_size = SERVICE_CON_QUEUE_DEPTH * 3 + 2;
1640 cq_size = wr_queue_size;
1641 } else {
1642 /*
1643 * If we have all receive requests posted and
1644 * all write requests posted and each read request
1645 * requires an invalidate request + drain
1646 * and qp gets into error state.
1647 */
1648 cq_size = srv->queue_depth * 3 + 1;
1649 /*
1650 * In theory we might have queue_depth * 32
1651 * outstanding requests if an unsafe global key is used
1652 * and we have queue_depth read requests each consisting
1653 * of 32 different addresses. div 3 for mlx5.
1654 */
1655 wr_queue_size = sess->s.dev->ib_dev->attrs.max_qp_wr / 3;
1656 }
1657 atomic_set(&con->sq_wr_avail, wr_queue_size);
1658 cq_vector = rtrs_srv_get_next_cq_vector(sess);
1659
1660 /* TODO: SOFTIRQ can be faster, but be careful with softirq context */
1661 err = rtrs_cq_qp_create(&sess->s, &con->c, 1, cq_vector, cq_size,
1662 wr_queue_size, IB_POLL_WORKQUEUE);
1663 if (err) {
1664 rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err);
1665 goto free_con;
1666 }
1667 if (con->c.cid == 0) {
1668 err = post_recv_info_req(con);
1669 if (err)
1670 goto free_cqqp;
1671 }
1672 WARN_ON(sess->s.con[cid]);
1673 sess->s.con[cid] = &con->c;
1674
1675 /*
1676 * Change context from server to current connection. The other
1677 * way is to use cm_id->qp->qp_context, which does not work on OFED.
1678 */
1679 cm_id->context = &con->c;
1680
1681 return 0;
1682
1683free_cqqp:
1684 rtrs_cq_qp_destroy(&con->c);
1685free_con:
1686 kfree(con);
1687
1688err:
1689 return err;
1690}
1691
1692static struct rtrs_srv_sess *__alloc_sess(struct rtrs_srv *srv,
1693 struct rdma_cm_id *cm_id,
1694 unsigned int con_num,
1695 unsigned int recon_cnt,
1696 const uuid_t *uuid)
1697{
1698 struct rtrs_srv_sess *sess;
1699 int err = -ENOMEM;
1700
1701 if (srv->paths_num >= MAX_PATHS_NUM) {
1702 err = -ECONNRESET;
1703 goto err;
1704 }
1705 if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) {
1706 err = -EEXIST;
1707 pr_err("Path with same addr exists\n");
1708 goto err;
1709 }
1710 sess = kzalloc(sizeof(*sess), GFP_KERNEL);
1711 if (!sess)
1712 goto err;
1713
1714 sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL);
1715 if (!sess->stats)
1716 goto err_free_sess;
1717
1718 sess->stats->sess = sess;
1719
1720 sess->dma_addr = kcalloc(srv->queue_depth, sizeof(*sess->dma_addr),
1721 GFP_KERNEL);
1722 if (!sess->dma_addr)
1723 goto err_free_stats;
1724
1725 sess->s.con = kcalloc(con_num, sizeof(*sess->s.con), GFP_KERNEL);
1726 if (!sess->s.con)
1727 goto err_free_dma_addr;
1728
1729 sess->state = RTRS_SRV_CONNECTING;
1730 sess->srv = srv;
1731 sess->cur_cq_vector = -1;
1732 sess->s.dst_addr = cm_id->route.addr.dst_addr;
1733 sess->s.src_addr = cm_id->route.addr.src_addr;
1734 sess->s.con_num = con_num;
1735 sess->s.recon_cnt = recon_cnt;
1736 uuid_copy(&sess->s.uuid, uuid);
1737 spin_lock_init(&sess->state_lock);
1738 INIT_WORK(&sess->close_work, rtrs_srv_close_work);
1739 rtrs_srv_init_hb(sess);
1740
1741 sess->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd);
1742 if (!sess->s.dev) {
1743 err = -ENOMEM;
1744 goto err_free_con;
1745 }
1746 err = map_cont_bufs(sess);
1747 if (err)
1748 goto err_put_dev;
1749
1750 err = rtrs_srv_alloc_ops_ids(sess);
1751 if (err)
1752 goto err_unmap_bufs;
1753
1754 __add_path_to_srv(srv, sess);
1755
1756 return sess;
1757
1758err_unmap_bufs:
1759 unmap_cont_bufs(sess);
1760err_put_dev:
1761 rtrs_ib_dev_put(sess->s.dev);
1762err_free_con:
1763 kfree(sess->s.con);
1764err_free_dma_addr:
1765 kfree(sess->dma_addr);
1766err_free_stats:
1767 kfree(sess->stats);
1768err_free_sess:
1769 kfree(sess);
1770err:
1771 return ERR_PTR(err);
1772}
1773
static int rtrs_rdma_connect(struct rdma_cm_id *cm_id,
			     const struct rtrs_msg_conn_req *msg,
			     size_t len)
{
	struct rtrs_srv_ctx *ctx = cm_id->context;
	struct rtrs_srv_sess *sess;
	struct rtrs_srv *srv;

	u16 version, con_num, cid;
	u16 recon_cnt;
	int err;

	if (len < sizeof(*msg)) {
		pr_err("Invalid RTRS connection request\n");
		goto reject_w_econnreset;
	}
	if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
		pr_err("Invalid RTRS magic\n");
		goto reject_w_econnreset;
	}
	version = le16_to_cpu(msg->version);
	if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
		pr_err("Unsupported major RTRS version: %d, expected %d\n",
		       version >> 8, RTRS_PROTO_VER_MAJOR);
		goto reject_w_econnreset;
	}
	con_num = le16_to_cpu(msg->cid_num);
	if (con_num > 4096) {
		/* Sanity check */
		pr_err("Too many connections requested: %d\n", con_num);
		goto reject_w_econnreset;
	}
	cid = le16_to_cpu(msg->cid);
	if (cid >= con_num) {
		/* Sanity check */
		pr_err("Incorrect cid: %d >= %d\n", cid, con_num);
		goto reject_w_econnreset;
	}
	recon_cnt = le16_to_cpu(msg->recon_cnt);
	srv = get_or_create_srv(ctx, &msg->paths_uuid);
	if (!srv) {
		err = -ENOMEM;
		goto reject_w_err;
	}
	mutex_lock(&srv->paths_mutex);
	sess = __find_sess(srv, &msg->sess_uuid);
	if (sess) {
		struct rtrs_sess *s = &sess->s;

		/* Session already holds a reference */
		put_srv(srv);

		if (sess->state != RTRS_SRV_CONNECTING) {
			rtrs_err(s, "Session in wrong state: %s\n",
				 rtrs_srv_state_str(sess->state));
			mutex_unlock(&srv->paths_mutex);
			goto reject_w_econnreset;
		}
		/* Sanity checks */
		if (con_num != s->con_num || cid >= s->con_num) {
			rtrs_err(s, "Incorrect request: %d, %d\n",
				 cid, con_num);
			mutex_unlock(&srv->paths_mutex);
			goto reject_w_econnreset;
		}
		if (s->con[cid]) {
			rtrs_err(s, "Connection already exists: %d\n",
				 cid);
			mutex_unlock(&srv->paths_mutex);
			goto reject_w_econnreset;
		}
	} else {
		sess = __alloc_sess(srv, cm_id, con_num, recon_cnt,
				    &msg->sess_uuid);
		if (IS_ERR(sess)) {
			mutex_unlock(&srv->paths_mutex);
			put_srv(srv);
			err = PTR_ERR(sess);
			goto reject_w_err;
		}
	}
	err = create_con(sess, cm_id, cid);
	if (err) {
		(void)rtrs_rdma_do_reject(cm_id, err);
		/*
		 * Since the session has other connections, we follow the
		 * normal path through the workqueue, but still return an
		 * error to tell cma.c to call rdma_destroy_id() for the
		 * current connection.
		 */
		goto close_and_return_err;
	}
	err = rtrs_rdma_do_accept(sess, cm_id);
	if (err) {
		(void)rtrs_rdma_do_reject(cm_id, err);
		/*
		 * Since the current connection was successfully added to the
		 * session, we follow the normal path through the workqueue to
		 * close the session, and return 0 to tell cma.c that we will
		 * call rdma_destroy_id() ourselves.
		 */
		err = 0;
		goto close_and_return_err;
	}
	mutex_unlock(&srv->paths_mutex);

	return 0;

reject_w_err:
	return rtrs_rdma_do_reject(cm_id, err);

reject_w_econnreset:
	return rtrs_rdma_do_reject(cm_id, -ECONNRESET);

close_and_return_err:
	close_sess(sess);
	mutex_unlock(&srv->paths_mutex);

	return err;
}

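/*
 * RDMA CM event handler for the server side: connect requests are dispatched
 * to rtrs_rdma_connect(); rejection, connection-error, unreachable,
 * disconnect, address-change, timewait-exit and device-removal events tear
 * the affected session down via close_sess().
 */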
static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id,
				    struct rdma_cm_event *ev)
{
	struct rtrs_srv_sess *sess = NULL;
	struct rtrs_sess *s = NULL;

	if (ev->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
		struct rtrs_con *c = cm_id->context;

		s = c->sess;
		sess = to_srv_sess(s);
	}

	switch (ev->event) {
	case RDMA_CM_EVENT_CONNECT_REQUEST:
		/*
		 * In case of an error cma.c will destroy the cm_id,
		 * see cma_process_remove().
		 */
		return rtrs_rdma_connect(cm_id, ev->param.conn.private_data,
					 ev->param.conn.private_data_len);
	case RDMA_CM_EVENT_ESTABLISHED:
		/* Nothing here */
		break;
	case RDMA_CM_EVENT_REJECTED:
	case RDMA_CM_EVENT_CONNECT_ERROR:
	case RDMA_CM_EVENT_UNREACHABLE:
		rtrs_err(s, "CM error (CM event: %s, err: %d)\n",
			 rdma_event_msg(ev->event), ev->status);
		close_sess(sess);
		break;
	case RDMA_CM_EVENT_DISCONNECTED:
	case RDMA_CM_EVENT_ADDR_CHANGE:
	case RDMA_CM_EVENT_TIMEWAIT_EXIT:
		close_sess(sess);
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		close_sess(sess);
		break;
	default:
		pr_err("Ignoring unexpected CM event %s, err %d\n",
		       rdma_event_msg(ev->event), ev->status);
		break;
	}

	return 0;
}

static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx,
					   struct sockaddr *addr,
					   enum rdma_ucm_port_space ps)
{
	struct rdma_cm_id *cm_id;
	int ret;

	cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler,
			       ctx, ps, IB_QPT_RC);
	if (IS_ERR(cm_id)) {
		ret = PTR_ERR(cm_id);
		pr_err("Creating id for RDMA connection failed, err: %d\n",
		       ret);
		goto err_out;
	}
	ret = rdma_bind_addr(cm_id, addr);
	if (ret) {
		pr_err("Binding RDMA address failed, err: %d\n", ret);
		goto err_cm;
	}
	ret = rdma_listen(cm_id, 64);
	if (ret) {
		pr_err("Listening on RDMA connection failed, err: %d\n",
		       ret);
		goto err_cm;
	}

	return cm_id;

err_cm:
	rdma_destroy_id(cm_id);
err_out:

	return ERR_PTR(ret);
}

static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port)
{
	struct sockaddr_in6 sin = {
		.sin6_family = AF_INET6,
		.sin6_addr = IN6ADDR_ANY_INIT,
		.sin6_port = htons(port),
	};
	struct sockaddr_ib sib = {
		.sib_family = AF_IB,
		.sib_sid = cpu_to_be64(RDMA_IB_IP_PS_IB | port),
		.sib_sid_mask = cpu_to_be64(0xffffffffffffffffULL),
		.sib_pkey = cpu_to_be16(0xffff),
	};
	struct rdma_cm_id *cm_ip, *cm_ib;
	int ret;

	/*
	 * We accept both IPoIB and IB connections, so we need to keep
	 * two CM IDs, one for each socket type and port space.
	 * If the CM initialization of one of the IDs fails, we abort
	 * everything.
	 */
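	/*
	 * Illustration (an assumption about typical deployments, not taken
	 * from this file): the RDMA_PS_TCP listener bound to the IPv6
	 * any-address serves IP-addressed RDMA transports such as IPoIB and
	 * RoCE (IPv4 peers included), while the AF_IB/RDMA_PS_IB listener
	 * serves clients that address the server by a native IB service ID.
	 */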
	cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP);
	if (IS_ERR(cm_ip))
		return PTR_ERR(cm_ip);

	cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB);
	if (IS_ERR(cm_ib)) {
		ret = PTR_ERR(cm_ib);
		goto free_cm_ip;
	}

	ctx->cm_id_ip = cm_ip;
	ctx->cm_id_ib = cm_ib;

	return 0;

free_cm_ip:
	rdma_destroy_id(cm_ip);

	return ret;
}

static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops)
{
	struct rtrs_srv_ctx *ctx;

	ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
	if (!ctx)
		return NULL;

	ctx->ops = *ops;
	mutex_init(&ctx->srv_mutex);
	INIT_LIST_HEAD(&ctx->srv_list);

	return ctx;
}

static void free_srv_ctx(struct rtrs_srv_ctx *ctx)
{
	WARN_ON(!list_empty(&ctx->srv_list));
	mutex_destroy(&ctx->srv_mutex);
	kfree(ctx);
}

/**
 * rtrs_srv_open() - open RTRS server context
 * @ops: callback functions
 * @port: port to listen on
 *
 * Creates a server context with the specified callbacks.
 *
 * Return: a valid pointer on success, an ERR_PTR() on failure.
 */
struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port)
{
	struct rtrs_srv_ctx *ctx;
	int err;

	ctx = alloc_srv_ctx(ops);
	if (!ctx)
		return ERR_PTR(-ENOMEM);

	err = rtrs_srv_rdma_init(ctx, port);
	if (err) {
		free_srv_ctx(ctx);
		return ERR_PTR(err);
	}

	return ctx;
}
EXPORT_SYMBOL(rtrs_srv_open);
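
/*
 * Example (illustrative sketch, not taken from this file): how a ULP might
 * open the server context. The callback names and the port value below are
 * hypothetical; struct rtrs_srv_ops is only assumed to provide the .rdma_ev
 * and .link_ev hooks used here:
 *
 *	static struct rtrs_srv_ops example_ops = {
 *		.rdma_ev = example_rdma_ev,
 *		.link_ev = example_link_ev,
 *	};
 *	struct rtrs_srv_ctx *srv_ctx;
 *
 *	srv_ctx = rtrs_srv_open(&example_ops, 1234);
 *	if (IS_ERR(srv_ctx))
 *		return PTR_ERR(srv_ctx);
 */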

static void close_sessions(struct rtrs_srv *srv)
{
	struct rtrs_srv_sess *sess;

	mutex_lock(&srv->paths_mutex);
	list_for_each_entry(sess, &srv->paths_list, s.entry)
		close_sess(sess);
	mutex_unlock(&srv->paths_mutex);
}

static void close_ctx(struct rtrs_srv_ctx *ctx)
{
	struct rtrs_srv *srv;

	mutex_lock(&ctx->srv_mutex);
	list_for_each_entry(srv, &ctx->srv_list, ctx_list)
		close_sessions(srv);
	mutex_unlock(&ctx->srv_mutex);
	flush_workqueue(rtrs_wq);
}

/**
 * rtrs_srv_close() - close RTRS server context
 * @ctx: pointer to server context
 *
 * Closes RTRS server context with all client sessions.
 */
void rtrs_srv_close(struct rtrs_srv_ctx *ctx)
{
	rdma_destroy_id(ctx->cm_id_ip);
	rdma_destroy_id(ctx->cm_id_ib);
	close_ctx(ctx);
	free_srv_ctx(ctx);
}
EXPORT_SYMBOL(rtrs_srv_close);
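
/*
 * Example (continuation of the sketch above): the matching teardown. The
 * listening CM IDs are destroyed first inside rtrs_srv_close(), so no new
 * connect request can race with the session shutdown that follows:
 *
 *	rtrs_srv_close(srv_ctx);
 */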

static int check_module_params(void)
{
	if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) {
		pr_err("Invalid sess_queue_depth value %d, has to be >= %d, <= %d.\n",
		       sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH);
		return -EINVAL;
	}
	if (max_chunk_size < 4096 || !is_power_of_2(max_chunk_size)) {
		pr_err("Invalid max_chunk_size value %d, has to be >= %d and a power of two.\n",
		       max_chunk_size, 4096);
		return -EINVAL;
	}

	/*
	 * Check if IB immediate data size is enough to hold the mem_id and
	 * the offset inside the memory chunk.
	 */
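	/*
	 * Worked example with the defaults (assuming MAX_IMM_PAYL_BITS is 28,
	 * as defined in rtrs-pri.h): sess_queue_depth = 512 needs
	 * ilog2(511) + 1 = 9 bits for the buffer id, max_chunk_size = 128 KiB
	 * needs ilog2(131071) + 1 = 17 bits for the offset, 26 bits in total,
	 * so the check below passes.
	 */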
	if ((ilog2(sess_queue_depth - 1) + 1) +
	    (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) {
		pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n",
		       MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size);
		return -EINVAL;
	}

	return 0;
}

static int __init rtrs_server_init(void)
{
	int err;

	pr_info("Loading module %s, proto %s: (max_chunk_size: %d (pure IO %ld, headers %ld), sess_queue_depth: %d, always_invalidate: %d)\n",
		KBUILD_MODNAME, RTRS_PROTO_VER_STRING,
		max_chunk_size, max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE,
		sess_queue_depth, always_invalidate);

	rtrs_rdma_dev_pd_init(0, &dev_pd);

	err = check_module_params();
	if (err) {
		pr_err("Failed to load module, invalid module parameters, err: %d\n",
		       err);
		return err;
	}
	chunk_pool = mempool_create_page_pool(sess_queue_depth * CHUNK_POOL_SZ,
					      get_order(max_chunk_size));
	if (!chunk_pool)
		return -ENOMEM;
	err = class_register(&rtrs_dev_class);
	if (err)
		goto out_chunk_pool;
	rtrs_wq = alloc_workqueue("rtrs_server_wq", 0, 0);
	if (!rtrs_wq) {
		err = -ENOMEM;
		goto out_dev_class;
	}

	return 0;

out_dev_class:
	class_unregister(&rtrs_dev_class);
out_chunk_pool:
	mempool_destroy(chunk_pool);

	return err;
}

static void __exit rtrs_server_exit(void)
{
	destroy_workqueue(rtrs_wq);
	class_unregister(&rtrs_dev_class);
	mempool_destroy(chunk_pool);
	rtrs_rdma_dev_pd_deinit(&dev_pd);
}

module_init(rtrs_server_init);
module_exit(rtrs_server_exit);