Loading...
Note: File does not exist in v3.15.
1# SPDX-License-Identifier: GPL-2.0
2#
3# Runs UML kernel, collects output, and handles errors.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <felixguoxiuping@gmail.com>
7# Author: Brendan Higgins <brendanhiggins@google.com>
8
9import importlib.abc
10import importlib.util
11import logging
12import subprocess
13import os
14import shlex
15import shutil
16import signal
17import threading
18from typing import Iterator, List, Optional, Tuple
19from types import FrameType
20
21import kunit_config
22import qemu_config
23
24KCONFIG_PATH = '.config'
25KUNITCONFIG_PATH = '.kunitconfig'
26OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
27DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
28ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
29UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'
30OUTFILE_PATH = 'test.log'
31ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
32QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
33
34class ConfigError(Exception):
35 """Represents an error trying to configure the Linux kernel."""
36
37
38class BuildError(Exception):
39 """Represents an error trying to build the Linux kernel."""
40
41
42class LinuxSourceTreeOperations:
43 """An abstraction over command line operations performed on a source tree."""
44
45 def __init__(self, linux_arch: str, cross_compile: Optional[str]):
46 self._linux_arch = linux_arch
47 self._cross_compile = cross_compile
48
49 def make_mrproper(self) -> None:
50 try:
51 subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
52 except OSError as e:
53 raise ConfigError('Could not call make command: ' + str(e))
54 except subprocess.CalledProcessError as e:
55 raise ConfigError(e.output.decode())
56
57 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
58 return base_kunitconfig
59
60 def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None:
61 command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
62 if self._cross_compile:
63 command += ['CROSS_COMPILE=' + self._cross_compile]
64 if make_options:
65 command.extend(make_options)
66 print('Populating config with:\n$', ' '.join(command))
67 try:
68 subprocess.check_output(command, stderr=subprocess.STDOUT)
69 except OSError as e:
70 raise ConfigError('Could not call make command: ' + str(e))
71 except subprocess.CalledProcessError as e:
72 raise ConfigError(e.output.decode())
73
74 def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None:
75 command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)]
76 if make_options:
77 command.extend(make_options)
78 if self._cross_compile:
79 command += ['CROSS_COMPILE=' + self._cross_compile]
80 print('Building with:\n$', ' '.join(command))
81 try:
82 proc = subprocess.Popen(command,
83 stderr=subprocess.PIPE,
84 stdout=subprocess.DEVNULL)
85 except OSError as e:
86 raise BuildError('Could not call execute make: ' + str(e))
87 except subprocess.CalledProcessError as e:
88 raise BuildError(e.output)
89 _, stderr = proc.communicate()
90 if proc.returncode != 0:
91 raise BuildError(stderr.decode())
92 if stderr: # likely only due to build warnings
93 print(stderr.decode())
94
95 def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
96 raise RuntimeError('not implemented!')
97
98
99class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
100
101 def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
102 super().__init__(linux_arch=qemu_arch_params.linux_arch,
103 cross_compile=cross_compile)
104 self._kconfig = qemu_arch_params.kconfig
105 self._qemu_arch = qemu_arch_params.qemu_arch
106 self._kernel_path = qemu_arch_params.kernel_path
107 self._kernel_command_line = qemu_arch_params.kernel_command_line + ' kunit_shutdown=reboot'
108 self._extra_qemu_params = qemu_arch_params.extra_qemu_params
109 self._serial = qemu_arch_params.serial
110
111 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
112 kconfig = kunit_config.parse_from_string(self._kconfig)
113 kconfig.merge_in_entries(base_kunitconfig)
114 return kconfig
115
116 def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
117 kernel_path = os.path.join(build_dir, self._kernel_path)
118 qemu_command = ['qemu-system-' + self._qemu_arch,
119 '-nodefaults',
120 '-m', '1024',
121 '-kernel', kernel_path,
122 '-append', ' '.join(params + [self._kernel_command_line]),
123 '-no-reboot',
124 '-nographic',
125 '-serial', self._serial] + self._extra_qemu_params
126 # Note: shlex.join() does what we want, but requires python 3.8+.
127 print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command))
128 return subprocess.Popen(qemu_command,
129 stdin=subprocess.PIPE,
130 stdout=subprocess.PIPE,
131 stderr=subprocess.STDOUT,
132 text=True, errors='backslashreplace')
133
134class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
135 """An abstraction over command line operations performed on a source tree."""
136
137 def __init__(self, cross_compile: Optional[str]=None):
138 super().__init__(linux_arch='um', cross_compile=cross_compile)
139
140 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
141 kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
142 kconfig.merge_in_entries(base_kunitconfig)
143 return kconfig
144
145 def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
146 """Runs the Linux UML binary. Must be named 'linux'."""
147 linux_bin = os.path.join(build_dir, 'linux')
148 params.extend(['mem=1G', 'console=tty', 'kunit_shutdown=halt'])
149 print('Running tests with:\n$', linux_bin, ' '.join(shlex.quote(arg) for arg in params))
150 return subprocess.Popen([linux_bin] + params,
151 stdin=subprocess.PIPE,
152 stdout=subprocess.PIPE,
153 stderr=subprocess.STDOUT,
154 text=True, errors='backslashreplace')
155
156def get_kconfig_path(build_dir: str) -> str:
157 return os.path.join(build_dir, KCONFIG_PATH)
158
159def get_kunitconfig_path(build_dir: str) -> str:
160 return os.path.join(build_dir, KUNITCONFIG_PATH)
161
162def get_old_kunitconfig_path(build_dir: str) -> str:
163 return os.path.join(build_dir, OLD_KUNITCONFIG_PATH)
164
165def get_parsed_kunitconfig(build_dir: str,
166 kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
167 if not kunitconfig_paths:
168 path = get_kunitconfig_path(build_dir)
169 if not os.path.exists(path):
170 shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, path)
171 return kunit_config.parse_file(path)
172
173 merged = kunit_config.Kconfig()
174
175 for path in kunitconfig_paths:
176 if os.path.isdir(path):
177 path = os.path.join(path, KUNITCONFIG_PATH)
178 if not os.path.exists(path):
179 raise ConfigError(f'Specified kunitconfig ({path}) does not exist')
180
181 partial = kunit_config.parse_file(path)
182 diff = merged.conflicting_options(partial)
183 if diff:
184 diff_str = '\n\n'.join(f'{a}\n vs from {path}\n{b}' for a, b in diff)
185 raise ConfigError(f'Multiple values specified for {len(diff)} options in kunitconfig:\n{diff_str}')
186 merged.merge_in_entries(partial)
187 return merged
188
189def get_outfile_path(build_dir: str) -> str:
190 return os.path.join(build_dir, OUTFILE_PATH)
191
192def _default_qemu_config_path(arch: str) -> str:
193 config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
194 if os.path.isfile(config_path):
195 return config_path
196
197 options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')]
198 raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options)))
199
200def _get_qemu_ops(config_path: str,
201 extra_qemu_args: Optional[List[str]],
202 cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
203 # The module name/path has very little to do with where the actual file
204 # exists (I learned this through experimentation and could not find it
205 # anywhere in the Python documentation).
206 #
207 # Bascially, we completely ignore the actual file location of the config
208 # we are loading and just tell Python that the module lives in the
209 # QEMU_CONFIGS_DIR for import purposes regardless of where it actually
210 # exists as a file.
211 module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
212 spec = importlib.util.spec_from_file_location(module_path, config_path)
213 assert spec is not None
214 config = importlib.util.module_from_spec(spec)
215 # See https://github.com/python/typeshed/pull/2626 for context.
216 assert isinstance(spec.loader, importlib.abc.Loader)
217 spec.loader.exec_module(config)
218
219 if not hasattr(config, 'QEMU_ARCH'):
220 raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)
221 params: qemu_config.QemuArchParams = config.QEMU_ARCH
222 if extra_qemu_args:
223 params.extra_qemu_params.extend(extra_qemu_args)
224 return params.linux_arch, LinuxSourceTreeOperationsQemu(
225 params, cross_compile=cross_compile)
226
227class LinuxSourceTree:
228 """Represents a Linux kernel source tree with KUnit tests."""
229
230 def __init__(
231 self,
232 build_dir: str,
233 kunitconfig_paths: Optional[List[str]]=None,
234 kconfig_add: Optional[List[str]]=None,
235 arch: Optional[str]=None,
236 cross_compile: Optional[str]=None,
237 qemu_config_path: Optional[str]=None,
238 extra_qemu_args: Optional[List[str]]=None) -> None:
239 signal.signal(signal.SIGINT, self.signal_handler)
240 if qemu_config_path:
241 self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
242 else:
243 self._arch = 'um' if arch is None else arch
244 if self._arch == 'um':
245 self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
246 else:
247 qemu_config_path = _default_qemu_config_path(self._arch)
248 _, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
249
250 self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
251 if kconfig_add:
252 kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
253 self._kconfig.merge_in_entries(kconfig)
254
255 def arch(self) -> str:
256 return self._arch
257
258 def clean(self) -> bool:
259 try:
260 self._ops.make_mrproper()
261 except ConfigError as e:
262 logging.error(e)
263 return False
264 return True
265
266 def validate_config(self, build_dir: str) -> bool:
267 kconfig_path = get_kconfig_path(build_dir)
268 validated_kconfig = kunit_config.parse_file(kconfig_path)
269 if self._kconfig.is_subset_of(validated_kconfig):
270 return True
271 missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
272 message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
273 'This is probably due to unsatisfied dependencies.\n' \
274 'Missing: ' + ', '.join(str(e) for e in missing)
275 if self._arch == 'um':
276 message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
277 'on a different architecture with something like "--arch=x86_64".'
278 logging.error(message)
279 return False
280
281 def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
282 kconfig_path = get_kconfig_path(build_dir)
283 if build_dir and not os.path.exists(build_dir):
284 os.mkdir(build_dir)
285 try:
286 self._kconfig = self._ops.make_arch_config(self._kconfig)
287 self._kconfig.write_to_file(kconfig_path)
288 self._ops.make_olddefconfig(build_dir, make_options)
289 except ConfigError as e:
290 logging.error(e)
291 return False
292 if not self.validate_config(build_dir):
293 return False
294
295 old_path = get_old_kunitconfig_path(build_dir)
296 if os.path.exists(old_path):
297 os.remove(old_path) # write_to_file appends to the file
298 self._kconfig.write_to_file(old_path)
299 return True
300
301 def _kunitconfig_changed(self, build_dir: str) -> bool:
302 old_path = get_old_kunitconfig_path(build_dir)
303 if not os.path.exists(old_path):
304 return True
305
306 old_kconfig = kunit_config.parse_file(old_path)
307 return old_kconfig != self._kconfig
308
309 def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
310 """Creates a new .config if it is not a subset of the .kunitconfig."""
311 kconfig_path = get_kconfig_path(build_dir)
312 if not os.path.exists(kconfig_path):
313 print('Generating .config ...')
314 return self.build_config(build_dir, make_options)
315
316 existing_kconfig = kunit_config.parse_file(kconfig_path)
317 self._kconfig = self._ops.make_arch_config(self._kconfig)
318
319 if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
320 return True
321 print('Regenerating .config ...')
322 os.remove(kconfig_path)
323 return self.build_config(build_dir, make_options)
324
325 def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool:
326 try:
327 self._ops.make_olddefconfig(build_dir, make_options)
328 self._ops.make(jobs, build_dir, make_options)
329 except (ConfigError, BuildError) as e:
330 logging.error(e)
331 return False
332 return self.validate_config(build_dir)
333
334 def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='', filter_glob: str='', filter: str='', filter_action: Optional[str]=None, timeout: Optional[int]=None) -> Iterator[str]:
335 if not args:
336 args = []
337 if filter_glob:
338 args.append('kunit.filter_glob=' + filter_glob)
339 if filter:
340 args.append('kunit.filter="' + filter + '"')
341 if filter_action:
342 args.append('kunit.filter_action=' + filter_action)
343 args.append('kunit.enable=1')
344
345 process = self._ops.start(args, build_dir)
346 assert process.stdout is not None # tell mypy it's set
347
348 # Enforce the timeout in a background thread.
349 def _wait_proc() -> None:
350 try:
351 process.wait(timeout=timeout)
352 except Exception as e:
353 print(e)
354 process.terminate()
355 process.wait()
356 waiter = threading.Thread(target=_wait_proc)
357 waiter.start()
358
359 output = open(get_outfile_path(build_dir), 'w')
360 try:
361 # Tee the output to the file and to our caller in real time.
362 for line in process.stdout:
363 output.write(line)
364 yield line
365 # This runs even if our caller doesn't consume every line.
366 finally:
367 # Flush any leftover output to the file
368 output.write(process.stdout.read())
369 output.close()
370 process.stdout.close()
371
372 waiter.join()
373 subprocess.call(['stty', 'sane'])
374
375 def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None:
376 logging.error('Build interruption occurred. Cleaning console.')
377 subprocess.call(['stty', 'sane'])