Loading...
1/*
2 * This application is Copyright 2012 Red Hat, Inc.
3 * Doug Ledford <dledford@redhat.com>
4 *
5 * mq_perf_tests is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, version 3.
8 *
9 * mq_perf_tests is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * For the full text of the license, see <http://www.gnu.org/licenses/>.
15 *
16 * mq_perf_tests.c
17 * Tests various types of message queue workloads, concentrating on those
18 * situations that invole large message sizes, large message queue depths,
19 * or both, and reports back useful metrics about kernel message queue
20 * performance.
21 *
22 */
23#define _GNU_SOURCE
24#include <stdio.h>
25#include <stdlib.h>
26#include <unistd.h>
27#include <fcntl.h>
28#include <string.h>
29#include <limits.h>
30#include <errno.h>
31#include <signal.h>
32#include <pthread.h>
33#include <sched.h>
34#include <sys/types.h>
35#include <sys/time.h>
36#include <sys/resource.h>
37#include <sys/stat.h>
38#include <mqueue.h>
39#include <popt.h>
40#include <error.h>
41
42static char *usage =
43"Usage:\n"
44" %s [-c #[,#..] -f] path\n"
45"\n"
46" -c # Skip most tests and go straight to a high queue depth test\n"
47" and then run that test continuously (useful for running at\n"
48" the same time as some other workload to see how much the\n"
49" cache thrashing caused by adding messages to a very deep\n"
50" queue impacts the performance of other programs). The number\n"
51" indicates which CPU core we should bind the process to during\n"
52" the run. If you have more than one physical CPU, then you\n"
53" will need one copy per physical CPU package, and you should\n"
54" specify the CPU cores to pin ourself to via a comma separated\n"
55" list of CPU values.\n"
56" -f Only usable with continuous mode. Pin ourself to the CPUs\n"
57" as requested, then instead of looping doing a high mq\n"
58" workload, just busy loop. This will allow us to lock up a\n"
59" single CPU just like we normally would, but without actually\n"
60" thrashing the CPU cache. This is to make it easier to get\n"
61" comparable numbers from some other workload running on the\n"
62" other CPUs. One set of numbers with # CPUs locked up running\n"
63" an mq workload, and another set of numbers with those same\n"
64" CPUs locked away from the test workload, but not doing\n"
65" anything to trash the cache like the mq workload might.\n"
66" path Path name of the message queue to create\n"
67"\n"
68" Note: this program must be run as root in order to enable all tests\n"
69"\n";
70
71char *MAX_MSGS = "/proc/sys/fs/mqueue/msg_max";
72char *MAX_MSGSIZE = "/proc/sys/fs/mqueue/msgsize_max";
73
74#define min(a, b) ((a) < (b) ? (a) : (b))
75#define MAX_CPUS 64
76char *cpu_option_string;
77int cpus_to_pin[MAX_CPUS];
78int num_cpus_to_pin;
79pthread_t cpu_threads[MAX_CPUS];
80pthread_t main_thread;
81cpu_set_t *cpu_set;
82int cpu_set_size;
83int cpus_online;
84
85#define MSG_SIZE 16
86#define TEST1_LOOPS 10000000
87#define TEST2_LOOPS 100000
88int continuous_mode;
89int continuous_mode_fake;
90
91struct rlimit saved_limits, cur_limits;
92int saved_max_msgs, saved_max_msgsize;
93int cur_max_msgs, cur_max_msgsize;
94FILE *max_msgs, *max_msgsize;
95int cur_nice;
96char *queue_path = "/mq_perf_tests";
97mqd_t queue = -1;
98struct mq_attr result;
99int mq_prio_max;
100
101const struct poptOption options[] = {
102 {
103 .longName = "continuous",
104 .shortName = 'c',
105 .argInfo = POPT_ARG_STRING,
106 .arg = &cpu_option_string,
107 .val = 'c',
108 .descrip = "Run continuous tests at a high queue depth in "
109 "order to test the effects of cache thrashing on "
110 "other tasks on the system. This test is intended "
111 "to be run on one core of each physical CPU while "
112 "some other CPU intensive task is run on all the other "
113 "cores of that same physical CPU and the other task "
114 "is timed. It is assumed that the process of adding "
115 "messages to the message queue in a tight loop will "
116 "impact that other task to some degree. Once the "
117 "tests are performed in this way, you should then "
118 "re-run the tests using fake mode in order to check "
119 "the difference in time required to perform the CPU "
120 "intensive task",
121 .argDescrip = "cpu[,cpu]",
122 },
123 {
124 .longName = "fake",
125 .shortName = 'f',
126 .argInfo = POPT_ARG_NONE,
127 .arg = &continuous_mode_fake,
128 .val = 0,
129 .descrip = "Tie up the CPUs that we would normally tie up in"
130 "continuous mode, but don't actually do any mq stuff, "
131 "just keep the CPU busy so it can't be used to process "
132 "system level tasks as this would free up resources on "
133 "the other CPU cores and skew the comparison between "
134 "the no-mqueue work and mqueue work tests",
135 .argDescrip = NULL,
136 },
137 {
138 .longName = "path",
139 .shortName = 'p',
140 .argInfo = POPT_ARG_STRING | POPT_ARGFLAG_SHOW_DEFAULT,
141 .arg = &queue_path,
142 .val = 'p',
143 .descrip = "The name of the path to use in the mqueue "
144 "filesystem for our tests",
145 .argDescrip = "pathname",
146 },
147 POPT_AUTOHELP
148 POPT_TABLEEND
149};
150
151static inline void __set(FILE *stream, int value, char *err_msg);
152void shutdown(int exit_val, char *err_cause, int line_no);
153void sig_action_SIGUSR1(int signum, siginfo_t *info, void *context);
154void sig_action(int signum, siginfo_t *info, void *context);
155static inline int get(FILE *stream);
156static inline void set(FILE *stream, int value);
157static inline int try_set(FILE *stream, int value);
158static inline void getr(int type, struct rlimit *rlim);
159static inline void setr(int type, struct rlimit *rlim);
160static inline void open_queue(struct mq_attr *attr);
161void increase_limits(void);
162
163static inline void __set(FILE *stream, int value, char *err_msg)
164{
165 rewind(stream);
166 if (fprintf(stream, "%d", value) < 0)
167 perror(err_msg);
168}
169
170
171void shutdown(int exit_val, char *err_cause, int line_no)
172{
173 static int in_shutdown = 0;
174 int errno_at_shutdown = errno;
175 int i;
176
177 /* In case we get called by multiple threads or from an sighandler */
178 if (in_shutdown++)
179 return;
180
181 for (i = 0; i < num_cpus_to_pin; i++)
182 if (cpu_threads[i]) {
183 pthread_kill(cpu_threads[i], SIGUSR1);
184 pthread_join(cpu_threads[i], NULL);
185 }
186
187 if (queue != -1)
188 if (mq_close(queue))
189 perror("mq_close() during shutdown");
190 if (queue_path)
191 /*
192 * Be silent if this fails, if we cleaned up already it's
193 * expected to fail
194 */
195 mq_unlink(queue_path);
196 if (saved_max_msgs)
197 __set(max_msgs, saved_max_msgs,
198 "failed to restore saved_max_msgs");
199 if (saved_max_msgsize)
200 __set(max_msgsize, saved_max_msgsize,
201 "failed to restore saved_max_msgsize");
202 if (exit_val)
203 error(exit_val, errno_at_shutdown, "%s at %d",
204 err_cause, line_no);
205 exit(0);
206}
207
208void sig_action_SIGUSR1(int signum, siginfo_t *info, void *context)
209{
210 if (pthread_self() != main_thread)
211 pthread_exit(0);
212 else {
213 fprintf(stderr, "Caught signal %d in SIGUSR1 handler, "
214 "exiting\n", signum);
215 shutdown(0, "", 0);
216 fprintf(stderr, "\n\nReturned from shutdown?!?!\n\n");
217 exit(0);
218 }
219}
220
221void sig_action(int signum, siginfo_t *info, void *context)
222{
223 if (pthread_self() != main_thread)
224 pthread_kill(main_thread, signum);
225 else {
226 fprintf(stderr, "Caught signal %d, exiting\n", signum);
227 shutdown(0, "", 0);
228 fprintf(stderr, "\n\nReturned from shutdown?!?!\n\n");
229 exit(0);
230 }
231}
232
233static inline int get(FILE *stream)
234{
235 int value;
236 rewind(stream);
237 if (fscanf(stream, "%d", &value) != 1)
238 shutdown(4, "Error reading /proc entry", __LINE__);
239 return value;
240}
241
242static inline void set(FILE *stream, int value)
243{
244 int new_value;
245
246 rewind(stream);
247 if (fprintf(stream, "%d", value) < 0)
248 return shutdown(5, "Failed writing to /proc file", __LINE__);
249 new_value = get(stream);
250 if (new_value != value)
251 return shutdown(5, "We didn't get what we wrote to /proc back",
252 __LINE__);
253}
254
255static inline int try_set(FILE *stream, int value)
256{
257 int new_value;
258
259 rewind(stream);
260 fprintf(stream, "%d", value);
261 new_value = get(stream);
262 return new_value == value;
263}
264
265static inline void getr(int type, struct rlimit *rlim)
266{
267 if (getrlimit(type, rlim))
268 shutdown(6, "getrlimit()", __LINE__);
269}
270
271static inline void setr(int type, struct rlimit *rlim)
272{
273 if (setrlimit(type, rlim))
274 shutdown(7, "setrlimit()", __LINE__);
275}
276
277/**
278 * open_queue - open the global queue for testing
279 * @attr - An attr struct specifying the desired queue traits
280 * @result - An attr struct that lists the actual traits the queue has
281 *
282 * This open is not allowed to fail, failure will result in an orderly
283 * shutdown of the program. The global queue_path is used to set what
284 * queue to open, the queue descriptor is saved in the global queue
285 * variable.
286 */
287static inline void open_queue(struct mq_attr *attr)
288{
289 int flags = O_RDWR | O_EXCL | O_CREAT | O_NONBLOCK;
290 int perms = DEFFILEMODE;
291
292 queue = mq_open(queue_path, flags, perms, attr);
293 if (queue == -1)
294 shutdown(1, "mq_open()", __LINE__);
295 if (mq_getattr(queue, &result))
296 shutdown(1, "mq_getattr()", __LINE__);
297 printf("\n\tQueue %s created:\n", queue_path);
298 printf("\t\tmq_flags:\t\t\t%s\n", result.mq_flags & O_NONBLOCK ?
299 "O_NONBLOCK" : "(null)");
300 printf("\t\tmq_maxmsg:\t\t\t%lu\n", result.mq_maxmsg);
301 printf("\t\tmq_msgsize:\t\t\t%lu\n", result.mq_msgsize);
302 printf("\t\tmq_curmsgs:\t\t\t%lu\n", result.mq_curmsgs);
303}
304
305void *fake_cont_thread(void *arg)
306{
307 int i;
308
309 for (i = 0; i < num_cpus_to_pin; i++)
310 if (cpu_threads[i] == pthread_self())
311 break;
312 printf("\tStarted fake continuous mode thread %d on CPU %d\n", i,
313 cpus_to_pin[i]);
314 while (1)
315 ;
316}
317
318void *cont_thread(void *arg)
319{
320 char buff[MSG_SIZE];
321 int i, priority;
322
323 for (i = 0; i < num_cpus_to_pin; i++)
324 if (cpu_threads[i] == pthread_self())
325 break;
326 printf("\tStarted continuous mode thread %d on CPU %d\n", i,
327 cpus_to_pin[i]);
328 while (1) {
329 while (mq_send(queue, buff, sizeof(buff), 0) == 0)
330 ;
331 mq_receive(queue, buff, sizeof(buff), &priority);
332 }
333}
334
335#define drain_queue() \
336 while (mq_receive(queue, buff, MSG_SIZE, &prio_in) == MSG_SIZE)
337
338#define do_untimed_send() \
339 do { \
340 if (mq_send(queue, buff, MSG_SIZE, prio_out)) \
341 shutdown(3, "Test send failure", __LINE__); \
342 } while (0)
343
344#define do_send_recv() \
345 do { \
346 clock_gettime(clock, &start); \
347 if (mq_send(queue, buff, MSG_SIZE, prio_out)) \
348 shutdown(3, "Test send failure", __LINE__); \
349 clock_gettime(clock, &middle); \
350 if (mq_receive(queue, buff, MSG_SIZE, &prio_in) != MSG_SIZE) \
351 shutdown(3, "Test receive failure", __LINE__); \
352 clock_gettime(clock, &end); \
353 nsec = ((middle.tv_sec - start.tv_sec) * 1000000000) + \
354 (middle.tv_nsec - start.tv_nsec); \
355 send_total.tv_nsec += nsec; \
356 if (send_total.tv_nsec >= 1000000000) { \
357 send_total.tv_sec++; \
358 send_total.tv_nsec -= 1000000000; \
359 } \
360 nsec = ((end.tv_sec - middle.tv_sec) * 1000000000) + \
361 (end.tv_nsec - middle.tv_nsec); \
362 recv_total.tv_nsec += nsec; \
363 if (recv_total.tv_nsec >= 1000000000) { \
364 recv_total.tv_sec++; \
365 recv_total.tv_nsec -= 1000000000; \
366 } \
367 } while (0)
368
369struct test {
370 char *desc;
371 void (*func)(int *);
372};
373
374void const_prio(int *prio)
375{
376 return;
377}
378
379void inc_prio(int *prio)
380{
381 if (++*prio == mq_prio_max)
382 *prio = 0;
383}
384
385void dec_prio(int *prio)
386{
387 if (--*prio < 0)
388 *prio = mq_prio_max - 1;
389}
390
391void random_prio(int *prio)
392{
393 *prio = random() % mq_prio_max;
394}
395
396struct test test2[] = {
397 {"\n\tTest #2a: Time send/recv message, queue full, constant prio\n",
398 const_prio},
399 {"\n\tTest #2b: Time send/recv message, queue full, increasing prio\n",
400 inc_prio},
401 {"\n\tTest #2c: Time send/recv message, queue full, decreasing prio\n",
402 dec_prio},
403 {"\n\tTest #2d: Time send/recv message, queue full, random prio\n",
404 random_prio},
405 {NULL, NULL}
406};
407
408/**
409 * Tests to perform (all done with MSG_SIZE messages):
410 *
411 * 1) Time to add/remove message with 0 messages on queue
412 * 1a) with constant prio
413 * 2) Time to add/remove message when queue close to capacity:
414 * 2a) with constant prio
415 * 2b) with increasing prio
416 * 2c) with decreasing prio
417 * 2d) with random prio
418 * 3) Test limits of priorities honored (double check _SC_MQ_PRIO_MAX)
419 */
420void *perf_test_thread(void *arg)
421{
422 char buff[MSG_SIZE];
423 int prio_out, prio_in;
424 int i;
425 clockid_t clock;
426 pthread_t *t;
427 struct timespec res, start, middle, end, send_total, recv_total;
428 unsigned long long nsec;
429 struct test *cur_test;
430
431 t = &cpu_threads[0];
432 printf("\n\tStarted mqueue performance test thread on CPU %d\n",
433 cpus_to_pin[0]);
434 mq_prio_max = sysconf(_SC_MQ_PRIO_MAX);
435 if (mq_prio_max == -1)
436 shutdown(2, "sysconf(_SC_MQ_PRIO_MAX)", __LINE__);
437 if (pthread_getcpuclockid(cpu_threads[0], &clock) != 0)
438 shutdown(2, "pthread_getcpuclockid", __LINE__);
439
440 if (clock_getres(clock, &res))
441 shutdown(2, "clock_getres()", __LINE__);
442
443 printf("\t\tMax priorities:\t\t\t%d\n", mq_prio_max);
444 printf("\t\tClock resolution:\t\t%lu nsec%s\n", res.tv_nsec,
445 res.tv_nsec > 1 ? "s" : "");
446
447
448
449 printf("\n\tTest #1: Time send/recv message, queue empty\n");
450 printf("\t\t(%d iterations)\n", TEST1_LOOPS);
451 prio_out = 0;
452 send_total.tv_sec = 0;
453 send_total.tv_nsec = 0;
454 recv_total.tv_sec = 0;
455 recv_total.tv_nsec = 0;
456 for (i = 0; i < TEST1_LOOPS; i++)
457 do_send_recv();
458 printf("\t\tSend msg:\t\t\t%ld.%lus total time\n",
459 send_total.tv_sec, send_total.tv_nsec);
460 nsec = ((unsigned long long)send_total.tv_sec * 1000000000 +
461 send_total.tv_nsec) / TEST1_LOOPS;
462 printf("\t\t\t\t\t\t%lld nsec/msg\n", nsec);
463 printf("\t\tRecv msg:\t\t\t%ld.%lus total time\n",
464 recv_total.tv_sec, recv_total.tv_nsec);
465 nsec = ((unsigned long long)recv_total.tv_sec * 1000000000 +
466 recv_total.tv_nsec) / TEST1_LOOPS;
467 printf("\t\t\t\t\t\t%lld nsec/msg\n", nsec);
468
469
470 for (cur_test = test2; cur_test->desc != NULL; cur_test++) {
471 printf("%s:\n", cur_test->desc);
472 printf("\t\t(%d iterations)\n", TEST2_LOOPS);
473 prio_out = 0;
474 send_total.tv_sec = 0;
475 send_total.tv_nsec = 0;
476 recv_total.tv_sec = 0;
477 recv_total.tv_nsec = 0;
478 printf("\t\tFilling queue...");
479 fflush(stdout);
480 clock_gettime(clock, &start);
481 for (i = 0; i < result.mq_maxmsg - 1; i++) {
482 do_untimed_send();
483 cur_test->func(&prio_out);
484 }
485 clock_gettime(clock, &end);
486 nsec = ((unsigned long long)(end.tv_sec - start.tv_sec) *
487 1000000000) + (end.tv_nsec - start.tv_nsec);
488 printf("done.\t\t%lld.%llds\n", nsec / 1000000000,
489 nsec % 1000000000);
490 printf("\t\tTesting...");
491 fflush(stdout);
492 for (i = 0; i < TEST2_LOOPS; i++) {
493 do_send_recv();
494 cur_test->func(&prio_out);
495 }
496 printf("done.\n");
497 printf("\t\tSend msg:\t\t\t%ld.%lus total time\n",
498 send_total.tv_sec, send_total.tv_nsec);
499 nsec = ((unsigned long long)send_total.tv_sec * 1000000000 +
500 send_total.tv_nsec) / TEST2_LOOPS;
501 printf("\t\t\t\t\t\t%lld nsec/msg\n", nsec);
502 printf("\t\tRecv msg:\t\t\t%ld.%lus total time\n",
503 recv_total.tv_sec, recv_total.tv_nsec);
504 nsec = ((unsigned long long)recv_total.tv_sec * 1000000000 +
505 recv_total.tv_nsec) / TEST2_LOOPS;
506 printf("\t\t\t\t\t\t%lld nsec/msg\n", nsec);
507 printf("\t\tDraining queue...");
508 fflush(stdout);
509 clock_gettime(clock, &start);
510 drain_queue();
511 clock_gettime(clock, &end);
512 nsec = ((unsigned long long)(end.tv_sec - start.tv_sec) *
513 1000000000) + (end.tv_nsec - start.tv_nsec);
514 printf("done.\t\t%lld.%llds\n", nsec / 1000000000,
515 nsec % 1000000000);
516 }
517 return 0;
518}
519
520void increase_limits(void)
521{
522 cur_limits.rlim_cur = RLIM_INFINITY;
523 cur_limits.rlim_max = RLIM_INFINITY;
524 setr(RLIMIT_MSGQUEUE, &cur_limits);
525 while (try_set(max_msgs, cur_max_msgs += 10))
526 ;
527 cur_max_msgs = get(max_msgs);
528 while (try_set(max_msgsize, cur_max_msgsize += 1024))
529 ;
530 cur_max_msgsize = get(max_msgsize);
531 if (setpriority(PRIO_PROCESS, 0, -20) != 0)
532 shutdown(2, "setpriority()", __LINE__);
533 cur_nice = -20;
534}
535
536int main(int argc, char *argv[])
537{
538 struct mq_attr attr;
539 char *option, *next_option;
540 int i, cpu, rc;
541 struct sigaction sa;
542 poptContext popt_context;
543 void *retval;
544
545 main_thread = pthread_self();
546 num_cpus_to_pin = 0;
547
548 if (sysconf(_SC_NPROCESSORS_ONLN) == -1) {
549 perror("sysconf(_SC_NPROCESSORS_ONLN)");
550 exit(1);
551 }
552 cpus_online = min(MAX_CPUS, sysconf(_SC_NPROCESSORS_ONLN));
553 cpu_set = CPU_ALLOC(cpus_online);
554 if (cpu_set == NULL) {
555 perror("CPU_ALLOC()");
556 exit(1);
557 }
558 cpu_set_size = CPU_ALLOC_SIZE(cpus_online);
559 CPU_ZERO_S(cpu_set_size, cpu_set);
560
561 popt_context = poptGetContext(NULL, argc, (const char **)argv,
562 options, 0);
563
564 while ((rc = poptGetNextOpt(popt_context)) > 0) {
565 switch (rc) {
566 case 'c':
567 continuous_mode = 1;
568 option = cpu_option_string;
569 do {
570 next_option = strchr(option, ',');
571 if (next_option)
572 *next_option = '\0';
573 cpu = atoi(option);
574 if (cpu >= cpus_online)
575 fprintf(stderr, "CPU %d exceeds "
576 "cpus online, ignoring.\n",
577 cpu);
578 else
579 cpus_to_pin[num_cpus_to_pin++] = cpu;
580 if (next_option)
581 option = ++next_option;
582 } while (next_option && num_cpus_to_pin < MAX_CPUS);
583 /* Double check that they didn't give us the same CPU
584 * more than once */
585 for (cpu = 0; cpu < num_cpus_to_pin; cpu++) {
586 if (CPU_ISSET_S(cpus_to_pin[cpu], cpu_set_size,
587 cpu_set)) {
588 fprintf(stderr, "Any given CPU may "
589 "only be given once.\n");
590 exit(1);
591 } else
592 CPU_SET_S(cpus_to_pin[cpu],
593 cpu_set_size, cpu_set);
594 }
595 break;
596 case 'p':
597 /*
598 * Although we can create a msg queue with a
599 * non-absolute path name, unlink will fail. So,
600 * if the name doesn't start with a /, add one
601 * when we save it.
602 */
603 option = queue_path;
604 if (*option != '/') {
605 queue_path = malloc(strlen(option) + 2);
606 if (!queue_path) {
607 perror("malloc()");
608 exit(1);
609 }
610 queue_path[0] = '/';
611 queue_path[1] = 0;
612 strcat(queue_path, option);
613 free(option);
614 }
615 break;
616 }
617 }
618
619 if (continuous_mode && num_cpus_to_pin == 0) {
620 fprintf(stderr, "Must pass at least one CPU to continuous "
621 "mode.\n");
622 poptPrintUsage(popt_context, stderr, 0);
623 exit(1);
624 } else if (!continuous_mode) {
625 num_cpus_to_pin = 1;
626 cpus_to_pin[0] = cpus_online - 1;
627 }
628
629 if (getuid() != 0) {
630 fprintf(stderr, "Not running as root, but almost all tests "
631 "require root in order to modify\nsystem settings. "
632 "Exiting.\n");
633 exit(1);
634 }
635
636 max_msgs = fopen(MAX_MSGS, "r+");
637 max_msgsize = fopen(MAX_MSGSIZE, "r+");
638 if (!max_msgs)
639 shutdown(2, "Failed to open msg_max", __LINE__);
640 if (!max_msgsize)
641 shutdown(2, "Failed to open msgsize_max", __LINE__);
642
643 /* Load up the current system values for everything we can */
644 getr(RLIMIT_MSGQUEUE, &saved_limits);
645 cur_limits = saved_limits;
646 saved_max_msgs = cur_max_msgs = get(max_msgs);
647 saved_max_msgsize = cur_max_msgsize = get(max_msgsize);
648 errno = 0;
649 cur_nice = getpriority(PRIO_PROCESS, 0);
650 if (errno)
651 shutdown(2, "getpriority()", __LINE__);
652
653 /* Tell the user our initial state */
654 printf("\nInitial system state:\n");
655 printf("\tUsing queue path:\t\t\t%s\n", queue_path);
656 printf("\tRLIMIT_MSGQUEUE(soft):\t\t\t%ld\n",
657 (long) saved_limits.rlim_cur);
658 printf("\tRLIMIT_MSGQUEUE(hard):\t\t\t%ld\n",
659 (long) saved_limits.rlim_max);
660 printf("\tMaximum Message Size:\t\t\t%d\n", saved_max_msgsize);
661 printf("\tMaximum Queue Size:\t\t\t%d\n", saved_max_msgs);
662 printf("\tNice value:\t\t\t\t%d\n", cur_nice);
663 printf("\n");
664
665 increase_limits();
666
667 printf("Adjusted system state for testing:\n");
668 if (cur_limits.rlim_cur == RLIM_INFINITY) {
669 printf("\tRLIMIT_MSGQUEUE(soft):\t\t\t(unlimited)\n");
670 printf("\tRLIMIT_MSGQUEUE(hard):\t\t\t(unlimited)\n");
671 } else {
672 printf("\tRLIMIT_MSGQUEUE(soft):\t\t\t%ld\n",
673 (long) cur_limits.rlim_cur);
674 printf("\tRLIMIT_MSGQUEUE(hard):\t\t\t%ld\n",
675 (long) cur_limits.rlim_max);
676 }
677 printf("\tMaximum Message Size:\t\t\t%d\n", cur_max_msgsize);
678 printf("\tMaximum Queue Size:\t\t\t%d\n", cur_max_msgs);
679 printf("\tNice value:\t\t\t\t%d\n", cur_nice);
680 printf("\tContinuous mode:\t\t\t(%s)\n", continuous_mode ?
681 (continuous_mode_fake ? "fake mode" : "enabled") :
682 "disabled");
683 printf("\tCPUs to pin:\t\t\t\t%d", cpus_to_pin[0]);
684 for (cpu = 1; cpu < num_cpus_to_pin; cpu++)
685 printf(",%d", cpus_to_pin[cpu]);
686 printf("\n");
687
688 sa.sa_sigaction = sig_action_SIGUSR1;
689 sigemptyset(&sa.sa_mask);
690 sigaddset(&sa.sa_mask, SIGHUP);
691 sigaddset(&sa.sa_mask, SIGINT);
692 sigaddset(&sa.sa_mask, SIGQUIT);
693 sigaddset(&sa.sa_mask, SIGTERM);
694 sa.sa_flags = SA_SIGINFO;
695 if (sigaction(SIGUSR1, &sa, NULL) == -1)
696 shutdown(1, "sigaction(SIGUSR1)", __LINE__);
697 sa.sa_sigaction = sig_action;
698 if (sigaction(SIGHUP, &sa, NULL) == -1)
699 shutdown(1, "sigaction(SIGHUP)", __LINE__);
700 if (sigaction(SIGINT, &sa, NULL) == -1)
701 shutdown(1, "sigaction(SIGINT)", __LINE__);
702 if (sigaction(SIGQUIT, &sa, NULL) == -1)
703 shutdown(1, "sigaction(SIGQUIT)", __LINE__);
704 if (sigaction(SIGTERM, &sa, NULL) == -1)
705 shutdown(1, "sigaction(SIGTERM)", __LINE__);
706
707 if (!continuous_mode_fake) {
708 attr.mq_flags = O_NONBLOCK;
709 attr.mq_maxmsg = cur_max_msgs;
710 attr.mq_msgsize = MSG_SIZE;
711 open_queue(&attr);
712 }
713 for (i = 0; i < num_cpus_to_pin; i++) {
714 pthread_attr_t thread_attr;
715 void *thread_func;
716
717 if (continuous_mode_fake)
718 thread_func = &fake_cont_thread;
719 else if (continuous_mode)
720 thread_func = &cont_thread;
721 else
722 thread_func = &perf_test_thread;
723
724 CPU_ZERO_S(cpu_set_size, cpu_set);
725 CPU_SET_S(cpus_to_pin[i], cpu_set_size, cpu_set);
726 pthread_attr_init(&thread_attr);
727 pthread_attr_setaffinity_np(&thread_attr, cpu_set_size,
728 cpu_set);
729 if (pthread_create(&cpu_threads[i], &thread_attr, thread_func,
730 NULL))
731 shutdown(1, "pthread_create()", __LINE__);
732 pthread_attr_destroy(&thread_attr);
733 }
734
735 if (!continuous_mode) {
736 pthread_join(cpu_threads[0], &retval);
737 shutdown((long)retval, "perf_test_thread()", __LINE__);
738 } else {
739 while (1)
740 sleep(1);
741 }
742 shutdown(0, "", 0);
743}
1/*
2 * This application is Copyright 2012 Red Hat, Inc.
3 * Doug Ledford <dledford@redhat.com>
4 *
5 * mq_perf_tests is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, version 3.
8 *
9 * mq_perf_tests is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * For the full text of the license, see <http://www.gnu.org/licenses/>.
15 *
16 * mq_perf_tests.c
17 * Tests various types of message queue workloads, concentrating on those
18 * situations that invole large message sizes, large message queue depths,
19 * or both, and reports back useful metrics about kernel message queue
20 * performance.
21 *
22 */
23#define _GNU_SOURCE
24#include <stdio.h>
25#include <stdlib.h>
26#include <unistd.h>
27#include <fcntl.h>
28#include <string.h>
29#include <limits.h>
30#include <errno.h>
31#include <signal.h>
32#include <pthread.h>
33#include <sched.h>
34#include <sys/types.h>
35#include <sys/time.h>
36#include <sys/resource.h>
37#include <sys/stat.h>
38#include <mqueue.h>
39#include <popt.h>
40#include <error.h>
41
42static char *usage =
43"Usage:\n"
44" %s [-c #[,#..] -f] path\n"
45"\n"
46" -c # Skip most tests and go straight to a high queue depth test\n"
47" and then run that test continuously (useful for running at\n"
48" the same time as some other workload to see how much the\n"
49" cache thrashing caused by adding messages to a very deep\n"
50" queue impacts the performance of other programs). The number\n"
51" indicates which CPU core we should bind the process to during\n"
52" the run. If you have more than one physical CPU, then you\n"
53" will need one copy per physical CPU package, and you should\n"
54" specify the CPU cores to pin ourself to via a comma separated\n"
55" list of CPU values.\n"
56" -f Only usable with continuous mode. Pin ourself to the CPUs\n"
57" as requested, then instead of looping doing a high mq\n"
58" workload, just busy loop. This will allow us to lock up a\n"
59" single CPU just like we normally would, but without actually\n"
60" thrashing the CPU cache. This is to make it easier to get\n"
61" comparable numbers from some other workload running on the\n"
62" other CPUs. One set of numbers with # CPUs locked up running\n"
63" an mq workload, and another set of numbers with those same\n"
64" CPUs locked away from the test workload, but not doing\n"
65" anything to trash the cache like the mq workload might.\n"
66" path Path name of the message queue to create\n"
67"\n"
68" Note: this program must be run as root in order to enable all tests\n"
69"\n";
70
71char *MAX_MSGS = "/proc/sys/fs/mqueue/msg_max";
72char *MAX_MSGSIZE = "/proc/sys/fs/mqueue/msgsize_max";
73
74#define min(a, b) ((a) < (b) ? (a) : (b))
75#define MAX_CPUS 64
76char *cpu_option_string;
77int cpus_to_pin[MAX_CPUS];
78int num_cpus_to_pin;
79pthread_t cpu_threads[MAX_CPUS];
80pthread_t main_thread;
81cpu_set_t *cpu_set;
82int cpu_set_size;
83int cpus_online;
84
85#define MSG_SIZE 16
86#define TEST1_LOOPS 10000000
87#define TEST2_LOOPS 100000
88int continuous_mode;
89int continuous_mode_fake;
90
91struct rlimit saved_limits, cur_limits;
92int saved_max_msgs, saved_max_msgsize;
93int cur_max_msgs, cur_max_msgsize;
94FILE *max_msgs, *max_msgsize;
95int cur_nice;
96char *queue_path = "/mq_perf_tests";
97mqd_t queue = -1;
98struct mq_attr result;
99int mq_prio_max;
100
101const struct poptOption options[] = {
102 {
103 .longName = "continuous",
104 .shortName = 'c',
105 .argInfo = POPT_ARG_STRING,
106 .arg = &cpu_option_string,
107 .val = 'c',
108 .descrip = "Run continuous tests at a high queue depth in "
109 "order to test the effects of cache thrashing on "
110 "other tasks on the system. This test is intended "
111 "to be run on one core of each physical CPU while "
112 "some other CPU intensive task is run on all the other "
113 "cores of that same physical CPU and the other task "
114 "is timed. It is assumed that the process of adding "
115 "messages to the message queue in a tight loop will "
116 "impact that other task to some degree. Once the "
117 "tests are performed in this way, you should then "
118 "re-run the tests using fake mode in order to check "
119 "the difference in time required to perform the CPU "
120 "intensive task",
121 .argDescrip = "cpu[,cpu]",
122 },
123 {
124 .longName = "fake",
125 .shortName = 'f',
126 .argInfo = POPT_ARG_NONE,
127 .arg = &continuous_mode_fake,
128 .val = 0,
129 .descrip = "Tie up the CPUs that we would normally tie up in"
130 "continuous mode, but don't actually do any mq stuff, "
131 "just keep the CPU busy so it can't be used to process "
132 "system level tasks as this would free up resources on "
133 "the other CPU cores and skew the comparison between "
134 "the no-mqueue work and mqueue work tests",
135 .argDescrip = NULL,
136 },
137 {
138 .longName = "path",
139 .shortName = 'p',
140 .argInfo = POPT_ARG_STRING | POPT_ARGFLAG_SHOW_DEFAULT,
141 .arg = &queue_path,
142 .val = 'p',
143 .descrip = "The name of the path to use in the mqueue "
144 "filesystem for our tests",
145 .argDescrip = "pathname",
146 },
147 POPT_AUTOHELP
148 POPT_TABLEEND
149};
150
151static inline void __set(FILE *stream, int value, char *err_msg);
152void shutdown(int exit_val, char *err_cause, int line_no);
153void sig_action_SIGUSR1(int signum, siginfo_t *info, void *context);
154void sig_action(int signum, siginfo_t *info, void *context);
155static inline int get(FILE *stream);
156static inline void set(FILE *stream, int value);
157static inline int try_set(FILE *stream, int value);
158static inline void getr(int type, struct rlimit *rlim);
159static inline void setr(int type, struct rlimit *rlim);
160static inline void open_queue(struct mq_attr *attr);
161void increase_limits(void);
162
163static inline void __set(FILE *stream, int value, char *err_msg)
164{
165 rewind(stream);
166 if (fprintf(stream, "%d", value) < 0)
167 perror(err_msg);
168}
169
170
171void shutdown(int exit_val, char *err_cause, int line_no)
172{
173 static int in_shutdown = 0;
174 int errno_at_shutdown = errno;
175 int i;
176
177 /* In case we get called by multiple threads or from an sighandler */
178 if (in_shutdown++)
179 return;
180
181 for (i = 0; i < num_cpus_to_pin; i++)
182 if (cpu_threads[i]) {
183 pthread_kill(cpu_threads[i], SIGUSR1);
184 pthread_join(cpu_threads[i], NULL);
185 }
186
187 if (queue != -1)
188 if (mq_close(queue))
189 perror("mq_close() during shutdown");
190 if (queue_path)
191 /*
192 * Be silent if this fails, if we cleaned up already it's
193 * expected to fail
194 */
195 mq_unlink(queue_path);
196 if (saved_max_msgs)
197 __set(max_msgs, saved_max_msgs,
198 "failed to restore saved_max_msgs");
199 if (saved_max_msgsize)
200 __set(max_msgsize, saved_max_msgsize,
201 "failed to restore saved_max_msgsize");
202 if (exit_val)
203 error(exit_val, errno_at_shutdown, "%s at %d",
204 err_cause, line_no);
205 exit(0);
206}
207
208void sig_action_SIGUSR1(int signum, siginfo_t *info, void *context)
209{
210 if (pthread_self() != main_thread)
211 pthread_exit(0);
212 else {
213 fprintf(stderr, "Caught signal %d in SIGUSR1 handler, "
214 "exiting\n", signum);
215 shutdown(0, "", 0);
216 fprintf(stderr, "\n\nReturned from shutdown?!?!\n\n");
217 exit(0);
218 }
219}
220
221void sig_action(int signum, siginfo_t *info, void *context)
222{
223 if (pthread_self() != main_thread)
224 pthread_kill(main_thread, signum);
225 else {
226 fprintf(stderr, "Caught signal %d, exiting\n", signum);
227 shutdown(0, "", 0);
228 fprintf(stderr, "\n\nReturned from shutdown?!?!\n\n");
229 exit(0);
230 }
231}
232
233static inline int get(FILE *stream)
234{
235 int value;
236 rewind(stream);
237 if (fscanf(stream, "%d", &value) != 1)
238 shutdown(4, "Error reading /proc entry", __LINE__);
239 return value;
240}
241
242static inline void set(FILE *stream, int value)
243{
244 int new_value;
245
246 rewind(stream);
247 if (fprintf(stream, "%d", value) < 0)
248 return shutdown(5, "Failed writing to /proc file", __LINE__);
249 new_value = get(stream);
250 if (new_value != value)
251 return shutdown(5, "We didn't get what we wrote to /proc back",
252 __LINE__);
253}
254
255static inline int try_set(FILE *stream, int value)
256{
257 int new_value;
258
259 rewind(stream);
260 fprintf(stream, "%d", value);
261 new_value = get(stream);
262 return new_value == value;
263}
264
265static inline void getr(int type, struct rlimit *rlim)
266{
267 if (getrlimit(type, rlim))
268 shutdown(6, "getrlimit()", __LINE__);
269}
270
271static inline void setr(int type, struct rlimit *rlim)
272{
273 if (setrlimit(type, rlim))
274 shutdown(7, "setrlimit()", __LINE__);
275}
276
277/**
278 * open_queue - open the global queue for testing
279 * @attr - An attr struct specifying the desired queue traits
280 * @result - An attr struct that lists the actual traits the queue has
281 *
282 * This open is not allowed to fail, failure will result in an orderly
283 * shutdown of the program. The global queue_path is used to set what
284 * queue to open, the queue descriptor is saved in the global queue
285 * variable.
286 */
287static inline void open_queue(struct mq_attr *attr)
288{
289 int flags = O_RDWR | O_EXCL | O_CREAT | O_NONBLOCK;
290 int perms = DEFFILEMODE;
291
292 queue = mq_open(queue_path, flags, perms, attr);
293 if (queue == -1)
294 shutdown(1, "mq_open()", __LINE__);
295 if (mq_getattr(queue, &result))
296 shutdown(1, "mq_getattr()", __LINE__);
297 printf("\n\tQueue %s created:\n", queue_path);
298 printf("\t\tmq_flags:\t\t\t%s\n", result.mq_flags & O_NONBLOCK ?
299 "O_NONBLOCK" : "(null)");
300 printf("\t\tmq_maxmsg:\t\t\t%lu\n", result.mq_maxmsg);
301 printf("\t\tmq_msgsize:\t\t\t%lu\n", result.mq_msgsize);
302 printf("\t\tmq_curmsgs:\t\t\t%lu\n", result.mq_curmsgs);
303}
304
305void *fake_cont_thread(void *arg)
306{
307 int i;
308
309 for (i = 0; i < num_cpus_to_pin; i++)
310 if (cpu_threads[i] == pthread_self())
311 break;
312 printf("\tStarted fake continuous mode thread %d on CPU %d\n", i,
313 cpus_to_pin[i]);
314 while (1)
315 ;
316}
317
318void *cont_thread(void *arg)
319{
320 char buff[MSG_SIZE];
321 int i, priority;
322
323 for (i = 0; i < num_cpus_to_pin; i++)
324 if (cpu_threads[i] == pthread_self())
325 break;
326 printf("\tStarted continuous mode thread %d on CPU %d\n", i,
327 cpus_to_pin[i]);
328 while (1) {
329 while (mq_send(queue, buff, sizeof(buff), 0) == 0)
330 ;
331 mq_receive(queue, buff, sizeof(buff), &priority);
332 }
333}
334
335#define drain_queue() \
336 while (mq_receive(queue, buff, MSG_SIZE, &prio_in) == MSG_SIZE)
337
338#define do_untimed_send() \
339 do { \
340 if (mq_send(queue, buff, MSG_SIZE, prio_out)) \
341 shutdown(3, "Test send failure", __LINE__); \
342 } while (0)
343
344#define do_send_recv() \
345 do { \
346 clock_gettime(clock, &start); \
347 if (mq_send(queue, buff, MSG_SIZE, prio_out)) \
348 shutdown(3, "Test send failure", __LINE__); \
349 clock_gettime(clock, &middle); \
350 if (mq_receive(queue, buff, MSG_SIZE, &prio_in) != MSG_SIZE) \
351 shutdown(3, "Test receive failure", __LINE__); \
352 clock_gettime(clock, &end); \
353 nsec = ((middle.tv_sec - start.tv_sec) * 1000000000) + \
354 (middle.tv_nsec - start.tv_nsec); \
355 send_total.tv_nsec += nsec; \
356 if (send_total.tv_nsec >= 1000000000) { \
357 send_total.tv_sec++; \
358 send_total.tv_nsec -= 1000000000; \
359 } \
360 nsec = ((end.tv_sec - middle.tv_sec) * 1000000000) + \
361 (end.tv_nsec - middle.tv_nsec); \
362 recv_total.tv_nsec += nsec; \
363 if (recv_total.tv_nsec >= 1000000000) { \
364 recv_total.tv_sec++; \
365 recv_total.tv_nsec -= 1000000000; \
366 } \
367 } while (0)
368
369struct test {
370 char *desc;
371 void (*func)(int *);
372};
373
374void const_prio(int *prio)
375{
376 return;
377}
378
379void inc_prio(int *prio)
380{
381 if (++*prio == mq_prio_max)
382 *prio = 0;
383}
384
385void dec_prio(int *prio)
386{
387 if (--*prio < 0)
388 *prio = mq_prio_max - 1;
389}
390
391void random_prio(int *prio)
392{
393 *prio = random() % mq_prio_max;
394}
395
396struct test test2[] = {
397 {"\n\tTest #2a: Time send/recv message, queue full, constant prio\n",
398 const_prio},
399 {"\n\tTest #2b: Time send/recv message, queue full, increasing prio\n",
400 inc_prio},
401 {"\n\tTest #2c: Time send/recv message, queue full, decreasing prio\n",
402 dec_prio},
403 {"\n\tTest #2d: Time send/recv message, queue full, random prio\n",
404 random_prio},
405 {NULL, NULL}
406};
407
408/**
409 * Tests to perform (all done with MSG_SIZE messages):
410 *
411 * 1) Time to add/remove message with 0 messages on queue
412 * 1a) with constant prio
413 * 2) Time to add/remove message when queue close to capacity:
414 * 2a) with constant prio
415 * 2b) with increasing prio
416 * 2c) with decreasing prio
417 * 2d) with random prio
418 * 3) Test limits of priorities honored (double check _SC_MQ_PRIO_MAX)
419 */
420void *perf_test_thread(void *arg)
421{
422 char buff[MSG_SIZE];
423 int prio_out, prio_in;
424 int i;
425 clockid_t clock;
426 pthread_t *t;
427 struct timespec res, start, middle, end, send_total, recv_total;
428 unsigned long long nsec;
429 struct test *cur_test;
430
431 t = &cpu_threads[0];
432 printf("\n\tStarted mqueue performance test thread on CPU %d\n",
433 cpus_to_pin[0]);
434 mq_prio_max = sysconf(_SC_MQ_PRIO_MAX);
435 if (mq_prio_max == -1)
436 shutdown(2, "sysconf(_SC_MQ_PRIO_MAX)", __LINE__);
437 if (pthread_getcpuclockid(cpu_threads[0], &clock) != 0)
438 shutdown(2, "pthread_getcpuclockid", __LINE__);
439
440 if (clock_getres(clock, &res))
441 shutdown(2, "clock_getres()", __LINE__);
442
443 printf("\t\tMax priorities:\t\t\t%d\n", mq_prio_max);
444 printf("\t\tClock resolution:\t\t%lu nsec%s\n", res.tv_nsec,
445 res.tv_nsec > 1 ? "s" : "");
446
447
448
449 printf("\n\tTest #1: Time send/recv message, queue empty\n");
450 printf("\t\t(%d iterations)\n", TEST1_LOOPS);
451 prio_out = 0;
452 send_total.tv_sec = 0;
453 send_total.tv_nsec = 0;
454 recv_total.tv_sec = 0;
455 recv_total.tv_nsec = 0;
456 for (i = 0; i < TEST1_LOOPS; i++)
457 do_send_recv();
458 printf("\t\tSend msg:\t\t\t%ld.%lus total time\n",
459 send_total.tv_sec, send_total.tv_nsec);
460 nsec = ((unsigned long long)send_total.tv_sec * 1000000000 +
461 send_total.tv_nsec) / TEST1_LOOPS;
462 printf("\t\t\t\t\t\t%lld nsec/msg\n", nsec);
463 printf("\t\tRecv msg:\t\t\t%ld.%lus total time\n",
464 recv_total.tv_sec, recv_total.tv_nsec);
465 nsec = ((unsigned long long)recv_total.tv_sec * 1000000000 +
466 recv_total.tv_nsec) / TEST1_LOOPS;
467 printf("\t\t\t\t\t\t%lld nsec/msg\n", nsec);
468
469
470 for (cur_test = test2; cur_test->desc != NULL; cur_test++) {
471 printf("%s:\n", cur_test->desc);
472 printf("\t\t(%d iterations)\n", TEST2_LOOPS);
473 prio_out = 0;
474 send_total.tv_sec = 0;
475 send_total.tv_nsec = 0;
476 recv_total.tv_sec = 0;
477 recv_total.tv_nsec = 0;
478 printf("\t\tFilling queue...");
479 fflush(stdout);
480 clock_gettime(clock, &start);
481 for (i = 0; i < result.mq_maxmsg - 1; i++) {
482 do_untimed_send();
483 cur_test->func(&prio_out);
484 }
485 clock_gettime(clock, &end);
486 nsec = ((unsigned long long)(end.tv_sec - start.tv_sec) *
487 1000000000) + (end.tv_nsec - start.tv_nsec);
488 printf("done.\t\t%lld.%llds\n", nsec / 1000000000,
489 nsec % 1000000000);
490 printf("\t\tTesting...");
491 fflush(stdout);
492 for (i = 0; i < TEST2_LOOPS; i++) {
493 do_send_recv();
494 cur_test->func(&prio_out);
495 }
496 printf("done.\n");
497 printf("\t\tSend msg:\t\t\t%ld.%lus total time\n",
498 send_total.tv_sec, send_total.tv_nsec);
499 nsec = ((unsigned long long)send_total.tv_sec * 1000000000 +
500 send_total.tv_nsec) / TEST2_LOOPS;
501 printf("\t\t\t\t\t\t%lld nsec/msg\n", nsec);
502 printf("\t\tRecv msg:\t\t\t%ld.%lus total time\n",
503 recv_total.tv_sec, recv_total.tv_nsec);
504 nsec = ((unsigned long long)recv_total.tv_sec * 1000000000 +
505 recv_total.tv_nsec) / TEST2_LOOPS;
506 printf("\t\t\t\t\t\t%lld nsec/msg\n", nsec);
507 printf("\t\tDraining queue...");
508 fflush(stdout);
509 clock_gettime(clock, &start);
510 drain_queue();
511 clock_gettime(clock, &end);
512 nsec = ((unsigned long long)(end.tv_sec - start.tv_sec) *
513 1000000000) + (end.tv_nsec - start.tv_nsec);
514 printf("done.\t\t%lld.%llds\n", nsec / 1000000000,
515 nsec % 1000000000);
516 }
517 return 0;
518}
519
520void increase_limits(void)
521{
522 cur_limits.rlim_cur = RLIM_INFINITY;
523 cur_limits.rlim_max = RLIM_INFINITY;
524 setr(RLIMIT_MSGQUEUE, &cur_limits);
525 while (try_set(max_msgs, cur_max_msgs += 10))
526 ;
527 cur_max_msgs = get(max_msgs);
528 while (try_set(max_msgsize, cur_max_msgsize += 1024))
529 ;
530 cur_max_msgsize = get(max_msgsize);
531 if (setpriority(PRIO_PROCESS, 0, -20) != 0)
532 shutdown(2, "setpriority()", __LINE__);
533 cur_nice = -20;
534}
535
536int main(int argc, char *argv[])
537{
538 struct mq_attr attr;
539 char *option, *next_option;
540 int i, cpu, rc;
541 struct sigaction sa;
542 poptContext popt_context;
543 void *retval;
544
545 main_thread = pthread_self();
546 num_cpus_to_pin = 0;
547
548 if (sysconf(_SC_NPROCESSORS_ONLN) == -1) {
549 perror("sysconf(_SC_NPROCESSORS_ONLN)");
550 exit(1);
551 }
552 cpus_online = min(MAX_CPUS, sysconf(_SC_NPROCESSORS_ONLN));
553 cpu_set = CPU_ALLOC(cpus_online);
554 if (cpu_set == NULL) {
555 perror("CPU_ALLOC()");
556 exit(1);
557 }
558 cpu_set_size = CPU_ALLOC_SIZE(cpus_online);
559 CPU_ZERO_S(cpu_set_size, cpu_set);
560
561 popt_context = poptGetContext(NULL, argc, (const char **)argv,
562 options, 0);
563
564 while ((rc = poptGetNextOpt(popt_context)) > 0) {
565 switch (rc) {
566 case 'c':
567 continuous_mode = 1;
568 option = cpu_option_string;
569 do {
570 next_option = strchr(option, ',');
571 if (next_option)
572 *next_option = '\0';
573 cpu = atoi(option);
574 if (cpu >= cpus_online)
575 fprintf(stderr, "CPU %d exceeds "
576 "cpus online, ignoring.\n",
577 cpu);
578 else
579 cpus_to_pin[num_cpus_to_pin++] = cpu;
580 if (next_option)
581 option = ++next_option;
582 } while (next_option && num_cpus_to_pin < MAX_CPUS);
583 /* Double check that they didn't give us the same CPU
584 * more than once */
585 for (cpu = 0; cpu < num_cpus_to_pin; cpu++) {
586 if (CPU_ISSET_S(cpus_to_pin[cpu], cpu_set_size,
587 cpu_set)) {
588 fprintf(stderr, "Any given CPU may "
589 "only be given once.\n");
590 exit(1);
591 } else
592 CPU_SET_S(cpus_to_pin[cpu],
593 cpu_set_size, cpu_set);
594 }
595 break;
596 case 'p':
597 /*
598 * Although we can create a msg queue with a
599 * non-absolute path name, unlink will fail. So,
600 * if the name doesn't start with a /, add one
601 * when we save it.
602 */
603 option = queue_path;
604 if (*option != '/') {
605 queue_path = malloc(strlen(option) + 2);
606 if (!queue_path) {
607 perror("malloc()");
608 exit(1);
609 }
610 queue_path[0] = '/';
611 queue_path[1] = 0;
612 strcat(queue_path, option);
613 free(option);
614 }
615 break;
616 }
617 }
618
619 if (continuous_mode && num_cpus_to_pin == 0) {
620 fprintf(stderr, "Must pass at least one CPU to continuous "
621 "mode.\n");
622 poptPrintUsage(popt_context, stderr, 0);
623 exit(1);
624 } else if (!continuous_mode) {
625 num_cpus_to_pin = 1;
626 cpus_to_pin[0] = cpus_online - 1;
627 }
628
629 if (getuid() != 0) {
630 fprintf(stderr, "Not running as root, but almost all tests "
631 "require root in order to modify\nsystem settings. "
632 "Exiting.\n");
633 exit(1);
634 }
635
636 max_msgs = fopen(MAX_MSGS, "r+");
637 max_msgsize = fopen(MAX_MSGSIZE, "r+");
638 if (!max_msgs)
639 shutdown(2, "Failed to open msg_max", __LINE__);
640 if (!max_msgsize)
641 shutdown(2, "Failed to open msgsize_max", __LINE__);
642
643 /* Load up the current system values for everything we can */
644 getr(RLIMIT_MSGQUEUE, &saved_limits);
645 cur_limits = saved_limits;
646 saved_max_msgs = cur_max_msgs = get(max_msgs);
647 saved_max_msgsize = cur_max_msgsize = get(max_msgsize);
648 errno = 0;
649 cur_nice = getpriority(PRIO_PROCESS, 0);
650 if (errno)
651 shutdown(2, "getpriority()", __LINE__);
652
653 /* Tell the user our initial state */
654 printf("\nInitial system state:\n");
655 printf("\tUsing queue path:\t\t\t%s\n", queue_path);
656 printf("\tRLIMIT_MSGQUEUE(soft):\t\t\t%ld\n",
657 (long) saved_limits.rlim_cur);
658 printf("\tRLIMIT_MSGQUEUE(hard):\t\t\t%ld\n",
659 (long) saved_limits.rlim_max);
660 printf("\tMaximum Message Size:\t\t\t%d\n", saved_max_msgsize);
661 printf("\tMaximum Queue Size:\t\t\t%d\n", saved_max_msgs);
662 printf("\tNice value:\t\t\t\t%d\n", cur_nice);
663 printf("\n");
664
665 increase_limits();
666
667 printf("Adjusted system state for testing:\n");
668 if (cur_limits.rlim_cur == RLIM_INFINITY) {
669 printf("\tRLIMIT_MSGQUEUE(soft):\t\t\t(unlimited)\n");
670 printf("\tRLIMIT_MSGQUEUE(hard):\t\t\t(unlimited)\n");
671 } else {
672 printf("\tRLIMIT_MSGQUEUE(soft):\t\t\t%ld\n",
673 (long) cur_limits.rlim_cur);
674 printf("\tRLIMIT_MSGQUEUE(hard):\t\t\t%ld\n",
675 (long) cur_limits.rlim_max);
676 }
677 printf("\tMaximum Message Size:\t\t\t%d\n", cur_max_msgsize);
678 printf("\tMaximum Queue Size:\t\t\t%d\n", cur_max_msgs);
679 printf("\tNice value:\t\t\t\t%d\n", cur_nice);
680 printf("\tContinuous mode:\t\t\t(%s)\n", continuous_mode ?
681 (continuous_mode_fake ? "fake mode" : "enabled") :
682 "disabled");
683 printf("\tCPUs to pin:\t\t\t\t%d", cpus_to_pin[0]);
684 for (cpu = 1; cpu < num_cpus_to_pin; cpu++)
685 printf(",%d", cpus_to_pin[cpu]);
686 printf("\n");
687
688 sa.sa_sigaction = sig_action_SIGUSR1;
689 sigemptyset(&sa.sa_mask);
690 sigaddset(&sa.sa_mask, SIGHUP);
691 sigaddset(&sa.sa_mask, SIGINT);
692 sigaddset(&sa.sa_mask, SIGQUIT);
693 sigaddset(&sa.sa_mask, SIGTERM);
694 sa.sa_flags = SA_SIGINFO;
695 if (sigaction(SIGUSR1, &sa, NULL) == -1)
696 shutdown(1, "sigaction(SIGUSR1)", __LINE__);
697 sa.sa_sigaction = sig_action;
698 if (sigaction(SIGHUP, &sa, NULL) == -1)
699 shutdown(1, "sigaction(SIGHUP)", __LINE__);
700 if (sigaction(SIGINT, &sa, NULL) == -1)
701 shutdown(1, "sigaction(SIGINT)", __LINE__);
702 if (sigaction(SIGQUIT, &sa, NULL) == -1)
703 shutdown(1, "sigaction(SIGQUIT)", __LINE__);
704 if (sigaction(SIGTERM, &sa, NULL) == -1)
705 shutdown(1, "sigaction(SIGTERM)", __LINE__);
706
707 if (!continuous_mode_fake) {
708 attr.mq_flags = O_NONBLOCK;
709 attr.mq_maxmsg = cur_max_msgs;
710 attr.mq_msgsize = MSG_SIZE;
711 open_queue(&attr);
712 }
713 for (i = 0; i < num_cpus_to_pin; i++) {
714 pthread_attr_t thread_attr;
715 void *thread_func;
716
717 if (continuous_mode_fake)
718 thread_func = &fake_cont_thread;
719 else if (continuous_mode)
720 thread_func = &cont_thread;
721 else
722 thread_func = &perf_test_thread;
723
724 CPU_ZERO_S(cpu_set_size, cpu_set);
725 CPU_SET_S(cpus_to_pin[i], cpu_set_size, cpu_set);
726 pthread_attr_init(&thread_attr);
727 pthread_attr_setaffinity_np(&thread_attr, cpu_set_size,
728 cpu_set);
729 if (pthread_create(&cpu_threads[i], &thread_attr, thread_func,
730 NULL))
731 shutdown(1, "pthread_create()", __LINE__);
732 pthread_attr_destroy(&thread_attr);
733 }
734
735 if (!continuous_mode) {
736 pthread_join(cpu_threads[0], &retval);
737 shutdown((long)retval, "perf_test_thread()", __LINE__);
738 } else {
739 while (1)
740 sleep(1);
741 }
742 shutdown(0, "", 0);
743}