Loading...
1# SPDX-License-Identifier: GPL-2.0
2#
3# Runs UML kernel, collects output, and handles errors.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <felixguoxiuping@gmail.com>
7# Author: Brendan Higgins <brendanhiggins@google.com>
8
9import importlib.abc
10import importlib.util
11import logging
12import subprocess
13import os
14import shlex
15import shutil
16import signal
17import threading
18from typing import Iterator, List, Optional, Tuple
19from types import FrameType
20
21import kunit_config
22import qemu_config
23
24KCONFIG_PATH = '.config'
25KUNITCONFIG_PATH = '.kunitconfig'
26OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
27DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
28ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
29UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'
30OUTFILE_PATH = 'test.log'
31ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
32QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
33
34class ConfigError(Exception):
35 """Represents an error trying to configure the Linux kernel."""
36
37
38class BuildError(Exception):
39 """Represents an error trying to build the Linux kernel."""
40
41
42class LinuxSourceTreeOperations:
43 """An abstraction over command line operations performed on a source tree."""
44
45 def __init__(self, linux_arch: str, cross_compile: Optional[str]):
46 self._linux_arch = linux_arch
47 self._cross_compile = cross_compile
48
49 def make_mrproper(self) -> None:
50 try:
51 subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
52 except OSError as e:
53 raise ConfigError('Could not call make command: ' + str(e))
54 except subprocess.CalledProcessError as e:
55 raise ConfigError(e.output.decode())
56
57 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
58 return base_kunitconfig
59
60 def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None:
61 command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
62 if self._cross_compile:
63 command += ['CROSS_COMPILE=' + self._cross_compile]
64 if make_options:
65 command.extend(make_options)
66 print('Populating config with:\n$', ' '.join(command))
67 try:
68 subprocess.check_output(command, stderr=subprocess.STDOUT)
69 except OSError as e:
70 raise ConfigError('Could not call make command: ' + str(e))
71 except subprocess.CalledProcessError as e:
72 raise ConfigError(e.output.decode())
73
74 def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None:
75 command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)]
76 if make_options:
77 command.extend(make_options)
78 if self._cross_compile:
79 command += ['CROSS_COMPILE=' + self._cross_compile]
80 print('Building with:\n$', ' '.join(command))
81 try:
82 proc = subprocess.Popen(command,
83 stderr=subprocess.PIPE,
84 stdout=subprocess.DEVNULL)
85 except OSError as e:
86 raise BuildError('Could not call execute make: ' + str(e))
87 except subprocess.CalledProcessError as e:
88 raise BuildError(e.output)
89 _, stderr = proc.communicate()
90 if proc.returncode != 0:
91 raise BuildError(stderr.decode())
92 if stderr: # likely only due to build warnings
93 print(stderr.decode())
94
95 def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
96 raise RuntimeError('not implemented!')
97
98
99class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
100
101 def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
102 super().__init__(linux_arch=qemu_arch_params.linux_arch,
103 cross_compile=cross_compile)
104 self._kconfig = qemu_arch_params.kconfig
105 self._qemu_arch = qemu_arch_params.qemu_arch
106 self._kernel_path = qemu_arch_params.kernel_path
107 self._kernel_command_line = qemu_arch_params.kernel_command_line + ' kunit_shutdown=reboot'
108 self._extra_qemu_params = qemu_arch_params.extra_qemu_params
109 self._serial = qemu_arch_params.serial
110
111 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
112 kconfig = kunit_config.parse_from_string(self._kconfig)
113 kconfig.merge_in_entries(base_kunitconfig)
114 return kconfig
115
116 def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
117 kernel_path = os.path.join(build_dir, self._kernel_path)
118 qemu_command = ['qemu-system-' + self._qemu_arch,
119 '-nodefaults',
120 '-m', '1024',
121 '-kernel', kernel_path,
122 '-append', ' '.join(params + [self._kernel_command_line]),
123 '-no-reboot',
124 '-nographic',
125 '-serial', self._serial] + self._extra_qemu_params
126 # Note: shlex.join() does what we want, but requires python 3.8+.
127 print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command))
128 return subprocess.Popen(qemu_command,
129 stdin=subprocess.PIPE,
130 stdout=subprocess.PIPE,
131 stderr=subprocess.STDOUT,
132 text=True, errors='backslashreplace')
133
134class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
135 """An abstraction over command line operations performed on a source tree."""
136
137 def __init__(self, cross_compile: Optional[str]=None):
138 super().__init__(linux_arch='um', cross_compile=cross_compile)
139
140 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
141 kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
142 kconfig.merge_in_entries(base_kunitconfig)
143 return kconfig
144
145 def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
146 """Runs the Linux UML binary. Must be named 'linux'."""
147 linux_bin = os.path.join(build_dir, 'linux')
148 params.extend(['mem=1G', 'console=tty', 'kunit_shutdown=halt'])
149 return subprocess.Popen([linux_bin] + params,
150 stdin=subprocess.PIPE,
151 stdout=subprocess.PIPE,
152 stderr=subprocess.STDOUT,
153 text=True, errors='backslashreplace')
154
155def get_kconfig_path(build_dir: str) -> str:
156 return os.path.join(build_dir, KCONFIG_PATH)
157
158def get_kunitconfig_path(build_dir: str) -> str:
159 return os.path.join(build_dir, KUNITCONFIG_PATH)
160
161def get_old_kunitconfig_path(build_dir: str) -> str:
162 return os.path.join(build_dir, OLD_KUNITCONFIG_PATH)
163
164def get_parsed_kunitconfig(build_dir: str,
165 kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
166 if not kunitconfig_paths:
167 path = get_kunitconfig_path(build_dir)
168 if not os.path.exists(path):
169 shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, path)
170 return kunit_config.parse_file(path)
171
172 merged = kunit_config.Kconfig()
173
174 for path in kunitconfig_paths:
175 if os.path.isdir(path):
176 path = os.path.join(path, KUNITCONFIG_PATH)
177 if not os.path.exists(path):
178 raise ConfigError(f'Specified kunitconfig ({path}) does not exist')
179
180 partial = kunit_config.parse_file(path)
181 diff = merged.conflicting_options(partial)
182 if diff:
183 diff_str = '\n\n'.join(f'{a}\n vs from {path}\n{b}' for a, b in diff)
184 raise ConfigError(f'Multiple values specified for {len(diff)} options in kunitconfig:\n{diff_str}')
185 merged.merge_in_entries(partial)
186 return merged
187
188def get_outfile_path(build_dir: str) -> str:
189 return os.path.join(build_dir, OUTFILE_PATH)
190
191def _default_qemu_config_path(arch: str) -> str:
192 config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
193 if os.path.isfile(config_path):
194 return config_path
195
196 options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')]
197 raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options)))
198
199def _get_qemu_ops(config_path: str,
200 extra_qemu_args: Optional[List[str]],
201 cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
202 # The module name/path has very little to do with where the actual file
203 # exists (I learned this through experimentation and could not find it
204 # anywhere in the Python documentation).
205 #
206 # Bascially, we completely ignore the actual file location of the config
207 # we are loading and just tell Python that the module lives in the
208 # QEMU_CONFIGS_DIR for import purposes regardless of where it actually
209 # exists as a file.
210 module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
211 spec = importlib.util.spec_from_file_location(module_path, config_path)
212 assert spec is not None
213 config = importlib.util.module_from_spec(spec)
214 # See https://github.com/python/typeshed/pull/2626 for context.
215 assert isinstance(spec.loader, importlib.abc.Loader)
216 spec.loader.exec_module(config)
217
218 if not hasattr(config, 'QEMU_ARCH'):
219 raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)
220 params: qemu_config.QemuArchParams = config.QEMU_ARCH
221 if extra_qemu_args:
222 params.extra_qemu_params.extend(extra_qemu_args)
223 return params.linux_arch, LinuxSourceTreeOperationsQemu(
224 params, cross_compile=cross_compile)
225
226class LinuxSourceTree:
227 """Represents a Linux kernel source tree with KUnit tests."""
228
229 def __init__(
230 self,
231 build_dir: str,
232 kunitconfig_paths: Optional[List[str]]=None,
233 kconfig_add: Optional[List[str]]=None,
234 arch: Optional[str]=None,
235 cross_compile: Optional[str]=None,
236 qemu_config_path: Optional[str]=None,
237 extra_qemu_args: Optional[List[str]]=None) -> None:
238 signal.signal(signal.SIGINT, self.signal_handler)
239 if qemu_config_path:
240 self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
241 else:
242 self._arch = 'um' if arch is None else arch
243 if self._arch == 'um':
244 self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
245 else:
246 qemu_config_path = _default_qemu_config_path(self._arch)
247 _, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
248
249 self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
250 if kconfig_add:
251 kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
252 self._kconfig.merge_in_entries(kconfig)
253
254 def arch(self) -> str:
255 return self._arch
256
257 def clean(self) -> bool:
258 try:
259 self._ops.make_mrproper()
260 except ConfigError as e:
261 logging.error(e)
262 return False
263 return True
264
265 def validate_config(self, build_dir: str) -> bool:
266 kconfig_path = get_kconfig_path(build_dir)
267 validated_kconfig = kunit_config.parse_file(kconfig_path)
268 if self._kconfig.is_subset_of(validated_kconfig):
269 return True
270 missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
271 message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
272 'This is probably due to unsatisfied dependencies.\n' \
273 'Missing: ' + ', '.join(str(e) for e in missing)
274 if self._arch == 'um':
275 message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
276 'on a different architecture with something like "--arch=x86_64".'
277 logging.error(message)
278 return False
279
280 def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
281 kconfig_path = get_kconfig_path(build_dir)
282 if build_dir and not os.path.exists(build_dir):
283 os.mkdir(build_dir)
284 try:
285 self._kconfig = self._ops.make_arch_config(self._kconfig)
286 self._kconfig.write_to_file(kconfig_path)
287 self._ops.make_olddefconfig(build_dir, make_options)
288 except ConfigError as e:
289 logging.error(e)
290 return False
291 if not self.validate_config(build_dir):
292 return False
293
294 old_path = get_old_kunitconfig_path(build_dir)
295 if os.path.exists(old_path):
296 os.remove(old_path) # write_to_file appends to the file
297 self._kconfig.write_to_file(old_path)
298 return True
299
300 def _kunitconfig_changed(self, build_dir: str) -> bool:
301 old_path = get_old_kunitconfig_path(build_dir)
302 if not os.path.exists(old_path):
303 return True
304
305 old_kconfig = kunit_config.parse_file(old_path)
306 return old_kconfig != self._kconfig
307
308 def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
309 """Creates a new .config if it is not a subset of the .kunitconfig."""
310 kconfig_path = get_kconfig_path(build_dir)
311 if not os.path.exists(kconfig_path):
312 print('Generating .config ...')
313 return self.build_config(build_dir, make_options)
314
315 existing_kconfig = kunit_config.parse_file(kconfig_path)
316 self._kconfig = self._ops.make_arch_config(self._kconfig)
317
318 if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
319 return True
320 print('Regenerating .config ...')
321 os.remove(kconfig_path)
322 return self.build_config(build_dir, make_options)
323
324 def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool:
325 try:
326 self._ops.make_olddefconfig(build_dir, make_options)
327 self._ops.make(jobs, build_dir, make_options)
328 except (ConfigError, BuildError) as e:
329 logging.error(e)
330 return False
331 return self.validate_config(build_dir)
332
333 def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='', filter_glob: str='', filter: str='', filter_action: Optional[str]=None, timeout: Optional[int]=None) -> Iterator[str]:
334 if not args:
335 args = []
336 if filter_glob:
337 args.append('kunit.filter_glob=' + filter_glob)
338 if filter:
339 args.append('kunit.filter="' + filter + '"')
340 if filter_action:
341 args.append('kunit.filter_action=' + filter_action)
342 args.append('kunit.enable=1')
343
344 process = self._ops.start(args, build_dir)
345 assert process.stdout is not None # tell mypy it's set
346
347 # Enforce the timeout in a background thread.
348 def _wait_proc() -> None:
349 try:
350 process.wait(timeout=timeout)
351 except Exception as e:
352 print(e)
353 process.terminate()
354 process.wait()
355 waiter = threading.Thread(target=_wait_proc)
356 waiter.start()
357
358 output = open(get_outfile_path(build_dir), 'w')
359 try:
360 # Tee the output to the file and to our caller in real time.
361 for line in process.stdout:
362 output.write(line)
363 yield line
364 # This runs even if our caller doesn't consume every line.
365 finally:
366 # Flush any leftover output to the file
367 output.write(process.stdout.read())
368 output.close()
369 process.stdout.close()
370
371 waiter.join()
372 subprocess.call(['stty', 'sane'])
373
374 def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None:
375 logging.error('Build interruption occurred. Cleaning console.')
376 subprocess.call(['stty', 'sane'])
1# SPDX-License-Identifier: GPL-2.0
2#
3# Runs UML kernel, collects output, and handles errors.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <felixguoxiuping@gmail.com>
7# Author: Brendan Higgins <brendanhiggins@google.com>
8
9import importlib.abc
10import importlib.util
11import logging
12import subprocess
13import os
14import shlex
15import shutil
16import signal
17import threading
18from typing import Iterator, List, Optional, Tuple
19from types import FrameType
20
21import kunit_config
22import qemu_config
23
24KCONFIG_PATH = '.config'
25KUNITCONFIG_PATH = '.kunitconfig'
26OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
27DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
28ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
29UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'
30OUTFILE_PATH = 'test.log'
31ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
32QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
33
34class ConfigError(Exception):
35 """Represents an error trying to configure the Linux kernel."""
36
37
38class BuildError(Exception):
39 """Represents an error trying to build the Linux kernel."""
40
41
42class LinuxSourceTreeOperations:
43 """An abstraction over command line operations performed on a source tree."""
44
45 def __init__(self, linux_arch: str, cross_compile: Optional[str]):
46 self._linux_arch = linux_arch
47 self._cross_compile = cross_compile
48
49 def make_mrproper(self) -> None:
50 try:
51 subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
52 except OSError as e:
53 raise ConfigError('Could not call make command: ' + str(e))
54 except subprocess.CalledProcessError as e:
55 raise ConfigError(e.output.decode())
56
57 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
58 return base_kunitconfig
59
60 def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None:
61 command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
62 if self._cross_compile:
63 command += ['CROSS_COMPILE=' + self._cross_compile]
64 if make_options:
65 command.extend(make_options)
66 print('Populating config with:\n$', ' '.join(command))
67 try:
68 subprocess.check_output(command, stderr=subprocess.STDOUT)
69 except OSError as e:
70 raise ConfigError('Could not call make command: ' + str(e))
71 except subprocess.CalledProcessError as e:
72 raise ConfigError(e.output.decode())
73
74 def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None:
75 command = ['make', 'all', 'compile_commands.json', 'ARCH=' + self._linux_arch,
76 'O=' + build_dir, '--jobs=' + str(jobs)]
77 if make_options:
78 command.extend(make_options)
79 if self._cross_compile:
80 command += ['CROSS_COMPILE=' + self._cross_compile]
81 print('Building with:\n$', ' '.join(command))
82 try:
83 proc = subprocess.Popen(command,
84 stderr=subprocess.PIPE,
85 stdout=subprocess.DEVNULL)
86 except OSError as e:
87 raise BuildError('Could not call execute make: ' + str(e))
88 except subprocess.CalledProcessError as e:
89 raise BuildError(e.output)
90 _, stderr = proc.communicate()
91 if proc.returncode != 0:
92 raise BuildError(stderr.decode())
93 if stderr: # likely only due to build warnings
94 print(stderr.decode())
95
96 def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
97 raise RuntimeError('not implemented!')
98
99
100class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
101
102 def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
103 super().__init__(linux_arch=qemu_arch_params.linux_arch,
104 cross_compile=cross_compile)
105 self._kconfig = qemu_arch_params.kconfig
106 self._qemu_arch = qemu_arch_params.qemu_arch
107 self._kernel_path = qemu_arch_params.kernel_path
108 self._kernel_command_line = qemu_arch_params.kernel_command_line
109 if 'kunit_shutdown=' not in self._kernel_command_line:
110 self._kernel_command_line += ' kunit_shutdown=reboot'
111 self._extra_qemu_params = qemu_arch_params.extra_qemu_params
112 self._serial = qemu_arch_params.serial
113
114 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
115 kconfig = kunit_config.parse_from_string(self._kconfig)
116 kconfig.merge_in_entries(base_kunitconfig)
117 return kconfig
118
119 def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
120 kernel_path = os.path.join(build_dir, self._kernel_path)
121 qemu_command = ['qemu-system-' + self._qemu_arch,
122 '-nodefaults',
123 '-m', '1024',
124 '-kernel', kernel_path,
125 '-append', ' '.join(params + [self._kernel_command_line]),
126 '-no-reboot',
127 '-nographic',
128 '-serial', self._serial] + self._extra_qemu_params
129 # Note: shlex.join() does what we want, but requires python 3.8+.
130 print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command))
131 return subprocess.Popen(qemu_command,
132 stdin=subprocess.PIPE,
133 stdout=subprocess.PIPE,
134 stderr=subprocess.STDOUT,
135 text=True, errors='backslashreplace')
136
137class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
138 """An abstraction over command line operations performed on a source tree."""
139
140 def __init__(self, cross_compile: Optional[str]=None):
141 super().__init__(linux_arch='um', cross_compile=cross_compile)
142
143 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
144 kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
145 kconfig.merge_in_entries(base_kunitconfig)
146 return kconfig
147
148 def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
149 """Runs the Linux UML binary. Must be named 'linux'."""
150 linux_bin = os.path.join(build_dir, 'linux')
151 params.extend(['mem=1G', 'console=tty', 'kunit_shutdown=halt'])
152 print('Running tests with:\n$', linux_bin, ' '.join(shlex.quote(arg) for arg in params))
153 return subprocess.Popen([linux_bin] + params,
154 stdin=subprocess.PIPE,
155 stdout=subprocess.PIPE,
156 stderr=subprocess.STDOUT,
157 text=True, errors='backslashreplace')
158
159def get_kconfig_path(build_dir: str) -> str:
160 return os.path.join(build_dir, KCONFIG_PATH)
161
162def get_kunitconfig_path(build_dir: str) -> str:
163 return os.path.join(build_dir, KUNITCONFIG_PATH)
164
165def get_old_kunitconfig_path(build_dir: str) -> str:
166 return os.path.join(build_dir, OLD_KUNITCONFIG_PATH)
167
168def get_parsed_kunitconfig(build_dir: str,
169 kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
170 if not kunitconfig_paths:
171 path = get_kunitconfig_path(build_dir)
172 if not os.path.exists(path):
173 shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, path)
174 return kunit_config.parse_file(path)
175
176 merged = kunit_config.Kconfig()
177
178 for path in kunitconfig_paths:
179 if os.path.isdir(path):
180 path = os.path.join(path, KUNITCONFIG_PATH)
181 if not os.path.exists(path):
182 raise ConfigError(f'Specified kunitconfig ({path}) does not exist')
183
184 partial = kunit_config.parse_file(path)
185 diff = merged.conflicting_options(partial)
186 if diff:
187 diff_str = '\n\n'.join(f'{a}\n vs from {path}\n{b}' for a, b in diff)
188 raise ConfigError(f'Multiple values specified for {len(diff)} options in kunitconfig:\n{diff_str}')
189 merged.merge_in_entries(partial)
190 return merged
191
192def get_outfile_path(build_dir: str) -> str:
193 return os.path.join(build_dir, OUTFILE_PATH)
194
195def _default_qemu_config_path(arch: str) -> str:
196 config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
197 if os.path.isfile(config_path):
198 return config_path
199
200 options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')]
201 raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options)))
202
203def _get_qemu_ops(config_path: str,
204 extra_qemu_args: Optional[List[str]],
205 cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
206 # The module name/path has very little to do with where the actual file
207 # exists (I learned this through experimentation and could not find it
208 # anywhere in the Python documentation).
209 #
210 # Bascially, we completely ignore the actual file location of the config
211 # we are loading and just tell Python that the module lives in the
212 # QEMU_CONFIGS_DIR for import purposes regardless of where it actually
213 # exists as a file.
214 module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
215 spec = importlib.util.spec_from_file_location(module_path, config_path)
216 assert spec is not None
217 config = importlib.util.module_from_spec(spec)
218 # See https://github.com/python/typeshed/pull/2626 for context.
219 assert isinstance(spec.loader, importlib.abc.Loader)
220 spec.loader.exec_module(config)
221
222 if not hasattr(config, 'QEMU_ARCH'):
223 raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)
224 params: qemu_config.QemuArchParams = config.QEMU_ARCH
225 if extra_qemu_args:
226 params.extra_qemu_params.extend(extra_qemu_args)
227 return params.linux_arch, LinuxSourceTreeOperationsQemu(
228 params, cross_compile=cross_compile)
229
230class LinuxSourceTree:
231 """Represents a Linux kernel source tree with KUnit tests."""
232
233 def __init__(
234 self,
235 build_dir: str,
236 kunitconfig_paths: Optional[List[str]]=None,
237 kconfig_add: Optional[List[str]]=None,
238 arch: Optional[str]=None,
239 cross_compile: Optional[str]=None,
240 qemu_config_path: Optional[str]=None,
241 extra_qemu_args: Optional[List[str]]=None) -> None:
242 signal.signal(signal.SIGINT, self.signal_handler)
243 if qemu_config_path:
244 self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
245 else:
246 self._arch = 'um' if arch is None else arch
247 if self._arch == 'um':
248 self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
249 else:
250 qemu_config_path = _default_qemu_config_path(self._arch)
251 _, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
252
253 self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
254 if kconfig_add:
255 kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
256 self._kconfig.merge_in_entries(kconfig)
257
258 def arch(self) -> str:
259 return self._arch
260
261 def clean(self) -> bool:
262 try:
263 self._ops.make_mrproper()
264 except ConfigError as e:
265 logging.error(e)
266 return False
267 return True
268
269 def validate_config(self, build_dir: str) -> bool:
270 kconfig_path = get_kconfig_path(build_dir)
271 validated_kconfig = kunit_config.parse_file(kconfig_path)
272 if self._kconfig.is_subset_of(validated_kconfig):
273 return True
274 missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
275 message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
276 'This is probably due to unsatisfied dependencies.\n' \
277 'Missing: ' + ', '.join(str(e) for e in missing)
278 if self._arch == 'um':
279 message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
280 'on a different architecture with something like "--arch=x86_64".'
281 logging.error(message)
282 return False
283
284 def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
285 kconfig_path = get_kconfig_path(build_dir)
286 if build_dir and not os.path.exists(build_dir):
287 os.mkdir(build_dir)
288 try:
289 self._kconfig = self._ops.make_arch_config(self._kconfig)
290 self._kconfig.write_to_file(kconfig_path)
291 self._ops.make_olddefconfig(build_dir, make_options)
292 except ConfigError as e:
293 logging.error(e)
294 return False
295 if not self.validate_config(build_dir):
296 return False
297
298 old_path = get_old_kunitconfig_path(build_dir)
299 if os.path.exists(old_path):
300 os.remove(old_path) # write_to_file appends to the file
301 self._kconfig.write_to_file(old_path)
302 return True
303
304 def _kunitconfig_changed(self, build_dir: str) -> bool:
305 old_path = get_old_kunitconfig_path(build_dir)
306 if not os.path.exists(old_path):
307 return True
308
309 old_kconfig = kunit_config.parse_file(old_path)
310 return old_kconfig != self._kconfig
311
312 def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
313 """Creates a new .config if it is not a subset of the .kunitconfig."""
314 kconfig_path = get_kconfig_path(build_dir)
315 if not os.path.exists(kconfig_path):
316 print('Generating .config ...')
317 return self.build_config(build_dir, make_options)
318
319 existing_kconfig = kunit_config.parse_file(kconfig_path)
320 self._kconfig = self._ops.make_arch_config(self._kconfig)
321
322 if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
323 return True
324 print('Regenerating .config ...')
325 os.remove(kconfig_path)
326 return self.build_config(build_dir, make_options)
327
328 def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool:
329 try:
330 self._ops.make_olddefconfig(build_dir, make_options)
331 self._ops.make(jobs, build_dir, make_options)
332 except (ConfigError, BuildError) as e:
333 logging.error(e)
334 return False
335 return self.validate_config(build_dir)
336
337 def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='', filter_glob: str='', filter: str='', filter_action: Optional[str]=None, timeout: Optional[int]=None) -> Iterator[str]:
338 if not args:
339 args = []
340 if filter_glob:
341 args.append('kunit.filter_glob=' + filter_glob)
342 if filter:
343 args.append('kunit.filter="' + filter + '"')
344 if filter_action:
345 args.append('kunit.filter_action=' + filter_action)
346 args.append('kunit.enable=1')
347
348 process = self._ops.start(args, build_dir)
349 assert process.stdout is not None # tell mypy it's set
350
351 # Enforce the timeout in a background thread.
352 def _wait_proc() -> None:
353 try:
354 process.wait(timeout=timeout)
355 except Exception as e:
356 print(e)
357 process.terminate()
358 process.wait()
359 waiter = threading.Thread(target=_wait_proc)
360 waiter.start()
361
362 output = open(get_outfile_path(build_dir), 'w')
363 try:
364 # Tee the output to the file and to our caller in real time.
365 for line in process.stdout:
366 output.write(line)
367 yield line
368 # This runs even if our caller doesn't consume every line.
369 finally:
370 # Flush any leftover output to the file
371 output.write(process.stdout.read())
372 output.close()
373 process.stdout.close()
374
375 waiter.join()
376 subprocess.call(['stty', 'sane'])
377
378 def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None:
379 logging.error('Build interruption occurred. Cleaning console.')
380 subprocess.call(['stty', 'sane'])