Linux Audio

Check our new training course

Linux debugging, profiling, tracing and performance analysis training

Apr 14-17, 2025
Register
Loading...
v6.8
  1// SPDX-License-Identifier: GPL-2.0-or-later
  2/* In-kernel rxperf server for testing purposes.
  3 *
  4 * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved.
  5 * Written by David Howells (dhowells@redhat.com)
  6 */
  7
  8#define pr_fmt(fmt) "rxperf: " fmt
  9#include <linux/module.h>
 10#include <linux/slab.h>
 11#include <net/sock.h>
 12#include <net/af_rxrpc.h>
 13#define RXRPC_TRACE_ONLY_DEFINE_ENUMS
 14#include <trace/events/rxrpc.h>
 15
 16MODULE_DESCRIPTION("rxperf test server (afs)");
 17MODULE_AUTHOR("Red Hat, Inc.");
 18MODULE_LICENSE("GPL");
 19
 20#define RXPERF_PORT		7009
 21#define RX_PERF_SERVICE		147
 22#define RX_PERF_VERSION		3
 23#define RX_PERF_SEND		0
 24#define RX_PERF_RECV		1
 25#define RX_PERF_RPC		3
 26#define RX_PERF_FILE		4
 27#define RX_PERF_MAGIC_COOKIE	0x4711
 28
/*
 * Parameter block sent by the rxperf client at the head of each call.
 * All fields are big-endian on the wire, hence __packed and __be32.
 */
struct rxperf_proto_params {
	__be32		version;	/* Protocol version; must be RX_PERF_VERSION */
	__be32		type;		/* Operation ID (one of RX_PERF_*) */
	__be32		rsize;
	__be32		wsize;
} __packed;

/* Wire-format trailer appended to every reply (RX_PERF_MAGIC_COOKIE, 0x4711) */
static const u8 rxperf_magic_cookie[] = { 0x00, 0x00, 0x47, 0x11 };
/* Fixed test material for the server security key created in rxperf_add_key() */
static const u8 secret[8] = { 0xa7, 0x83, 0x8a, 0xcb, 0xc7, 0x83, 0xec, 0x94 };
 38
/*
 * Phases an rxperf server call moves through; see rxperf_deliver_to_call().
 */
enum rxperf_call_state {
	RXPERF_CALL_SV_AWAIT_PARAMS,	/* Server: Awaiting parameter block */
	RXPERF_CALL_SV_AWAIT_REQUEST,	/* Server: Awaiting request data */
	RXPERF_CALL_SV_REPLYING,	/* Server: Replying */
	RXPERF_CALL_SV_AWAIT_ACK,	/* Server: Awaiting final ACK */
	RXPERF_CALL_COMPLETE,		/* Completed or failed */
};
 46
/*
 * Server-side state for a single rxperf call.  A record is preallocated by
 * rxperf_charge_preallocation() and freed when the call completes or is
 * discarded.
 */
struct rxperf_call {
	struct rxrpc_call	*rxcall;	/* Underlying rxrpc call (set on attach) */
	struct iov_iter		iter;		/* Iterator for incoming data extraction */
	struct kvec		kvec[1];	/* Backing buffer descriptor for @iter */
	struct work_struct	work;		/* Runs rxperf_deliver_to_call() */
	const char		*type;		/* Operation name ("send", "recv", ...) */
	size_t			iov_len;	/* Bytes still expected by extraction */
	size_t			req_len;	/* Size of request blob */
	size_t			reply_len;	/* Size of reply blob */
	unsigned int		debug_id;	/* Identifier for tracing */
	unsigned int		operation_id;	/* RX_PERF_* op decoded from params.type */
	struct rxperf_proto_params params;	/* Raw parameter block from the client */
	__be32			tmp[2];		/* Scratch for the per-op size words */
	s32			abort_code;	/* Remote abort code recorded on completion */
	enum rxperf_call_state	state;
	short			error;		/* Local error recorded on completion */
	unsigned short		unmarshal;	/* Phase counter for rxperf_deliver_request() */
	u16			service_id;	/* Service being called (RX_PERF_SERVICE) */
	int (*deliver)(struct rxperf_call *call);	/* Current data-delivery handler */
	void (*processor)(struct work_struct *work);	/* NOTE(review): never assigned in this file — possibly vestigial */
};
 68
 69static struct socket *rxperf_socket;
 70static struct key *rxperf_sec_keyring;	/* Ring of security/crypto keys */
 71static struct workqueue_struct *rxperf_workqueue;
 72
 73static void rxperf_deliver_to_call(struct work_struct *work);
 74static int rxperf_deliver_param_block(struct rxperf_call *call);
 75static int rxperf_deliver_request(struct rxperf_call *call);
 76static int rxperf_process_call(struct rxperf_call *call);
 77static void rxperf_charge_preallocation(struct work_struct *work);
 78
 79static DECLARE_WORK(rxperf_charge_preallocation_work,
 80		    rxperf_charge_preallocation);
 81
/*
 * Transition a call to a new state.
 */
static inline void rxperf_set_call_state(struct rxperf_call *call,
					 enum rxperf_call_state to)
{
	call->state = to;
}
 87
 88static inline void rxperf_set_call_complete(struct rxperf_call *call,
 89					    int error, s32 remote_abort)
 90{
 91	if (call->state != RXPERF_CALL_COMPLETE) {
 92		call->abort_code = remote_abort;
 93		call->error = error;
 94		call->state = RXPERF_CALL_COMPLETE;
 95	}
 96}
 97
/*
 * Discard a preallocated call record that rxrpc won't be using; the
 * user_call_ID is the pointer to the rxperf_call itself.
 */
static void rxperf_rx_discard_new_call(struct rxrpc_call *rxcall,
				       unsigned long user_call_ID)
{
	kfree((struct rxperf_call *)user_call_ID);
}
103
/*
 * A new call has arrived and consumed a preallocated record; queue the work
 * item that replenishes the preallocation ring.
 */
static void rxperf_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
			       unsigned long user_call_ID)
{
	queue_work(rxperf_workqueue, &rxperf_charge_preallocation_work);
}
109
/*
 * Queue a call's work item so rxperf_deliver_to_call() gets to run for it.
 */
static void rxperf_queue_call_work(struct rxperf_call *call)
{
	queue_work(rxperf_workqueue, &call->work);
}
114
/*
 * Notification from rxrpc of activity on a call: kick the call's work item
 * unless the call has already completed.
 */
static void rxperf_notify_rx(struct sock *sk, struct rxrpc_call *rxcall,
			     unsigned long call_user_ID)
{
	struct rxperf_call *call = (struct rxperf_call *)call_user_ID;

	if (call->state != RXPERF_CALL_COMPLETE)
		rxperf_queue_call_work(call);
}
123
/*
 * A preallocated record has been attached to an incoming call; stash the
 * rxrpc call pointer in it.
 */
static void rxperf_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
{
	struct rxperf_call *call = (struct rxperf_call *)user_call_ID;

	call->rxcall = rxcall;
}
130
/*
 * Notification that the reply has been fully queued for transmission; move
 * the call on to waiting for the peer's final ACK.
 */
static void rxperf_notify_end_reply_tx(struct sock *sock,
				       struct rxrpc_call *rxcall,
				       unsigned long call_user_ID)
{
	rxperf_set_call_state((struct rxperf_call *)call_user_ID,
			      RXPERF_CALL_SV_AWAIT_ACK);
}
138
/*
 * Charge the incoming call preallocation.
 *
 * Keep allocating fresh call records and handing them to rxrpc until either
 * allocation fails or rxrpc's preallocation ring is full (charge_accept
 * returns an error).  Ownership of each successfully charged record passes
 * to rxrpc; the one left over on exit, if any, is freed here.
 */
static void rxperf_charge_preallocation(struct work_struct *work)
{
	struct rxperf_call *call;

	for (;;) {
		call = kzalloc(sizeof(*call), GFP_KERNEL);
		if (!call)
			break;

		call->type		= "unset";
		call->debug_id		= atomic_inc_return(&rxrpc_debug_id);
		call->deliver		= rxperf_deliver_param_block;
		call->state		= RXPERF_CALL_SV_AWAIT_PARAMS;
		call->service_id	= RX_PERF_SERVICE;
		/* First thing expected is the fixed-size parameter block */
		call->iov_len		= sizeof(call->params);
		call->kvec[0].iov_len	= sizeof(call->params);
		call->kvec[0].iov_base	= &call->params;
		iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len);
		INIT_WORK(&call->work, rxperf_deliver_to_call);

		if (rxrpc_kernel_charge_accept(rxperf_socket,
					       rxperf_notify_rx,
					       rxperf_rx_attach,
					       (unsigned long)call,
					       GFP_KERNEL,
					       call->debug_id) < 0)
			break;
		call = NULL;	/* Ownership transferred to rxrpc */
	}

	kfree(call);	/* Record that failed to be charged, if any */
}
174
175/*
176 * Open an rxrpc socket and bind it to be a server for callback notifications
177 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
178 */
179static int rxperf_open_socket(void)
180{
181	struct sockaddr_rxrpc srx;
182	struct socket *socket;
183	int ret;
184
185	ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET6,
186			       &socket);
187	if (ret < 0)
188		goto error_1;
189
190	socket->sk->sk_allocation = GFP_NOFS;
191
192	/* bind the callback manager's address to make this a server socket */
193	memset(&srx, 0, sizeof(srx));
194	srx.srx_family			= AF_RXRPC;
195	srx.srx_service			= RX_PERF_SERVICE;
196	srx.transport_type		= SOCK_DGRAM;
197	srx.transport_len		= sizeof(srx.transport.sin6);
198	srx.transport.sin6.sin6_family	= AF_INET6;
199	srx.transport.sin6.sin6_port	= htons(RXPERF_PORT);
200
201	ret = rxrpc_sock_set_min_security_level(socket->sk,
202						RXRPC_SECURITY_ENCRYPT);
203	if (ret < 0)
204		goto error_2;
205
206	ret = rxrpc_sock_set_security_keyring(socket->sk, rxperf_sec_keyring);
207
208	ret = kernel_bind(socket, (struct sockaddr *)&srx, sizeof(srx));
209	if (ret < 0)
210		goto error_2;
211
212	rxrpc_kernel_new_call_notification(socket, rxperf_rx_new_call,
213					   rxperf_rx_discard_new_call);
214
215	ret = kernel_listen(socket, INT_MAX);
216	if (ret < 0)
217		goto error_2;
218
219	rxperf_socket = socket;
220	rxperf_charge_preallocation(&rxperf_charge_preallocation_work);
221	return 0;
222
223error_2:
224	sock_release(socket);
225error_1:
226	pr_err("Can't set up rxperf socket: %d\n", ret);
227	return ret;
228}
229
/*
 * Close the rxrpc socket rxperf was using: stop accepting, shut the socket
 * down, let outstanding call work drain, then release it.
 */
static void rxperf_close_socket(void)
{
	kernel_listen(rxperf_socket, 0);
	kernel_sock_shutdown(rxperf_socket, SHUT_RDWR);
	flush_workqueue(rxperf_workqueue);
	sock_release(rxperf_socket);
}
240
241/*
242 * Log remote abort codes that indicate that we have a protocol disagreement
243 * with the server.
244 */
245static void rxperf_log_error(struct rxperf_call *call, s32 remote_abort)
246{
247	static int max = 0;
248	const char *msg;
249	int m;
250
251	switch (remote_abort) {
252	case RX_EOF:		 msg = "unexpected EOF";	break;
253	case RXGEN_CC_MARSHAL:	 msg = "client marshalling";	break;
254	case RXGEN_CC_UNMARSHAL: msg = "client unmarshalling";	break;
255	case RXGEN_SS_MARSHAL:	 msg = "server marshalling";	break;
256	case RXGEN_SS_UNMARSHAL: msg = "server unmarshalling";	break;
257	case RXGEN_DECODE:	 msg = "opcode decode";		break;
258	case RXGEN_SS_XDRFREE:	 msg = "server XDR cleanup";	break;
259	case RXGEN_CC_XDRFREE:	 msg = "client XDR cleanup";	break;
260	case -32:		 msg = "insufficient data";	break;
261	default:
262		return;
263	}
264
265	m = max;
266	if (m < 3) {
267		max = m + 1;
268		pr_info("Peer reported %s failure on %s\n", msg, call->type);
269	}
270}
271
/*
 * Deliver messages to a call.  This is the call's main state machine: it
 * keeps extracting and processing data until the call completes, needs more
 * data (-EAGAIN/-EINPROGRESS) or fails, in which case it is aborted with an
 * appropriate code and torn down.
 */
static void rxperf_deliver_to_call(struct work_struct *work)
{
	struct rxperf_call *call = container_of(work, struct rxperf_call, work);
	enum rxperf_call_state state;
	u32 abort_code, remote_abort = 0;
	int ret = 0;

	if (call->state == RXPERF_CALL_COMPLETE)
		return;

	while (state = call->state,
	       state == RXPERF_CALL_SV_AWAIT_PARAMS ||
	       state == RXPERF_CALL_SV_AWAIT_REQUEST ||
	       state == RXPERF_CALL_SV_AWAIT_ACK
	       ) {
		if (state == RXPERF_CALL_SV_AWAIT_ACK) {
			/* Reply sent; if the call died in the meantime,
			 * complete it now, otherwise keep waiting. */
			if (!rxrpc_kernel_check_life(rxperf_socket, call->rxcall))
				goto call_complete;
			return;
		}

		ret = call->deliver(call);
		if (ret == 0)
			ret = rxperf_process_call(call);

		switch (ret) {
		case 0:
			continue;
		case -EINPROGRESS:
		case -EAGAIN:
			/* More data needed; we'll be requeued on arrival */
			return;
		case -ECONNABORTED:
			rxperf_log_error(call, call->abort_code);
			goto call_complete;
		case -EOPNOTSUPP:
			abort_code = RXGEN_OPCODE;
			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
						abort_code, ret,
						rxperf_abort_op_not_supported);
			goto call_complete;
		case -ENOTSUPP:
			abort_code = RX_USER_ABORT;
			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
						abort_code, ret,
						rxperf_abort_op_not_supported);
			goto call_complete;
		case -EIO:
			pr_err("Call %u in bad state %u\n",
			       call->debug_id, call->state);
			fallthrough;
		case -ENODATA:
		case -EBADMSG:
		case -EMSGSIZE:
		case -ENOMEM:
		case -EFAULT:
			/* Unmarshalling-class failures */
			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
						RXGEN_SS_UNMARSHAL, ret,
						rxperf_abort_unmarshal_error);
			goto call_complete;
		default:
			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
						RX_CALL_DEAD, ret,
						rxperf_abort_general_error);
			goto call_complete;
		}
	}

call_complete:
	rxperf_set_call_complete(call, ret, remote_abort);
	/* The call may have been requeued */
	rxrpc_kernel_shutdown_call(rxperf_socket, call->rxcall);
	rxrpc_kernel_put_call(rxperf_socket, call->rxcall);
	cancel_work(&call->work);
	kfree(call);
}
350
/*
 * Extract a piece of data from the received data socket buffers.
 *
 * Returns 0 when the iterator has been satisfied, -EAGAIN when more data has
 * yet to arrive, or a negative error — in which case the call is marked
 * complete with that error and any remote abort code.
 */
static int rxperf_extract_data(struct rxperf_call *call, bool want_more)
{
	u32 remote_abort = 0;
	int ret;

	ret = rxrpc_kernel_recv_data(rxperf_socket, call->rxcall, &call->iter,
				     &call->iov_len, want_more, &remote_abort,
				     &call->service_id);
	pr_debug("Extract i=%zu l=%zu m=%u ret=%d\n",
		 iov_iter_count(&call->iter), call->iov_len, want_more, ret);
	if (ret == 0 || ret == -EAGAIN)
		return ret;

	if (ret == 1) {
		/* ret == 1 appears to mean the final data packet has been
		 * received — advance the call state accordingly. */
		switch (call->state) {
		case RXPERF_CALL_SV_AWAIT_REQUEST:
			rxperf_set_call_state(call, RXPERF_CALL_SV_REPLYING);
			break;
		case RXPERF_CALL_COMPLETE:
			pr_debug("premature completion %d", call->error);
			return call->error;
		default:
			break;
		}
		return 0;
	}

	rxperf_set_call_complete(call, ret, remote_abort);
	return ret;
}
384
/*
 * Grab the operation ID from an incoming manager call.
 *
 * Reads the fixed-size parameter block, validates the protocol version and
 * sets the expected size of the per-operation extra parameters, then chains
 * straight on to rxperf_deliver_request().
 */
static int rxperf_deliver_param_block(struct rxperf_call *call)
{
	u32 version;
	int ret;

	/* Extract the parameter block */
	ret = rxperf_extract_data(call, true);
	if (ret < 0)
		return ret;

	version			= ntohl(call->params.version);
	call->operation_id	= ntohl(call->params.type);
	call->deliver		= rxperf_deliver_request;

	if (version != RX_PERF_VERSION) {
		pr_info("Version mismatch %x\n", version);
		return -ENOTSUPP;
	}

	switch (call->operation_id) {
	case RX_PERF_SEND:
		call->type = "send";
		call->reply_len = 0;
		call->iov_len = 4;	/* Expect req size */
		break;
	case RX_PERF_RECV:
		call->type = "recv";
		call->req_len = 0;
		call->iov_len = 4;	/* Expect reply size */
		break;
	case RX_PERF_RPC:
		call->type = "rpc";
		call->iov_len = 8;	/* Expect req size and reply size */
		break;
	case RX_PERF_FILE:
		call->type = "file";
		fallthrough;	/* "file" is not implemented */
	default:
		return -EOPNOTSUPP;
	}

	rxperf_set_call_state(call, RXPERF_CALL_SV_AWAIT_REQUEST);
	return call->deliver(call);
}
432
/*
 * Deliver the request data.
 *
 * Unmarshalling proceeds in phases tracked by call->unmarshal so that the
 * function can be re-entered when more data arrives: phase 0 aims the
 * iterator at tmp[] for the size word(s), phase 1 parses them per operation,
 * phase 2 discards the request payload itself.
 */
static int rxperf_deliver_request(struct rxperf_call *call)
{
	int ret;

	switch (call->unmarshal) {
	case 0:
		/* Receive the per-op size word(s) into tmp[] */
		call->kvec[0].iov_len	= call->iov_len;
		call->kvec[0].iov_base	= call->tmp;
		iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len);
		call->unmarshal++;
		fallthrough;
	case 1:
		ret = rxperf_extract_data(call, true);
		if (ret < 0)
			return ret;

		switch (call->operation_id) {
		case RX_PERF_SEND:
			call->type = "send";
			call->req_len	= ntohl(call->tmp[0]);
			call->reply_len	= 0;
			break;
		case RX_PERF_RECV:
			call->type = "recv";
			call->req_len = 0;
			call->reply_len	= ntohl(call->tmp[0]);
			break;
		case RX_PERF_RPC:
			call->type = "rpc";
			call->req_len	= ntohl(call->tmp[0]);
			call->reply_len	= ntohl(call->tmp[1]);
			break;
		default:
			pr_info("Can't parse extra params\n");
			return -EIO;
		}

		pr_debug("CALL op=%s rq=%zx rp=%zx\n",
			 call->type, call->req_len, call->reply_len);

		/* The request payload's content doesn't matter — discard it */
		call->iov_len = call->req_len;
		iov_iter_discard(&call->iter, READ, call->req_len);
		call->unmarshal++;
		fallthrough;
	case 2:
		ret = rxperf_extract_data(call, false);
		if (ret < 0)
			return ret;
		call->unmarshal++;
		fallthrough;
	default:
		return 0;
	}
}
490
/*
 * Process a call for which we've received the request.
 *
 * Transmit reply_len bytes of zeroes, drawn from the zero page in page-sized
 * chunks, followed by the 4-byte magic cookie that terminates the reply.
 */
static int rxperf_process_call(struct rxperf_call *call)
{
	struct msghdr msg = {};
	struct bio_vec bv;
	struct kvec iov[1];
	ssize_t n;
	size_t reply_len = call->reply_len, len;

	/* Tell rxrpc the total reply size up front */
	rxrpc_kernel_set_tx_length(rxperf_socket, call->rxcall,
				   reply_len + sizeof(rxperf_magic_cookie));

	while (reply_len > 0) {
		len = min_t(size_t, reply_len, PAGE_SIZE);
		bvec_set_page(&bv, ZERO_PAGE(0), len, 0);
		iov_iter_bvec(&msg.msg_iter, WRITE, &bv, 1, len);
		msg.msg_flags = MSG_MORE;	/* More reply data follows */
		n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg,
					   len, rxperf_notify_end_reply_tx);
		if (n < 0)
			return n;
		if (n == 0)
			return -EIO;
		reply_len -= n;
	}

	/* Append the magic cookie trailer */
	len = sizeof(rxperf_magic_cookie);
	iov[0].iov_base	= (void *)rxperf_magic_cookie;
	iov[0].iov_len	= len;
	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len);
	msg.msg_flags = 0;
	n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg, len,
				   rxperf_notify_end_reply_tx);
	if (n >= 0)
		return 0; /* Success */

	if (n == -ENOMEM)
		rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
					RXGEN_SS_MARSHAL, -ENOMEM,
					rxperf_abort_oom);
	return n;
}
535
/*
 * Add a key to the security keyring.
 *
 * Creates (or updates) an "rxrpc_s" server key whose description is of the
 * form "<service>:<security-index>" — here RX_PERF_SERVICE with index 2,
 * presumably rxkad — from the fixed test secret, and links it into @keyring.
 */
static int rxperf_add_key(struct key *keyring)
{
	key_ref_t kref;
	int ret;

	kref = key_create_or_update(make_key_ref(keyring, true),
				    "rxrpc_s",
				    __stringify(RX_PERF_SERVICE) ":2",
				    secret,
				    sizeof(secret),
				    KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH
				    | KEY_USR_VIEW,
				    KEY_ALLOC_NOT_IN_QUOTA);

	if (IS_ERR(kref)) {
		pr_err("Can't allocate rxperf server key: %ld\n", PTR_ERR(kref));
		return PTR_ERR(kref);
	}

	ret = key_link(keyring, key_ref_to_ptr(kref));
	if (ret < 0)
		pr_err("Can't link rxperf server key: %d\n", ret);
	key_ref_put(kref);	/* Keyring link holds its own reference */
	return ret;
}
564
565/*
566 * Initialise the rxperf server.
567 */
568static int __init rxperf_init(void)
569{
570	struct key *keyring;
571	int ret = -ENOMEM;
572
573	pr_info("Server registering\n");
574
575	rxperf_workqueue = alloc_workqueue("rxperf", 0, 0);
576	if (!rxperf_workqueue)
577		goto error_workqueue;
578
579	keyring = keyring_alloc("rxperf_server",
580				GLOBAL_ROOT_UID, GLOBAL_ROOT_GID, current_cred(),
581				KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH |
582				KEY_POS_WRITE |
583				KEY_USR_VIEW | KEY_USR_READ | KEY_USR_SEARCH |
584				KEY_USR_WRITE |
585				KEY_OTH_VIEW | KEY_OTH_READ | KEY_OTH_SEARCH,
586				KEY_ALLOC_NOT_IN_QUOTA,
587				NULL, NULL);
588	if (IS_ERR(keyring)) {
589		pr_err("Can't allocate rxperf server keyring: %ld\n",
590		       PTR_ERR(keyring));
591		goto error_keyring;
592	}
593	rxperf_sec_keyring = keyring;
594	ret = rxperf_add_key(keyring);
595	if (ret < 0)
596		goto error_key;
597
598	ret = rxperf_open_socket();
599	if (ret < 0)
600		goto error_socket;
601	return 0;
602
603error_socket:
604error_key:
605	key_put(rxperf_sec_keyring);
606error_keyring:
607	destroy_workqueue(rxperf_workqueue);
608	rcu_barrier();
609error_workqueue:
610	pr_err("Failed to register: %d\n", ret);
611	return ret;
612}
613late_initcall(rxperf_init); /* Must be called after net/ to create socket */
614
/*
 * Clean up on module unload: close the socket (which flushes outstanding
 * call work), drop the security keyring and destroy the work queue.
 */
static void __exit rxperf_exit(void)
{
	pr_info("Server unregistering.\n");

	rxperf_close_socket();
	key_put(rxperf_sec_keyring);
	destroy_workqueue(rxperf_workqueue);
	rcu_barrier();
}
module_exit(rxperf_exit);
625
v6.2
  1// SPDX-License-Identifier: GPL-2.0-or-later
  2/* In-kernel rxperf server for testing purposes.
  3 *
  4 * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved.
  5 * Written by David Howells (dhowells@redhat.com)
  6 */
  7
  8#define pr_fmt(fmt) "rxperf: " fmt
  9#include <linux/module.h>
 10#include <linux/slab.h>
 11#include <net/sock.h>
 12#include <net/af_rxrpc.h>
 13#define RXRPC_TRACE_ONLY_DEFINE_ENUMS
 14#include <trace/events/rxrpc.h>
 15
 16MODULE_DESCRIPTION("rxperf test server (afs)");
 17MODULE_AUTHOR("Red Hat, Inc.");
 18MODULE_LICENSE("GPL");
 19
 20#define RXPERF_PORT		7009
 21#define RX_PERF_SERVICE		147
 22#define RX_PERF_VERSION		3
 23#define RX_PERF_SEND		0
 24#define RX_PERF_RECV		1
 25#define RX_PERF_RPC		3
 26#define RX_PERF_FILE		4
 27#define RX_PERF_MAGIC_COOKIE	0x4711
 28
 29struct rxperf_proto_params {
 30	__be32		version;
 31	__be32		type;
 32	__be32		rsize;
 33	__be32		wsize;
 34} __packed;
 35
 36static const u8 rxperf_magic_cookie[] = { 0x00, 0x00, 0x47, 0x11 };
 37static const u8 secret[8] = { 0xa7, 0x83, 0x8a, 0xcb, 0xc7, 0x83, 0xec, 0x94 };
 38
 39enum rxperf_call_state {
 40	RXPERF_CALL_SV_AWAIT_PARAMS,	/* Server: Awaiting parameter block */
 41	RXPERF_CALL_SV_AWAIT_REQUEST,	/* Server: Awaiting request data */
 42	RXPERF_CALL_SV_REPLYING,	/* Server: Replying */
 43	RXPERF_CALL_SV_AWAIT_ACK,	/* Server: Awaiting final ACK */
 44	RXPERF_CALL_COMPLETE,		/* Completed or failed */
 45};
 46
 47struct rxperf_call {
 48	struct rxrpc_call	*rxcall;
 49	struct iov_iter		iter;
 50	struct kvec		kvec[1];
 51	struct work_struct	work;
 52	const char		*type;
 53	size_t			iov_len;
 54	size_t			req_len;	/* Size of request blob */
 55	size_t			reply_len;	/* Size of reply blob */
 56	unsigned int		debug_id;
 57	unsigned int		operation_id;
 58	struct rxperf_proto_params params;
 59	__be32			tmp[2];
 60	s32			abort_code;
 61	enum rxperf_call_state	state;
 62	short			error;
 63	unsigned short		unmarshal;
 64	u16			service_id;
 65	int (*deliver)(struct rxperf_call *call);
 66	void (*processor)(struct work_struct *work);
 67};
 68
 69static struct socket *rxperf_socket;
 70static struct key *rxperf_sec_keyring;	/* Ring of security/crypto keys */
 71static struct workqueue_struct *rxperf_workqueue;
 72
 73static void rxperf_deliver_to_call(struct work_struct *work);
 74static int rxperf_deliver_param_block(struct rxperf_call *call);
 75static int rxperf_deliver_request(struct rxperf_call *call);
 76static int rxperf_process_call(struct rxperf_call *call);
 77static void rxperf_charge_preallocation(struct work_struct *work);
 78
 79static DECLARE_WORK(rxperf_charge_preallocation_work,
 80		    rxperf_charge_preallocation);
 81
 82static inline void rxperf_set_call_state(struct rxperf_call *call,
 83					 enum rxperf_call_state to)
 84{
 85	call->state = to;
 86}
 87
 88static inline void rxperf_set_call_complete(struct rxperf_call *call,
 89					    int error, s32 remote_abort)
 90{
 91	if (call->state != RXPERF_CALL_COMPLETE) {
 92		call->abort_code = remote_abort;
 93		call->error = error;
 94		call->state = RXPERF_CALL_COMPLETE;
 95	}
 96}
 97
 98static void rxperf_rx_discard_new_call(struct rxrpc_call *rxcall,
 99				       unsigned long user_call_ID)
100{
101	kfree((struct rxperf_call *)user_call_ID);
102}
103
104static void rxperf_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
105			       unsigned long user_call_ID)
106{
107	queue_work(rxperf_workqueue, &rxperf_charge_preallocation_work);
108}
109
110static void rxperf_queue_call_work(struct rxperf_call *call)
111{
112	queue_work(rxperf_workqueue, &call->work);
113}
114
115static void rxperf_notify_rx(struct sock *sk, struct rxrpc_call *rxcall,
116			     unsigned long call_user_ID)
117{
118	struct rxperf_call *call = (struct rxperf_call *)call_user_ID;
119
120	if (call->state != RXPERF_CALL_COMPLETE)
121		rxperf_queue_call_work(call);
122}
123
124static void rxperf_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
125{
126	struct rxperf_call *call = (struct rxperf_call *)user_call_ID;
127
128	call->rxcall = rxcall;
129}
130
131static void rxperf_notify_end_reply_tx(struct sock *sock,
132				       struct rxrpc_call *rxcall,
133				       unsigned long call_user_ID)
134{
135	rxperf_set_call_state((struct rxperf_call *)call_user_ID,
136			      RXPERF_CALL_SV_AWAIT_ACK);
137}
138
139/*
140 * Charge the incoming call preallocation.
141 */
142static void rxperf_charge_preallocation(struct work_struct *work)
143{
144	struct rxperf_call *call;
145
146	for (;;) {
147		call = kzalloc(sizeof(*call), GFP_KERNEL);
148		if (!call)
149			break;
150
151		call->type		= "unset";
152		call->debug_id		= atomic_inc_return(&rxrpc_debug_id);
153		call->deliver		= rxperf_deliver_param_block;
154		call->state		= RXPERF_CALL_SV_AWAIT_PARAMS;
155		call->service_id	= RX_PERF_SERVICE;
156		call->iov_len		= sizeof(call->params);
157		call->kvec[0].iov_len	= sizeof(call->params);
158		call->kvec[0].iov_base	= &call->params;
159		iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len);
160		INIT_WORK(&call->work, rxperf_deliver_to_call);
161
162		if (rxrpc_kernel_charge_accept(rxperf_socket,
163					       rxperf_notify_rx,
164					       rxperf_rx_attach,
165					       (unsigned long)call,
166					       GFP_KERNEL,
167					       call->debug_id) < 0)
168			break;
169		call = NULL;
170	}
171
172	kfree(call);
173}
174
175/*
176 * Open an rxrpc socket and bind it to be a server for callback notifications
177 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
178 */
179static int rxperf_open_socket(void)
180{
181	struct sockaddr_rxrpc srx;
182	struct socket *socket;
183	int ret;
184
185	ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET6,
186			       &socket);
187	if (ret < 0)
188		goto error_1;
189
190	socket->sk->sk_allocation = GFP_NOFS;
191
192	/* bind the callback manager's address to make this a server socket */
193	memset(&srx, 0, sizeof(srx));
194	srx.srx_family			= AF_RXRPC;
195	srx.srx_service			= RX_PERF_SERVICE;
196	srx.transport_type		= SOCK_DGRAM;
197	srx.transport_len		= sizeof(srx.transport.sin6);
198	srx.transport.sin6.sin6_family	= AF_INET6;
199	srx.transport.sin6.sin6_port	= htons(RXPERF_PORT);
200
201	ret = rxrpc_sock_set_min_security_level(socket->sk,
202						RXRPC_SECURITY_ENCRYPT);
203	if (ret < 0)
204		goto error_2;
205
206	ret = rxrpc_sock_set_security_keyring(socket->sk, rxperf_sec_keyring);
207
208	ret = kernel_bind(socket, (struct sockaddr *)&srx, sizeof(srx));
209	if (ret < 0)
210		goto error_2;
211
212	rxrpc_kernel_new_call_notification(socket, rxperf_rx_new_call,
213					   rxperf_rx_discard_new_call);
214
215	ret = kernel_listen(socket, INT_MAX);
216	if (ret < 0)
217		goto error_2;
218
219	rxperf_socket = socket;
220	rxperf_charge_preallocation(&rxperf_charge_preallocation_work);
221	return 0;
222
223error_2:
224	sock_release(socket);
225error_1:
226	pr_err("Can't set up rxperf socket: %d\n", ret);
227	return ret;
228}
229
230/*
231 * close the rxrpc socket rxperf was using
232 */
233static void rxperf_close_socket(void)
234{
235	kernel_listen(rxperf_socket, 0);
236	kernel_sock_shutdown(rxperf_socket, SHUT_RDWR);
237	flush_workqueue(rxperf_workqueue);
238	sock_release(rxperf_socket);
239}
240
241/*
242 * Log remote abort codes that indicate that we have a protocol disagreement
243 * with the server.
244 */
245static void rxperf_log_error(struct rxperf_call *call, s32 remote_abort)
246{
247	static int max = 0;
248	const char *msg;
249	int m;
250
251	switch (remote_abort) {
252	case RX_EOF:		 msg = "unexpected EOF";	break;
253	case RXGEN_CC_MARSHAL:	 msg = "client marshalling";	break;
254	case RXGEN_CC_UNMARSHAL: msg = "client unmarshalling";	break;
255	case RXGEN_SS_MARSHAL:	 msg = "server marshalling";	break;
256	case RXGEN_SS_UNMARSHAL: msg = "server unmarshalling";	break;
257	case RXGEN_DECODE:	 msg = "opcode decode";		break;
258	case RXGEN_SS_XDRFREE:	 msg = "server XDR cleanup";	break;
259	case RXGEN_CC_XDRFREE:	 msg = "client XDR cleanup";	break;
260	case -32:		 msg = "insufficient data";	break;
261	default:
262		return;
263	}
264
265	m = max;
266	if (m < 3) {
267		max = m + 1;
268		pr_info("Peer reported %s failure on %s\n", msg, call->type);
269	}
270}
271
/*
 * Deliver messages to a call.  This is the call's main state machine: it
 * keeps extracting and processing data until the call completes, needs more
 * data (-EAGAIN/-EINPROGRESS) or fails, in which case it is aborted with an
 * appropriate code and torn down.
 */
static void rxperf_deliver_to_call(struct work_struct *work)
{
	struct rxperf_call *call = container_of(work, struct rxperf_call, work);
	enum rxperf_call_state state;
	u32 abort_code, remote_abort = 0;
	int ret = 0;

	if (call->state == RXPERF_CALL_COMPLETE)
		return;

	while (state = call->state,
	       state == RXPERF_CALL_SV_AWAIT_PARAMS ||
	       state == RXPERF_CALL_SV_AWAIT_REQUEST ||
	       state == RXPERF_CALL_SV_AWAIT_ACK
	       ) {
		if (state == RXPERF_CALL_SV_AWAIT_ACK) {
			/* Reply sent; if the call died in the meantime,
			 * complete it now, otherwise keep waiting. */
			if (!rxrpc_kernel_check_life(rxperf_socket, call->rxcall))
				goto call_complete;
			return;
		}

		ret = call->deliver(call);
		if (ret == 0)
			ret = rxperf_process_call(call);

		switch (ret) {
		case 0:
			continue;
		case -EINPROGRESS:
		case -EAGAIN:
			/* More data needed; we'll be requeued on arrival */
			return;
		case -ECONNABORTED:
			rxperf_log_error(call, call->abort_code);
			goto call_complete;
		case -EOPNOTSUPP:
			abort_code = RXGEN_OPCODE;
			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
						abort_code, ret,
						rxperf_abort_op_not_supported);
			goto call_complete;
		case -ENOTSUPP:
			abort_code = RX_USER_ABORT;
			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
						abort_code, ret,
						rxperf_abort_op_not_supported);
			goto call_complete;
		case -EIO:
			pr_err("Call %u in bad state %u\n",
			       call->debug_id, call->state);
			fallthrough;
		case -ENODATA:
		case -EBADMSG:
		case -EMSGSIZE:
		case -ENOMEM:
		case -EFAULT:
			/* Unmarshalling-class failures */
			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
						RXGEN_SS_UNMARSHAL, ret,
						rxperf_abort_unmarshal_error);
			goto call_complete;
		default:
			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
						RX_CALL_DEAD, ret,
						rxperf_abort_general_error);
			goto call_complete;
		}
	}

call_complete:
	rxperf_set_call_complete(call, ret, remote_abort);
	/* The call may have been requeued */
	rxrpc_kernel_end_call(rxperf_socket, call->rxcall);
	cancel_work(&call->work);
	kfree(call);
}
349
350/*
351 * Extract a piece of data from the received data socket buffers.
352 */
353static int rxperf_extract_data(struct rxperf_call *call, bool want_more)
354{
355	u32 remote_abort = 0;
356	int ret;
357
358	ret = rxrpc_kernel_recv_data(rxperf_socket, call->rxcall, &call->iter,
359				     &call->iov_len, want_more, &remote_abort,
360				     &call->service_id);
361	pr_debug("Extract i=%zu l=%zu m=%u ret=%d\n",
362		 iov_iter_count(&call->iter), call->iov_len, want_more, ret);
363	if (ret == 0 || ret == -EAGAIN)
364		return ret;
365
366	if (ret == 1) {
367		switch (call->state) {
368		case RXPERF_CALL_SV_AWAIT_REQUEST:
369			rxperf_set_call_state(call, RXPERF_CALL_SV_REPLYING);
370			break;
371		case RXPERF_CALL_COMPLETE:
372			pr_debug("premature completion %d", call->error);
373			return call->error;
374		default:
375			break;
376		}
377		return 0;
378	}
379
380	rxperf_set_call_complete(call, ret, remote_abort);
381	return ret;
382}
383
384/*
385 * Grab the operation ID from an incoming manager call.
386 */
387static int rxperf_deliver_param_block(struct rxperf_call *call)
388{
389	u32 version;
390	int ret;
391
392	/* Extract the parameter block */
393	ret = rxperf_extract_data(call, true);
394	if (ret < 0)
395		return ret;
396
397	version			= ntohl(call->params.version);
398	call->operation_id	= ntohl(call->params.type);
399	call->deliver		= rxperf_deliver_request;
400
401	if (version != RX_PERF_VERSION) {
402		pr_info("Version mismatch %x\n", version);
403		return -ENOTSUPP;
404	}
405
406	switch (call->operation_id) {
407	case RX_PERF_SEND:
408		call->type = "send";
409		call->reply_len = 0;
410		call->iov_len = 4;	/* Expect req size */
411		break;
412	case RX_PERF_RECV:
413		call->type = "recv";
414		call->req_len = 0;
415		call->iov_len = 4;	/* Expect reply size */
416		break;
417	case RX_PERF_RPC:
418		call->type = "rpc";
419		call->iov_len = 8;	/* Expect req size and reply size */
420		break;
421	case RX_PERF_FILE:
422		call->type = "file";
423		fallthrough;
424	default:
425		return -EOPNOTSUPP;
426	}
427
428	rxperf_set_call_state(call, RXPERF_CALL_SV_AWAIT_REQUEST);
429	return call->deliver(call);
430}
431
432/*
433 * Deliver the request data.
434 */
435static int rxperf_deliver_request(struct rxperf_call *call)
436{
437	int ret;
438
439	switch (call->unmarshal) {
440	case 0:
441		call->kvec[0].iov_len	= call->iov_len;
442		call->kvec[0].iov_base	= call->tmp;
443		iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len);
444		call->unmarshal++;
445		fallthrough;
446	case 1:
447		ret = rxperf_extract_data(call, true);
448		if (ret < 0)
449			return ret;
450
451		switch (call->operation_id) {
452		case RX_PERF_SEND:
453			call->type = "send";
454			call->req_len	= ntohl(call->tmp[0]);
455			call->reply_len	= 0;
456			break;
457		case RX_PERF_RECV:
458			call->type = "recv";
459			call->req_len = 0;
460			call->reply_len	= ntohl(call->tmp[0]);
461			break;
462		case RX_PERF_RPC:
463			call->type = "rpc";
464			call->req_len	= ntohl(call->tmp[0]);
465			call->reply_len	= ntohl(call->tmp[1]);
466			break;
467		default:
468			pr_info("Can't parse extra params\n");
469			return -EIO;
470		}
471
472		pr_debug("CALL op=%s rq=%zx rp=%zx\n",
473			 call->type, call->req_len, call->reply_len);
474
475		call->iov_len = call->req_len;
476		iov_iter_discard(&call->iter, READ, call->req_len);
477		call->unmarshal++;
478		fallthrough;
479	case 2:
480		ret = rxperf_extract_data(call, false);
481		if (ret < 0)
482			return ret;
483		call->unmarshal++;
484		fallthrough;
485	default:
486		return 0;
487	}
488}
489
/*
 * Process a call for which we've received the request.
 *
 * Transmit reply_len bytes of zeroes, drawn from the zero page in page-sized
 * chunks, followed by the 4-byte magic cookie that terminates the reply.
 */
static int rxperf_process_call(struct rxperf_call *call)
{
	struct msghdr msg = {};
	struct bio_vec bv[1];
	struct kvec iov[1];
	ssize_t n;
	size_t reply_len = call->reply_len, len;

	/* Tell rxrpc the total reply size up front */
	rxrpc_kernel_set_tx_length(rxperf_socket, call->rxcall,
				   reply_len + sizeof(rxperf_magic_cookie));

	while (reply_len > 0) {
		len = min_t(size_t, reply_len, PAGE_SIZE);
		bv[0].bv_page	= ZERO_PAGE(0);
		bv[0].bv_offset	= 0;
		bv[0].bv_len	= len;
		iov_iter_bvec(&msg.msg_iter, WRITE, bv, 1, len);
		msg.msg_flags = MSG_MORE;	/* More reply data follows */
		n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg,
					   len, rxperf_notify_end_reply_tx);
		if (n < 0)
			return n;
		if (n == 0)
			return -EIO;
		reply_len -= n;
	}

	/* Append the magic cookie trailer */
	len = sizeof(rxperf_magic_cookie);
	iov[0].iov_base	= (void *)rxperf_magic_cookie;
	iov[0].iov_len	= len;
	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len);
	msg.msg_flags = 0;
	n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg, len,
				   rxperf_notify_end_reply_tx);
	if (n >= 0)
		return 0; /* Success */

	if (n == -ENOMEM)
		rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
					RXGEN_SS_MARSHAL, -ENOMEM,
					rxperf_abort_oom);
	return n;
}
536
537/*
538 * Add a key to the security keyring.
539 */
540static int rxperf_add_key(struct key *keyring)
541{
542	key_ref_t kref;
543	int ret;
544
545	kref = key_create_or_update(make_key_ref(keyring, true),
546				    "rxrpc_s",
547				    __stringify(RX_PERF_SERVICE) ":2",
548				    secret,
549				    sizeof(secret),
550				    KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH
551				    | KEY_USR_VIEW,
552				    KEY_ALLOC_NOT_IN_QUOTA);
553
554	if (IS_ERR(kref)) {
555		pr_err("Can't allocate rxperf server key: %ld\n", PTR_ERR(kref));
556		return PTR_ERR(kref);
557	}
558
559	ret = key_link(keyring, key_ref_to_ptr(kref));
560	if (ret < 0)
561		pr_err("Can't link rxperf server key: %d\n", ret);
562	key_ref_put(kref);
563	return ret;
564}
565
566/*
567 * Initialise the rxperf server.
568 */
569static int __init rxperf_init(void)
570{
571	struct key *keyring;
572	int ret = -ENOMEM;
573
574	pr_info("Server registering\n");
575
576	rxperf_workqueue = alloc_workqueue("rxperf", 0, 0);
577	if (!rxperf_workqueue)
578		goto error_workqueue;
579
580	keyring = keyring_alloc("rxperf_server",
581				GLOBAL_ROOT_UID, GLOBAL_ROOT_GID, current_cred(),
582				KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH |
583				KEY_POS_WRITE |
584				KEY_USR_VIEW | KEY_USR_READ | KEY_USR_SEARCH |
585				KEY_USR_WRITE |
586				KEY_OTH_VIEW | KEY_OTH_READ | KEY_OTH_SEARCH,
587				KEY_ALLOC_NOT_IN_QUOTA,
588				NULL, NULL);
589	if (IS_ERR(keyring)) {
590		pr_err("Can't allocate rxperf server keyring: %ld\n",
591		       PTR_ERR(keyring));
592		goto error_keyring;
593	}
594	rxperf_sec_keyring = keyring;
595	ret = rxperf_add_key(keyring);
596	if (ret < 0)
597		goto error_key;
598
599	ret = rxperf_open_socket();
600	if (ret < 0)
601		goto error_socket;
602	return 0;
603
604error_socket:
605error_key:
606	key_put(rxperf_sec_keyring);
607error_keyring:
608	destroy_workqueue(rxperf_workqueue);
609	rcu_barrier();
610error_workqueue:
611	pr_err("Failed to register: %d\n", ret);
612	return ret;
613}
614late_initcall(rxperf_init); /* Must be called after net/ to create socket */
615
616static void __exit rxperf_exit(void)
617{
618	pr_info("Server unregistering.\n");
619
620	rxperf_close_socket();
621	key_put(rxperf_sec_keyring);
622	destroy_workqueue(rxperf_workqueue);
623	rcu_barrier();
624}
625module_exit(rxperf_exit);
626