Linux Audio

Check our new training course

Loading...
Note: File does not exist in v3.1.
  1// SPDX-License-Identifier: GPL-2.0-only
  2/*
  3 * Copyright 2018 Google Inc.
  4 * Author: Eric Dumazet (edumazet@google.com)
  5 *
  6 * Reference program demonstrating tcp mmap() usage,
  7 * and SO_RCVLOWAT hints for receiver.
  8 *
  9 * Note : NIC with header split is needed to use mmap() on TCP :
 10 * Each incoming frame must be a multiple of PAGE_SIZE bytes of TCP payload.
 11 *
 12 * How to use on loopback interface :
 13 *
 14 *  ifconfig lo mtu 61512  # 15*4096 + 40 (ipv6 header) + 32 (TCP with TS option header)
 15 *  tcp_mmap -s -z &
 16 *  tcp_mmap -H ::1 -z
 17 *
 18 *  Or leave default lo mtu, but use -M option to set TCP_MAXSEG option to (4096 + 12)
 19 *      (4096 : page size on x86, 12: TCP TS option length)
 20 *  tcp_mmap -s -z -M $((4096+12)) &
 21 *  tcp_mmap -H ::1 -z -M $((4096+12))
 22 *
 23 * Note: -z option on sender uses MSG_ZEROCOPY, which forces a copy when packets go through loopback interface.
 24 *       We might use sendfile() instead, but really this test program is about mmap(), for receivers ;)
 25 *
 26 * $ ./tcp_mmap -s &                                 # Without mmap()
 27 * $ for i in {1..4}; do ./tcp_mmap -H ::1 -z ; done
 28 * received 32768 MB (0 % mmap'ed) in 14.1157 s, 19.4732 Gbit
 29 *   cpu usage user:0.057 sys:7.815, 240.234 usec per MB, 65531 c-switches
 30 * received 32768 MB (0 % mmap'ed) in 14.6833 s, 18.7204 Gbit
 31 *  cpu usage user:0.043 sys:8.103, 248.596 usec per MB, 65524 c-switches
 32 * received 32768 MB (0 % mmap'ed) in 11.143 s, 24.6682 Gbit
 33 *   cpu usage user:0.044 sys:6.576, 202.026 usec per MB, 65519 c-switches
 34 * received 32768 MB (0 % mmap'ed) in 14.9056 s, 18.4413 Gbit
 35 *   cpu usage user:0.036 sys:8.193, 251.129 usec per MB, 65530 c-switches
 36 * $ kill %1   # kill tcp_mmap server
 37 *
 38 * $ ./tcp_mmap -s -z &                              # With mmap()
 39 * $ for i in {1..4}; do ./tcp_mmap -H ::1 -z ; done
 40 * received 32768 MB (99.9939 % mmap'ed) in 6.73792 s, 40.7956 Gbit
 41 *   cpu usage user:0.045 sys:2.827, 87.6465 usec per MB, 65532 c-switches
 42 * received 32768 MB (99.9939 % mmap'ed) in 7.26732 s, 37.8238 Gbit
 43 *   cpu usage user:0.037 sys:3.087, 95.3369 usec per MB, 65532 c-switches
 44 * received 32768 MB (99.9939 % mmap'ed) in 7.61661 s, 36.0893 Gbit
 45 *   cpu usage user:0.046 sys:3.559, 110.016 usec per MB, 65529 c-switches
 46 * received 32768 MB (99.9939 % mmap'ed) in 7.43764 s, 36.9577 Gbit
 47 *   cpu usage user:0.035 sys:3.467, 106.873 usec per MB, 65530 c-switches
 48 */
 49#define _GNU_SOURCE
 50#include <pthread.h>
 51#include <sys/types.h>
 52#include <fcntl.h>
 53#include <error.h>
 54#include <sys/socket.h>
 55#include <sys/mman.h>
 56#include <sys/resource.h>
 57#include <unistd.h>
 58#include <string.h>
 59#include <stdlib.h>
 60#include <stdio.h>
 61#include <errno.h>
 62#include <time.h>
 63#include <sys/time.h>
 64#include <netinet/in.h>
 65#include <arpa/inet.h>
 66#include <poll.h>
 67#include <linux/tcp.h>
 68#include <assert.h>
 69
 70#ifndef MSG_ZEROCOPY
 71#define MSG_ZEROCOPY    0x4000000
 72#endif
 73
 74#define FILE_SZ (1ULL << 35)
 75static int cfg_family = AF_INET6;
 76static socklen_t cfg_alen = sizeof(struct sockaddr_in6);
 77static int cfg_port = 8787;
 78
 79static int rcvbuf; /* Default: autotuning.  Can be set with -r <integer> option */
 80static int sndbuf; /* Default: autotuning.  Can be set with -w <integer> option */
 81static int zflg; /* zero copy option. (MSG_ZEROCOPY for sender, mmap() for receiver */
 82static int xflg; /* hash received data (simple xor) (-h option) */
 83static int keepflag; /* -k option: receiver shall keep all received file in memory (no munmap() calls) */
 84
 85static size_t chunk_size  = 512*1024;
 86
 87static size_t map_align;
 88
 89unsigned long htotal;
 90
 91static inline void prefetch(const void *x)
 92{
 93#if defined(__x86_64__)
 94	asm volatile("prefetcht0 %P0" : : "m" (*(const char *)x));
 95#endif
 96}
 97
 98void hash_zone(void *zone, unsigned int length)
 99{
100	unsigned long temp = htotal;
101
102	while (length >= 8*sizeof(long)) {
103		prefetch(zone + 384);
104		temp ^= *(unsigned long *)zone;
105		temp ^= *(unsigned long *)(zone + sizeof(long));
106		temp ^= *(unsigned long *)(zone + 2*sizeof(long));
107		temp ^= *(unsigned long *)(zone + 3*sizeof(long));
108		temp ^= *(unsigned long *)(zone + 4*sizeof(long));
109		temp ^= *(unsigned long *)(zone + 5*sizeof(long));
110		temp ^= *(unsigned long *)(zone + 6*sizeof(long));
111		temp ^= *(unsigned long *)(zone + 7*sizeof(long));
112		zone += 8*sizeof(long);
113		length -= 8*sizeof(long);
114	}
115	while (length >= 1) {
116		temp ^= *(unsigned char *)zone;
117		zone += 1;
118		length--;
119	}
120	htotal = temp;
121}
122
/*
 * Round @x up to the next multiple of @align_to, which must be a power of
 * two.  The mask is built in the common arithmetic type of both operands,
 * so a narrow unsigned alignment (e.g. 4u) cannot wipe the upper bits of a
 * wider @x.  __typeof__ does not evaluate its operand, so each argument is
 * still evaluated the same number of times as its textual uses outside it.
 */
#define ALIGN_UP(x, align_to) \
	(((x) + ((align_to) - 1)) & \
	 ~((__typeof__((x) + (align_to)))(align_to) - 1))

/*
 * Pointer flavour of ALIGN_UP(): returns @p rounded up to @ptr_align_to,
 * with the same pointer type as @p.
 */
#define ALIGN_PTR_UP(p, ptr_align_to) \
	((__typeof__(p))ALIGN_UP((unsigned long)(p), ptr_align_to))
125
126
127static void *mmap_large_buffer(size_t need, size_t *allocated)
128{
129	void *buffer;
130	size_t sz;
131
132	/* Attempt to use huge pages if possible. */
133	sz = ALIGN_UP(need, map_align);
134	buffer = mmap(NULL, sz, PROT_READ | PROT_WRITE,
135		      MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB, -1, 0);
136
137	if (buffer == (void *)-1) {
138		sz = need;
139		buffer = mmap(NULL, sz, PROT_READ | PROT_WRITE,
140			      MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
141		if (buffer != (void *)-1)
142			fprintf(stderr, "MAP_HUGETLB attempt failed, look at /sys/kernel/mm/hugepages for optimal performance\n");
143	}
144	*allocated = sz;
145	return buffer;
146}
147
148void *child_thread(void *arg)
149{
150	unsigned long total_mmap = 0, total = 0;
151	struct tcp_zerocopy_receive zc;
152	unsigned long delta_usec;
153	int flags = MAP_SHARED;
154	struct timeval t0, t1;
155	char *buffer = NULL;
156	void *raddr = NULL;
157	void *addr = NULL;
158	double throughput;
159	struct rusage ru;
160	size_t buffer_sz;
161	int lu, fd;
162
163	fd = (int)(unsigned long)arg;
164
165	gettimeofday(&t0, NULL);
166
167	fcntl(fd, F_SETFL, O_NDELAY);
168	buffer = mmap_large_buffer(chunk_size, &buffer_sz);
169	if (buffer == (void *)-1) {
170		perror("mmap");
171		goto error;
172	}
173	if (zflg) {
174		raddr = mmap(NULL, chunk_size + map_align, PROT_READ, flags, fd, 0);
175		if (raddr == (void *)-1) {
176			perror("mmap");
177			zflg = 0;
178		} else {
179			addr = ALIGN_PTR_UP(raddr, map_align);
180		}
181	}
182	while (1) {
183		struct pollfd pfd = { .fd = fd, .events = POLLIN, };
184		int sub;
185
186		poll(&pfd, 1, 10000);
187		if (zflg) {
188			socklen_t zc_len = sizeof(zc);
189			int res;
190
191			memset(&zc, 0, sizeof(zc));
192			zc.address = (__u64)((unsigned long)addr);
193			zc.length = chunk_size;
194
195			res = getsockopt(fd, IPPROTO_TCP, TCP_ZEROCOPY_RECEIVE,
196					 &zc, &zc_len);
197			if (res == -1)
198				break;
199
200			if (zc.length) {
201				assert(zc.length <= chunk_size);
202				total_mmap += zc.length;
203				if (xflg)
204					hash_zone(addr, zc.length);
205				/* It is more efficient to unmap the pages right now,
206				 * instead of doing this in next TCP_ZEROCOPY_RECEIVE.
207				 */
208				madvise(addr, zc.length, MADV_DONTNEED);
209				total += zc.length;
210			}
211			if (zc.recv_skip_hint) {
212				assert(zc.recv_skip_hint <= chunk_size);
213				lu = read(fd, buffer, zc.recv_skip_hint);
214				if (lu > 0) {
215					if (xflg)
216						hash_zone(buffer, lu);
217					total += lu;
218				}
219			}
220			continue;
221		}
222		sub = 0;
223		while (sub < chunk_size) {
224			lu = read(fd, buffer + sub, chunk_size - sub);
225			if (lu == 0)
226				goto end;
227			if (lu < 0)
228				break;
229			if (xflg)
230				hash_zone(buffer + sub, lu);
231			total += lu;
232			sub += lu;
233		}
234	}
235end:
236	gettimeofday(&t1, NULL);
237	delta_usec = (t1.tv_sec - t0.tv_sec) * 1000000 + t1.tv_usec - t0.tv_usec;
238
239	throughput = 0;
240	if (delta_usec)
241		throughput = total * 8.0 / (double)delta_usec / 1000.0;
242	getrusage(RUSAGE_THREAD, &ru);
243	if (total > 1024*1024) {
244		unsigned long total_usec;
245		unsigned long mb = total >> 20;
246		total_usec = 1000000*ru.ru_utime.tv_sec + ru.ru_utime.tv_usec +
247			     1000000*ru.ru_stime.tv_sec + ru.ru_stime.tv_usec;
248		printf("received %lg MB (%lg %% mmap'ed) in %lg s, %lg Gbit\n"
249		       "  cpu usage user:%lg sys:%lg, %lg usec per MB, %lu c-switches\n",
250				total / (1024.0 * 1024.0),
251				100.0*total_mmap/total,
252				(double)delta_usec / 1000000.0,
253				throughput,
254				(double)ru.ru_utime.tv_sec + (double)ru.ru_utime.tv_usec / 1000000.0,
255				(double)ru.ru_stime.tv_sec + (double)ru.ru_stime.tv_usec / 1000000.0,
256				(double)total_usec/mb,
257				ru.ru_nvcsw);
258	}
259error:
260	munmap(buffer, buffer_sz);
261	close(fd);
262	if (zflg)
263		munmap(raddr, chunk_size + map_align);
264	pthread_exit(0);
265}
266
267static void apply_rcvsnd_buf(int fd)
268{
269	if (rcvbuf && setsockopt(fd, SOL_SOCKET,
270				 SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)) == -1) {
271		perror("setsockopt SO_RCVBUF");
272	}
273
274	if (sndbuf && setsockopt(fd, SOL_SOCKET,
275				 SO_SNDBUF, &sndbuf, sizeof(sndbuf)) == -1) {
276		perror("setsockopt SO_SNDBUF");
277	}
278}
279
280
281static void setup_sockaddr(int domain, const char *str_addr,
282			   struct sockaddr_storage *sockaddr)
283{
284	struct sockaddr_in6 *addr6 = (void *) sockaddr;
285	struct sockaddr_in *addr4 = (void *) sockaddr;
286
287	switch (domain) {
288	case PF_INET:
289		memset(addr4, 0, sizeof(*addr4));
290		addr4->sin_family = AF_INET;
291		addr4->sin_port = htons(cfg_port);
292		if (str_addr &&
293		    inet_pton(AF_INET, str_addr, &(addr4->sin_addr)) != 1)
294			error(1, 0, "ipv4 parse error: %s", str_addr);
295		break;
296	case PF_INET6:
297		memset(addr6, 0, sizeof(*addr6));
298		addr6->sin6_family = AF_INET6;
299		addr6->sin6_port = htons(cfg_port);
300		if (str_addr &&
301		    inet_pton(AF_INET6, str_addr, &(addr6->sin6_addr)) != 1)
302			error(1, 0, "ipv6 parse error: %s", str_addr);
303		break;
304	default:
305		error(1, 0, "illegal domain");
306	}
307}
308
309static void do_accept(int fdlisten)
310{
311	pthread_attr_t attr;
312	int rcvlowat;
313
314	pthread_attr_init(&attr);
315	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
316
317	rcvlowat = chunk_size;
318	if (setsockopt(fdlisten, SOL_SOCKET, SO_RCVLOWAT,
319		       &rcvlowat, sizeof(rcvlowat)) == -1) {
320		perror("setsockopt SO_RCVLOWAT");
321	}
322
323	apply_rcvsnd_buf(fdlisten);
324
325	while (1) {
326		struct sockaddr_in addr;
327		socklen_t addrlen = sizeof(addr);
328		pthread_t th;
329		int fd, res;
330
331		fd = accept(fdlisten, (struct sockaddr *)&addr, &addrlen);
332		if (fd == -1) {
333			perror("accept");
334			continue;
335		}
336		res = pthread_create(&th, &attr, child_thread,
337				     (void *)(unsigned long)fd);
338		if (res) {
339			errno = res;
340			perror("pthread_create");
341			close(fd);
342		}
343	}
344}
345
/*
 * Each thread should reserve a big enough vma to avoid
 * spinlock collisions in ptl locks.
 * This size is 2MB on x86_64, and is exported in /proc/meminfo.
 *
 * Returns the "Hugepagesize:" value of /proc/meminfo converted from kB to
 * bytes, or 0 when the file cannot be opened or has no such line.
 */
static unsigned long default_huge_page_size(void)
{
	unsigned long size = 0;
	char *text = NULL;
	size_t cap = 0;
	FILE *meminfo;

	meminfo = fopen("/proc/meminfo", "r");
	if (meminfo == NULL)
		return 0;

	for (;;) {
		if (getline(&text, &cap, meminfo) <= 0)
			break;
		if (sscanf(text, "Hugepagesize:       %lu kB", &size) != 1)
			continue;
		/* The file reports kB; callers want bytes. */
		size *= 1024;
		break;
	}

	/* getline() allocated (and possibly grew) the line buffer. */
	free(text);
	fclose(meminfo);
	return size;
}
369
370int main(int argc, char *argv[])
371{
372	struct sockaddr_storage listenaddr, addr;
373	unsigned int max_pacing_rate = 0;
374	uint64_t total = 0;
375	char *host = NULL;
376	int fd, c, on = 1;
377	size_t buffer_sz;
378	char *buffer;
379	int sflg = 0;
380	int mss = 0;
381
382	while ((c = getopt(argc, argv, "46p:svr:w:H:zxkP:M:C:a:")) != -1) {
383		switch (c) {
384		case '4':
385			cfg_family = PF_INET;
386			cfg_alen = sizeof(struct sockaddr_in);
387			break;
388		case '6':
389			cfg_family = PF_INET6;
390			cfg_alen = sizeof(struct sockaddr_in6);
391			break;
392		case 'p':
393			cfg_port = atoi(optarg);
394			break;
395		case 'H':
396			host = optarg;
397			break;
398		case 's': /* server : listen for incoming connections */
399			sflg++;
400			break;
401		case 'r':
402			rcvbuf = atoi(optarg);
403			break;
404		case 'w':
405			sndbuf = atoi(optarg);
406			break;
407		case 'z':
408			zflg = 1;
409			break;
410		case 'M':
411			mss = atoi(optarg);
412			break;
413		case 'x':
414			xflg = 1;
415			break;
416		case 'k':
417			keepflag = 1;
418			break;
419		case 'P':
420			max_pacing_rate = atoi(optarg) ;
421			break;
422		case 'C':
423			chunk_size = atol(optarg);
424			break;
425		case 'a':
426			map_align = atol(optarg);
427			break;
428		default:
429			exit(1);
430		}
431	}
432	if (!map_align) {
433		map_align = default_huge_page_size();
434		/* if really /proc/meminfo is not helping,
435		 * we use the default x86_64 hugepagesize.
436		 */
437		if (!map_align)
438			map_align = 2*1024*1024;
439	}
440	if (sflg) {
441		int fdlisten = socket(cfg_family, SOCK_STREAM, 0);
442
443		if (fdlisten == -1) {
444			perror("socket");
445			exit(1);
446		}
447		apply_rcvsnd_buf(fdlisten);
448		setsockopt(fdlisten, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
449
450		setup_sockaddr(cfg_family, host, &listenaddr);
451
452		if (mss &&
453		    setsockopt(fdlisten, IPPROTO_TCP, TCP_MAXSEG,
454			       &mss, sizeof(mss)) == -1) {
455			perror("setsockopt TCP_MAXSEG");
456			exit(1);
457		}
458		if (bind(fdlisten, (const struct sockaddr *)&listenaddr, cfg_alen) == -1) {
459			perror("bind");
460			exit(1);
461		}
462		if (listen(fdlisten, 128) == -1) {
463			perror("listen");
464			exit(1);
465		}
466		do_accept(fdlisten);
467	}
468
469	buffer = mmap_large_buffer(chunk_size, &buffer_sz);
470	if (buffer == (char *)-1) {
471		perror("mmap");
472		exit(1);
473	}
474
475	fd = socket(cfg_family, SOCK_STREAM, 0);
476	if (fd == -1) {
477		perror("socket");
478		exit(1);
479	}
480	apply_rcvsnd_buf(fd);
481
482	setup_sockaddr(cfg_family, host, &addr);
483
484	if (mss &&
485	    setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG, &mss, sizeof(mss)) == -1) {
486		perror("setsockopt TCP_MAXSEG");
487		exit(1);
488	}
489	if (connect(fd, (const struct sockaddr *)&addr, cfg_alen) == -1) {
490		perror("connect");
491		exit(1);
492	}
493	if (max_pacing_rate &&
494	    setsockopt(fd, SOL_SOCKET, SO_MAX_PACING_RATE,
495		       &max_pacing_rate, sizeof(max_pacing_rate)) == -1)
496		perror("setsockopt SO_MAX_PACING_RATE");
497
498	if (zflg && setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY,
499			       &on, sizeof(on)) == -1) {
500		perror("setsockopt SO_ZEROCOPY, (-z option disabled)");
501		zflg = 0;
502	}
503	while (total < FILE_SZ) {
504		int64_t wr = FILE_SZ - total;
505
506		if (wr > chunk_size)
507			wr = chunk_size;
508		/* Note : we just want to fill the pipe with 0 bytes */
509		wr = send(fd, buffer, (size_t)wr, zflg ? MSG_ZEROCOPY : 0);
510		if (wr <= 0)
511			break;
512		total += wr;
513	}
514	close(fd);
515	munmap(buffer, buffer_sz);
516	return 0;
517}