Linux Audio

Check our new training course

Loading...
Note: File does not exist in v3.5.6.
  1// SPDX-License-Identifier: GPL-2.0
  2/*
  3 * Shared Memory Communications over RDMA (SMC-R) and RoCE
  4 *
  5 * Manage RMBE
  6 * copy new RMBE data into user space
  7 *
  8 * Copyright IBM Corp. 2016
  9 *
 10 * Author(s):  Ursula Braun <ubraun@linux.vnet.ibm.com>
 11 */
 12
 13#include <linux/net.h>
 14#include <linux/rcupdate.h>
 15#include <linux/sched/signal.h>
 16
 17#include <net/sock.h>
 18
 19#include "smc.h"
 20#include "smc_core.h"
 21#include "smc_cdc.h"
 22#include "smc_tx.h" /* smc_tx_consumer_update() */
 23#include "smc_rx.h"
 24
 25/* callback implementation to wakeup consumers blocked with smc_rx_wait().
 26 * indirectly called by smc_cdc_msg_recv_action().
 27 */
 28static void smc_rx_wake_up(struct sock *sk)
 29{
 30	struct socket_wq *wq;
 31
 32	/* derived from sock_def_readable() */
 33	/* called already in smc_listen_work() */
 34	rcu_read_lock();
 35	wq = rcu_dereference(sk->sk_wq);
 36	if (skwq_has_sleeper(wq))
 37		wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI |
 38						EPOLLRDNORM | EPOLLRDBAND);
 39	sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
 40	if ((sk->sk_shutdown == SHUTDOWN_MASK) ||
 41	    (sk->sk_state == SMC_CLOSED))
 42		sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_HUP);
 43	rcu_read_unlock();
 44}
 45
 46/* Update consumer cursor
 47 *   @conn   connection to update
 48 *   @cons   consumer cursor
 49 *   @len    number of Bytes consumed
 50 *   Returns:
 51 *   1 if we should end our receive, 0 otherwise
 52 */
 53static int smc_rx_update_consumer(struct smc_sock *smc,
 54				  union smc_host_cursor cons, size_t len)
 55{
 56	struct smc_connection *conn = &smc->conn;
 57	struct sock *sk = &smc->sk;
 58	bool force = false;
 59	int diff, rc = 0;
 60
 61	smc_curs_add(conn->rmb_desc->len, &cons, len);
 62
 63	/* did we process urgent data? */
 64	if (conn->urg_state == SMC_URG_VALID || conn->urg_rx_skip_pend) {
 65		diff = smc_curs_comp(conn->rmb_desc->len, &cons,
 66				     &conn->urg_curs);
 67		if (sock_flag(sk, SOCK_URGINLINE)) {
 68			if (diff == 0) {
 69				force = true;
 70				rc = 1;
 71				conn->urg_state = SMC_URG_READ;
 72			}
 73		} else {
 74			if (diff == 1) {
 75				/* skip urgent byte */
 76				force = true;
 77				smc_curs_add(conn->rmb_desc->len, &cons, 1);
 78				conn->urg_rx_skip_pend = false;
 79			} else if (diff < -1)
 80				/* we read past urgent byte */
 81				conn->urg_state = SMC_URG_READ;
 82		}
 83	}
 84
 85	smc_curs_copy(&conn->local_tx_ctrl.cons, &cons, conn);
 86
 87	/* send consumer cursor update if required */
 88	/* similar to advertising new TCP rcv_wnd if required */
 89	smc_tx_consumer_update(conn, force);
 90
 91	return rc;
 92}
 93
 94static void smc_rx_update_cons(struct smc_sock *smc, size_t len)
 95{
 96	struct smc_connection *conn = &smc->conn;
 97	union smc_host_cursor cons;
 98
 99	smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
100	smc_rx_update_consumer(smc, cons, len);
101}
102
103struct smc_spd_priv {
104	struct smc_sock *smc;
105	size_t		 len;
106};
107
108static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe,
109				    struct pipe_buffer *buf)
110{
111	struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private;
112	struct smc_sock *smc = priv->smc;
113	struct smc_connection *conn;
114	struct sock *sk = &smc->sk;
115
116	if (sk->sk_state == SMC_CLOSED ||
117	    sk->sk_state == SMC_PEERFINCLOSEWAIT ||
118	    sk->sk_state == SMC_APPFINCLOSEWAIT)
119		goto out;
120	conn = &smc->conn;
121	lock_sock(sk);
122	smc_rx_update_cons(smc, priv->len);
123	release_sock(sk);
124	if (atomic_sub_and_test(priv->len, &conn->splice_pending))
125		smc_rx_wake_up(sk);
126out:
127	kfree(priv);
128	put_page(buf->page);
129	sock_put(sk);
130}
131
132static const struct pipe_buf_operations smc_pipe_ops = {
133	.release = smc_rx_pipe_buf_release,
134	.get = generic_pipe_buf_get
135};
136
137static void smc_rx_spd_release(struct splice_pipe_desc *spd,
138			       unsigned int i)
139{
140	put_page(spd->pages[i]);
141}
142
143static int smc_rx_splice(struct pipe_inode_info *pipe, char *src, size_t len,
144			 struct smc_sock *smc)
145{
146	struct splice_pipe_desc spd;
147	struct partial_page partial;
148	struct smc_spd_priv *priv;
149	int bytes;
150
151	priv = kzalloc(sizeof(*priv), GFP_KERNEL);
152	if (!priv)
153		return -ENOMEM;
154	priv->len = len;
155	priv->smc = smc;
156	partial.offset = src - (char *)smc->conn.rmb_desc->cpu_addr;
157	partial.len = len;
158	partial.private = (unsigned long)priv;
159
160	spd.nr_pages_max = 1;
161	spd.nr_pages = 1;
162	spd.pages = &smc->conn.rmb_desc->pages;
163	spd.partial = &partial;
164	spd.ops = &smc_pipe_ops;
165	spd.spd_release = smc_rx_spd_release;
166
167	bytes = splice_to_pipe(pipe, &spd);
168	if (bytes > 0) {
169		sock_hold(&smc->sk);
170		get_page(smc->conn.rmb_desc->pages);
171		atomic_add(bytes, &smc->conn.splice_pending);
172	}
173
174	return bytes;
175}
176
177static int smc_rx_data_available_and_no_splice_pend(struct smc_connection *conn)
178{
179	return atomic_read(&conn->bytes_to_rcv) &&
180	       !atomic_read(&conn->splice_pending);
181}
182
183/* blocks rcvbuf consumer until >=len bytes available or timeout or interrupted
184 *   @smc    smc socket
185 *   @timeo  pointer to max seconds to wait, pointer to value 0 for no timeout
186 *   @fcrit  add'l criterion to evaluate as function pointer
187 * Returns:
188 * 1 if at least 1 byte available in rcvbuf or if socket error/shutdown.
189 * 0 otherwise (nothing in rcvbuf nor timeout, e.g. interrupted).
190 */
191int smc_rx_wait(struct smc_sock *smc, long *timeo,
192		int (*fcrit)(struct smc_connection *conn))
193{
194	DEFINE_WAIT_FUNC(wait, woken_wake_function);
195	struct smc_connection *conn = &smc->conn;
196	struct smc_cdc_conn_state_flags *cflags =
197					&conn->local_tx_ctrl.conn_state_flags;
198	struct sock *sk = &smc->sk;
199	int rc;
200
201	if (fcrit(conn))
202		return 1;
203	sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
204	add_wait_queue(sk_sleep(sk), &wait);
205	rc = sk_wait_event(sk, timeo,
206			   sk->sk_err ||
207			   cflags->peer_conn_abort ||
208			   sk->sk_shutdown & RCV_SHUTDOWN ||
209			   conn->killed ||
210			   fcrit(conn),
211			   &wait);
212	remove_wait_queue(sk_sleep(sk), &wait);
213	sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
214	return rc;
215}
216
217static int smc_rx_recv_urg(struct smc_sock *smc, struct msghdr *msg, int len,
218			   int flags)
219{
220	struct smc_connection *conn = &smc->conn;
221	union smc_host_cursor cons;
222	struct sock *sk = &smc->sk;
223	int rc = 0;
224
225	if (sock_flag(sk, SOCK_URGINLINE) ||
226	    !(conn->urg_state == SMC_URG_VALID) ||
227	    conn->urg_state == SMC_URG_READ)
228		return -EINVAL;
229
230	if (conn->urg_state == SMC_URG_VALID) {
231		if (!(flags & MSG_PEEK))
232			smc->conn.urg_state = SMC_URG_READ;
233		msg->msg_flags |= MSG_OOB;
234		if (len > 0) {
235			if (!(flags & MSG_TRUNC))
236				rc = memcpy_to_msg(msg, &conn->urg_rx_byte, 1);
237			len = 1;
238			smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
239			if (smc_curs_diff(conn->rmb_desc->len, &cons,
240					  &conn->urg_curs) > 1)
241				conn->urg_rx_skip_pend = true;
242			/* Urgent Byte was already accounted for, but trigger
243			 * skipping the urgent byte in non-inline case
244			 */
245			if (!(flags & MSG_PEEK))
246				smc_rx_update_consumer(smc, cons, 0);
247		} else {
248			msg->msg_flags |= MSG_TRUNC;
249		}
250
251		return rc ? -EFAULT : len;
252	}
253
254	if (sk->sk_state == SMC_CLOSED || sk->sk_shutdown & RCV_SHUTDOWN)
255		return 0;
256
257	return -EAGAIN;
258}
259
260static bool smc_rx_recvmsg_data_available(struct smc_sock *smc)
261{
262	struct smc_connection *conn = &smc->conn;
263
264	if (smc_rx_data_available(conn))
265		return true;
266	else if (conn->urg_state == SMC_URG_VALID)
267		/* we received a single urgent Byte - skip */
268		smc_rx_update_cons(smc, 0);
269	return false;
270}
271
272/* smc_rx_recvmsg - receive data from RMBE
273 * @msg:	copy data to receive buffer
274 * @pipe:	copy data to pipe if set - indicates splice() call
275 *
276 * rcvbuf consumer: main API called by socket layer.
277 * Called under sk lock.
278 */
279int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
280		   struct pipe_inode_info *pipe, size_t len, int flags)
281{
282	size_t copylen, read_done = 0, read_remaining = len;
283	size_t chunk_len, chunk_off, chunk_len_sum;
284	struct smc_connection *conn = &smc->conn;
285	int (*func)(struct smc_connection *conn);
286	union smc_host_cursor cons;
287	int readable, chunk;
288	char *rcvbuf_base;
289	struct sock *sk;
290	int splbytes;
291	long timeo;
292	int target;		/* Read at least these many bytes */
293	int rc;
294
295	if (unlikely(flags & MSG_ERRQUEUE))
296		return -EINVAL; /* future work for sk.sk_family == AF_SMC */
297
298	sk = &smc->sk;
299	if (sk->sk_state == SMC_LISTEN)
300		return -ENOTCONN;
301	if (flags & MSG_OOB)
302		return smc_rx_recv_urg(smc, msg, len, flags);
303	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
304	target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
305
306	/* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */
307	rcvbuf_base = conn->rx_off + conn->rmb_desc->cpu_addr;
308
309	do { /* while (read_remaining) */
310		if (read_done >= target || (pipe && read_done))
311			break;
312
313		if (conn->killed)
314			break;
315
316		if (smc_rx_recvmsg_data_available(smc))
317			goto copy;
318
319		if (sk->sk_shutdown & RCV_SHUTDOWN) {
320			/* smc_cdc_msg_recv_action() could have run after
321			 * above smc_rx_recvmsg_data_available()
322			 */
323			if (smc_rx_recvmsg_data_available(smc))
324				goto copy;
325			break;
326		}
327
328		if (read_done) {
329			if (sk->sk_err ||
330			    sk->sk_state == SMC_CLOSED ||
331			    !timeo ||
332			    signal_pending(current))
333				break;
334		} else {
335			if (sk->sk_err) {
336				read_done = sock_error(sk);
337				break;
338			}
339			if (sk->sk_state == SMC_CLOSED) {
340				if (!sock_flag(sk, SOCK_DONE)) {
341					/* This occurs when user tries to read
342					 * from never connected socket.
343					 */
344					read_done = -ENOTCONN;
345					break;
346				}
347				break;
348			}
349			if (signal_pending(current)) {
350				read_done = sock_intr_errno(timeo);
351				break;
352			}
353			if (!timeo)
354				return -EAGAIN;
355		}
356
357		if (!smc_rx_data_available(conn)) {
358			smc_rx_wait(smc, &timeo, smc_rx_data_available);
359			continue;
360		}
361
362copy:
363		/* initialize variables for 1st iteration of subsequent loop */
364		/* could be just 1 byte, even after waiting on data above */
365		readable = atomic_read(&conn->bytes_to_rcv);
366		splbytes = atomic_read(&conn->splice_pending);
367		if (!readable || (msg && splbytes)) {
368			if (splbytes)
369				func = smc_rx_data_available_and_no_splice_pend;
370			else
371				func = smc_rx_data_available;
372			smc_rx_wait(smc, &timeo, func);
373			continue;
374		}
375
376		smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
377		/* subsequent splice() calls pick up where previous left */
378		if (splbytes)
379			smc_curs_add(conn->rmb_desc->len, &cons, splbytes);
380		if (conn->urg_state == SMC_URG_VALID &&
381		    sock_flag(&smc->sk, SOCK_URGINLINE) &&
382		    readable > 1)
383			readable--;	/* always stop at urgent Byte */
384		/* not more than what user space asked for */
385		copylen = min_t(size_t, read_remaining, readable);
386		/* determine chunks where to read from rcvbuf */
387		/* either unwrapped case, or 1st chunk of wrapped case */
388		chunk_len = min_t(size_t, copylen, conn->rmb_desc->len -
389				  cons.count);
390		chunk_len_sum = chunk_len;
391		chunk_off = cons.count;
392		smc_rmb_sync_sg_for_cpu(conn);
393		for (chunk = 0; chunk < 2; chunk++) {
394			if (!(flags & MSG_TRUNC)) {
395				if (msg) {
396					rc = memcpy_to_msg(msg, rcvbuf_base +
397							   chunk_off,
398							   chunk_len);
399				} else {
400					rc = smc_rx_splice(pipe, rcvbuf_base +
401							chunk_off, chunk_len,
402							smc);
403				}
404				if (rc < 0) {
405					if (!read_done)
406						read_done = -EFAULT;
407					smc_rmb_sync_sg_for_device(conn);
408					goto out;
409				}
410			}
411			read_remaining -= chunk_len;
412			read_done += chunk_len;
413
414			if (chunk_len_sum == copylen)
415				break; /* either on 1st or 2nd iteration */
416			/* prepare next (== 2nd) iteration */
417			chunk_len = copylen - chunk_len; /* remainder */
418			chunk_len_sum += chunk_len;
419			chunk_off = 0; /* modulo offset in recv ring buffer */
420		}
421		smc_rmb_sync_sg_for_device(conn);
422
423		/* update cursors */
424		if (!(flags & MSG_PEEK)) {
425			/* increased in recv tasklet smc_cdc_msg_rcv() */
426			smp_mb__before_atomic();
427			atomic_sub(copylen, &conn->bytes_to_rcv);
428			/* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
429			smp_mb__after_atomic();
430			if (msg && smc_rx_update_consumer(smc, cons, copylen))
431				goto out;
432		}
433	} while (read_remaining);
434out:
435	return read_done;
436}
437
438/* Initialize receive properties on connection establishment. NB: not __init! */
439void smc_rx_init(struct smc_sock *smc)
440{
441	smc->sk.sk_data_ready = smc_rx_wake_up;
442	atomic_set(&smc->conn.splice_pending, 0);
443	smc->conn.urg_state = SMC_URG_READ;
444}