Loading...
1# SPDX-License-Identifier: GPL-2.0
2# Copyright (c) 2020 SUSE LLC.
3
4import collections
5import functools
6import json
7import os
8import socket
9import subprocess
10import unittest
11
12
13# Add the source tree of bpftool and /usr/local/sbin to PATH
14cur_dir = os.path.dirname(os.path.realpath(__file__))
15bpftool_dir = os.path.abspath(os.path.join(cur_dir, "..", "..", "..", "..",
16 "tools", "bpf", "bpftool"))
17os.environ["PATH"] = bpftool_dir + ":/usr/local/sbin:" + os.environ["PATH"]
18
19
20class IfaceNotFoundError(Exception):
21 pass
22
23
24class UnprivilegedUserError(Exception):
25 pass
26
27
28def _bpftool(args, json=True):
29 _args = ["bpftool"]
30 if json:
31 _args.append("-j")
32 _args.extend(args)
33
34 return subprocess.check_output(_args)
35
36
37def bpftool(args):
38 return _bpftool(args, json=False).decode("utf-8")
39
40
41def bpftool_json(args):
42 res = _bpftool(args)
43 return json.loads(res)
44
45
46def get_default_iface():
47 for iface in socket.if_nameindex():
48 if iface[1] != "lo":
49 return iface[1]
50 raise IfaceNotFoundError("Could not find any network interface to probe")
51
52
53def default_iface(f):
54 @functools.wraps(f)
55 def wrapper(*args, **kwargs):
56 iface = get_default_iface()
57 return f(*args, iface, **kwargs)
58 return wrapper
59
60DMESG_EMITTING_HELPERS = [
61 "bpf_probe_write_user",
62 "bpf_trace_printk",
63 "bpf_trace_vprintk",
64 ]
65
66class TestBpftool(unittest.TestCase):
67 @classmethod
68 def setUpClass(cls):
69 if os.getuid() != 0:
70 raise UnprivilegedUserError(
71 "This test suite needs root privileges")
72
73 @default_iface
74 def test_feature_dev_json(self, iface):
75 unexpected_helpers = DMESG_EMITTING_HELPERS
76 expected_keys = [
77 "syscall_config",
78 "program_types",
79 "map_types",
80 "helpers",
81 "misc",
82 ]
83
84 res = bpftool_json(["feature", "probe", "dev", iface])
85 # Check if the result has all expected keys.
86 self.assertCountEqual(res.keys(), expected_keys)
87 # Check if unexpected helpers are not included in helpers probes
88 # result.
89 for helpers in res["helpers"].values():
90 for unexpected_helper in unexpected_helpers:
91 self.assertNotIn(unexpected_helper, helpers)
92
93 def test_feature_kernel(self):
94 test_cases = [
95 bpftool_json(["feature", "probe", "kernel"]),
96 bpftool_json(["feature", "probe"]),
97 bpftool_json(["feature"]),
98 ]
99 unexpected_helpers = DMESG_EMITTING_HELPERS
100 expected_keys = [
101 "syscall_config",
102 "system_config",
103 "program_types",
104 "map_types",
105 "helpers",
106 "misc",
107 ]
108
109 for tc in test_cases:
110 # Check if the result has all expected keys.
111 self.assertCountEqual(tc.keys(), expected_keys)
112 # Check if unexpected helpers are not included in helpers probes
113 # result.
114 for helpers in tc["helpers"].values():
115 for unexpected_helper in unexpected_helpers:
116 self.assertNotIn(unexpected_helper, helpers)
117
118 def test_feature_kernel_full(self):
119 test_cases = [
120 bpftool_json(["feature", "probe", "kernel", "full"]),
121 bpftool_json(["feature", "probe", "full"]),
122 ]
123 expected_helpers = DMESG_EMITTING_HELPERS
124
125 for tc in test_cases:
126 # Check if expected helpers are included at least once in any
127 # helpers list for any program type. Unfortunately we cannot assume
128 # that they will be included in all program types or a specific
129 # subset of programs. It depends on the kernel version and
130 # configuration.
131 found_helpers = False
132
133 for helpers in tc["helpers"].values():
134 if all(expected_helper in helpers
135 for expected_helper in expected_helpers):
136 found_helpers = True
137 break
138
139 self.assertTrue(found_helpers)
140
141 def test_feature_kernel_full_vs_not_full(self):
142 full_res = bpftool_json(["feature", "probe", "full"])
143 not_full_res = bpftool_json(["feature", "probe"])
144 not_full_set = set()
145 full_set = set()
146
147 for helpers in full_res["helpers"].values():
148 for helper in helpers:
149 full_set.add(helper)
150
151 for helpers in not_full_res["helpers"].values():
152 for helper in helpers:
153 not_full_set.add(helper)
154
155 self.assertCountEqual(full_set - not_full_set,
156 set(DMESG_EMITTING_HELPERS))
157 self.assertCountEqual(not_full_set - full_set, set())
158
159 def test_feature_macros(self):
160 expected_patterns = [
161 r"/\*\*\* System call availability \*\*\*/",
162 r"#define HAVE_BPF_SYSCALL",
163 r"/\*\*\* eBPF program types \*\*\*/",
164 r"#define HAVE.*PROG_TYPE",
165 r"/\*\*\* eBPF map types \*\*\*/",
166 r"#define HAVE.*MAP_TYPE",
167 r"/\*\*\* eBPF helper functions \*\*\*/",
168 r"#define HAVE.*HELPER",
169 r"/\*\*\* eBPF misc features \*\*\*/",
170 ]
171
172 res = bpftool(["feature", "probe", "macros"])
173 for pattern in expected_patterns:
174 self.assertRegex(res, pattern)
1# SPDX-License-Identifier: GPL-2.0
2# Copyright (c) 2020 SUSE LLC.
3
4import collections
5import functools
6import json
7import os
8import socket
9import subprocess
10import unittest
11
12
13# Add the source tree of bpftool and /usr/local/sbin to PATH
14cur_dir = os.path.dirname(os.path.realpath(__file__))
15bpftool_dir = os.path.abspath(os.path.join(cur_dir, "..", "..", "..", "..",
16 "tools", "bpf", "bpftool"))
17os.environ["PATH"] = bpftool_dir + ":/usr/local/sbin:" + os.environ["PATH"]
18
19
20class IfaceNotFoundError(Exception):
21 pass
22
23
24class UnprivilegedUserError(Exception):
25 pass
26
27
28def _bpftool(args, json=True):
29 _args = ["bpftool"]
30 if json:
31 _args.append("-j")
32 _args.extend(args)
33
34 return subprocess.check_output(_args)
35
36
37def bpftool(args):
38 return _bpftool(args, json=False).decode("utf-8")
39
40
41def bpftool_json(args):
42 res = _bpftool(args)
43 return json.loads(res)
44
45
46def get_default_iface():
47 for iface in socket.if_nameindex():
48 if iface[1] != "lo":
49 return iface[1]
50 raise IfaceNotFoundError("Could not find any network interface to probe")
51
52
53def default_iface(f):
54 @functools.wraps(f)
55 def wrapper(*args, **kwargs):
56 iface = get_default_iface()
57 return f(*args, iface, **kwargs)
58 return wrapper
59
60DMESG_EMITTING_HELPERS = [
61 "bpf_probe_write_user",
62 "bpf_trace_printk",
63 "bpf_trace_vprintk",
64 ]
65
66class TestBpftool(unittest.TestCase):
67 @classmethod
68 def setUpClass(cls):
69 if os.getuid() != 0:
70 raise UnprivilegedUserError(
71 "This test suite needs root privileges")
72
73 @default_iface
74 def test_feature_dev_json(self, iface):
75 unexpected_helpers = DMESG_EMITTING_HELPERS
76 expected_keys = [
77 "syscall_config",
78 "program_types",
79 "map_types",
80 "helpers",
81 "misc",
82 ]
83
84 res = bpftool_json(["feature", "probe", "dev", iface])
85 # Check if the result has all expected keys.
86 self.assertCountEqual(res.keys(), expected_keys)
87 # Check if unexpected helpers are not included in helpers probes
88 # result.
89 for helpers in res["helpers"].values():
90 for unexpected_helper in unexpected_helpers:
91 self.assertNotIn(unexpected_helper, helpers)
92
93 def test_feature_kernel(self):
94 test_cases = [
95 bpftool_json(["feature", "probe", "kernel"]),
96 bpftool_json(["feature", "probe"]),
97 bpftool_json(["feature"]),
98 ]
99 unexpected_helpers = DMESG_EMITTING_HELPERS
100 expected_keys = [
101 "syscall_config",
102 "system_config",
103 "program_types",
104 "map_types",
105 "helpers",
106 "misc",
107 ]
108
109 for tc in test_cases:
110 # Check if the result has all expected keys.
111 self.assertCountEqual(tc.keys(), expected_keys)
112 # Check if unexpected helpers are not included in helpers probes
113 # result.
114 for helpers in tc["helpers"].values():
115 for unexpected_helper in unexpected_helpers:
116 self.assertNotIn(unexpected_helper, helpers)
117
118 def test_feature_kernel_full(self):
119 test_cases = [
120 bpftool_json(["feature", "probe", "kernel", "full"]),
121 bpftool_json(["feature", "probe", "full"]),
122 ]
123 expected_helpers = DMESG_EMITTING_HELPERS
124
125 for tc in test_cases:
126 # Check if expected helpers are included at least once in any
127 # helpers list for any program type. Unfortunately we cannot assume
128 # that they will be included in all program types or a specific
129 # subset of programs. It depends on the kernel version and
130 # configuration.
131 found_helpers = False
132
133 for helpers in tc["helpers"].values():
134 if all(expected_helper in helpers
135 for expected_helper in expected_helpers):
136 found_helpers = True
137 break
138
139 self.assertTrue(found_helpers)
140
141 def test_feature_kernel_full_vs_not_full(self):
142 full_res = bpftool_json(["feature", "probe", "full"])
143 not_full_res = bpftool_json(["feature", "probe"])
144 not_full_set = set()
145 full_set = set()
146
147 for helpers in full_res["helpers"].values():
148 for helper in helpers:
149 full_set.add(helper)
150
151 for helpers in not_full_res["helpers"].values():
152 for helper in helpers:
153 not_full_set.add(helper)
154
155 self.assertCountEqual(full_set - not_full_set,
156 set(DMESG_EMITTING_HELPERS))
157 self.assertCountEqual(not_full_set - full_set, set())
158
159 def test_feature_macros(self):
160 expected_patterns = [
161 r"/\*\*\* System call availability \*\*\*/",
162 r"#define HAVE_BPF_SYSCALL",
163 r"/\*\*\* eBPF program types \*\*\*/",
164 r"#define HAVE.*PROG_TYPE",
165 r"/\*\*\* eBPF map types \*\*\*/",
166 r"#define HAVE.*MAP_TYPE",
167 r"/\*\*\* eBPF helper functions \*\*\*/",
168 r"#define HAVE.*HELPER",
169 r"/\*\*\* eBPF misc features \*\*\*/",
170 ]
171
172 res = bpftool(["feature", "probe", "macros"])
173 for pattern in expected_patterns:
174 self.assertRegex(res, pattern)