Linux Audio

Check our new training course

Loading...
v6.13.7
  1# SPDX-License-Identifier: GPL-2.0
  2#
  3# Parses KTAP test results from a kernel dmesg log and incrementally prints
  4# results with reader-friendly format. Stores and returns test results in a
  5# Test object.
  6#
  7# Copyright (C) 2019, Google LLC.
  8# Author: Felix Guo <felixguoxiuping@gmail.com>
  9# Author: Brendan Higgins <brendanhiggins@google.com>
 10# Author: Rae Moar <rmoar@google.com>
 11
 12from __future__ import annotations
 13from dataclasses import dataclass
 14import re
 15import textwrap
 16
 
 
 17from enum import Enum, auto
 
 18from typing import Iterable, Iterator, List, Optional, Tuple
 19
 20from kunit_printer import Printer, stdout
 21
 22class Test:
 23	"""
 24	A class to represent a test parsed from KTAP results. All KTAP
 25	results within a test log are stored in a main Test object as
 26	subtests.
 27
 28	Attributes:
 29	status : TestStatus - status of the test
 30	name : str - name of the test
 31	expected_count : int - expected number of subtests (0 if single
 32		test case and None if unknown expected number of subtests)
 33	subtests : List[Test] - list of subtests
 34	log : List[str] - log of KTAP lines that correspond to the test
 35	counts : TestCounts - counts of the test statuses and errors of
 36		subtests or of the test itself if the test is a single
 37		test case.
 38	"""
 39	def __init__(self) -> None:
 40		"""Creates Test object with default attributes."""
 41		self.status = TestStatus.TEST_CRASHED
 42		self.name = ''
 43		self.expected_count = 0  # type: Optional[int]
 44		self.subtests = []  # type: List[Test]
 45		self.log = []  # type: List[str]
 46		self.counts = TestCounts()
 47
 48	def __str__(self) -> str:
 49		"""Returns string representation of a Test class object."""
 50		return (f'Test({self.status}, {self.name}, {self.expected_count}, '
 51			f'{self.subtests}, {self.log}, {self.counts})')
 52
 53	def __repr__(self) -> str:
 54		"""Returns string representation of a Test class object."""
 55		return str(self)
 56
 57	def add_error(self, printer: Printer, error_message: str) -> None:
 58		"""Records an error that occurred while parsing this test."""
 59		self.counts.errors += 1
 60		printer.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')
 61
 62	def ok_status(self) -> bool:
 63		"""Returns true if the status was ok, i.e. passed or skipped."""
 64		return self.status in (TestStatus.SUCCESS, TestStatus.SKIPPED)
 
 
 
 65
 66class TestStatus(Enum):
 67	"""An enumeration class to represent the status of a test."""
 68	SUCCESS = auto()
 69	FAILURE = auto()
 70	SKIPPED = auto()
 71	TEST_CRASHED = auto()
 72	NO_TESTS = auto()
 73	FAILURE_TO_PARSE_TESTS = auto()
 74
 75@dataclass
 76class TestCounts:
 77	"""
 78	Tracks the counts of statuses of all test cases and any errors within
 79	a Test.
 80	"""
 81	passed: int = 0
 82	failed: int = 0
 83	crashed: int = 0
 84	skipped: int = 0
 85	errors: int = 0
 86
 87	def __str__(self) -> str:
 88		"""Returns the string representation of a TestCounts object."""
 89		statuses = [('passed', self.passed), ('failed', self.failed),
 90			('crashed', self.crashed), ('skipped', self.skipped),
 91			('errors', self.errors)]
 92		return f'Ran {self.total()} tests: ' + \
 93			', '.join(f'{s}: {n}' for s, n in statuses if n > 0)
 94
 95	def total(self) -> int:
 96		"""Returns the total number of test cases within a test
 97		object, where a test case is a test with no subtests.
 98		"""
 99		return (self.passed + self.failed + self.crashed +
100			self.skipped)
101
102	def add_subtest_counts(self, counts: TestCounts) -> None:
103		"""
104		Adds the counts of another TestCounts object to the current
105		TestCounts object. Used to add the counts of a subtest to the
106		parent test.
107
108		Parameters:
109		counts - a different TestCounts object whose counts
110			will be added to the counts of the TestCounts object
111		"""
112		self.passed += counts.passed
113		self.failed += counts.failed
114		self.crashed += counts.crashed
115		self.skipped += counts.skipped
116		self.errors += counts.errors
117
118	def get_status(self) -> TestStatus:
119		"""Returns the aggregated status of a Test using test
120		counts.
121		"""
122		if self.total() == 0:
123			return TestStatus.NO_TESTS
124		if self.crashed:
125			# Crashes should take priority.
126			return TestStatus.TEST_CRASHED
127		if self.failed:
128			return TestStatus.FAILURE
129		if self.passed:
130			# No failures or crashes, looks good!
131			return TestStatus.SUCCESS
132		# We have only skipped tests.
133		return TestStatus.SKIPPED
134
135	def add_status(self, status: TestStatus) -> None:
136		"""Increments the count for `status`."""
137		if status == TestStatus.SUCCESS:
138			self.passed += 1
139		elif status == TestStatus.FAILURE:
140			self.failed += 1
141		elif status == TestStatus.SKIPPED:
142			self.skipped += 1
143		elif status != TestStatus.NO_TESTS:
144			self.crashed += 1
145
146class LineStream:
147	"""
148	A class to represent the lines of kernel output.
149	Provides a lazy peek()/pop() interface over an iterator of
150	(line#, text).
151	"""
152	_lines: Iterator[Tuple[int, str]]
153	_next: Tuple[int, str]
154	_need_next: bool
155	_done: bool
156
157	def __init__(self, lines: Iterator[Tuple[int, str]]):
158		"""Creates a new LineStream that wraps the given iterator."""
159		self._lines = lines
160		self._done = False
161		self._need_next = True
162		self._next = (0, '')
 
163
164	def _get_next(self) -> None:
165		"""Advances the LineSteam to the next line, if necessary."""
166		if not self._need_next:
167			return
168		try:
169			self._next = next(self._lines)
170		except StopIteration:
171			self._done = True
172		finally:
173			self._need_next = False
174
175	def peek(self) -> str:
176		"""Returns the current line, without advancing the LineStream.
177		"""
178		self._get_next()
179		return self._next[1]
180
181	def pop(self) -> str:
182		"""Returns the current line and advances the LineStream to
183		the next line.
184		"""
185		s = self.peek()
186		if self._done:
187			raise ValueError(f'LineStream: going past EOF, last line was {s}')
188		self._need_next = True
189		return s
190
191	def __bool__(self) -> bool:
192		"""Returns True if stream has more lines."""
193		self._get_next()
194		return not self._done
195
196	# Only used by kunit_tool_test.py.
197	def __iter__(self) -> Iterator[str]:
198		"""Empties all lines stored in LineStream object into
199		Iterator object and returns the Iterator object.
200		"""
201		while bool(self):
202			yield self.pop()
203
204	def line_number(self) -> int:
205		"""Returns the line number of the current line."""
206		self._get_next()
207		return self._next[0]
208
209# Parsing helper methods:
210
211KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$')
212TAP_START = re.compile(r'\s*TAP version ([0-9]+)$')
213KTAP_END = re.compile(r'\s*(List of all partitions:|'
214	'Kernel panic - not syncing: VFS:|reboot: System halted)')
215EXECUTOR_ERROR = re.compile(r'\s*kunit executor: (.*)$')
216
217def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
218	"""Extracts KTAP lines from the kernel output."""
219	def isolate_ktap_output(kernel_output: Iterable[str]) \
220			-> Iterator[Tuple[int, str]]:
221		line_num = 0
222		started = False
223		for line in kernel_output:
224			line_num += 1
225			line = line.rstrip()  # remove trailing \n
226			if not started and KTAP_START.search(line):
227				# start extracting KTAP lines and set prefix
228				# to number of characters before version line
229				prefix_len = len(
230					line.split('KTAP version')[0])
231				started = True
232				yield line_num, line[prefix_len:]
233			elif not started and TAP_START.search(line):
234				# start extracting KTAP lines and set prefix
235				# to number of characters before version line
236				prefix_len = len(line.split('TAP version')[0])
237				started = True
238				yield line_num, line[prefix_len:]
239			elif started and KTAP_END.search(line):
240				# stop extracting KTAP lines
241				break
242			elif started:
243				# remove the prefix, if any.
244				line = line[prefix_len:]
245				yield line_num, line
246			elif EXECUTOR_ERROR.search(line):
247				yield line_num, line
248	return LineStream(lines=isolate_ktap_output(kernel_output))
249
250KTAP_VERSIONS = [1]
251TAP_VERSIONS = [13, 14]
252
253def check_version(version_num: int, accepted_versions: List[int],
254			version_type: str, test: Test, printer: Printer) -> None:
255	"""
256	Adds error to test object if version number is too high or too
257	low.
258
259	Parameters:
260	version_num - The inputted version number from the parsed KTAP or TAP
261		header line
262	accepted_version - List of accepted KTAP or TAP versions
263	version_type - 'KTAP' or 'TAP' depending on the type of
264		version line.
265	test - Test object for current test being parsed
266	printer - Printer object to output error
267	"""
268	if version_num < min(accepted_versions):
269		test.add_error(printer, f'{version_type} version lower than expected!')
270	elif version_num > max(accepted_versions):
271		test.add_error(printer, f'{version_type} version higer than expected!')
272
273def parse_ktap_header(lines: LineStream, test: Test, printer: Printer) -> bool:
274	"""
275	Parses KTAP/TAP header line and checks version number.
276	Returns False if fails to parse KTAP/TAP header line.
277
278	Accepted formats:
279	- 'KTAP version [version number]'
280	- 'TAP version [version number]'
281
282	Parameters:
283	lines - LineStream of KTAP output to parse
284	test - Test object for current test being parsed
285	printer - Printer object to output results
286
287	Return:
288	True if successfully parsed KTAP/TAP header line
289	"""
290	ktap_match = KTAP_START.match(lines.peek())
291	tap_match = TAP_START.match(lines.peek())
292	if ktap_match:
293		version_num = int(ktap_match.group(1))
294		check_version(version_num, KTAP_VERSIONS, 'KTAP', test, printer)
295	elif tap_match:
296		version_num = int(tap_match.group(1))
297		check_version(version_num, TAP_VERSIONS, 'TAP', test, printer)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
298	else:
299		return False
300	lines.pop()
301	return True
302
303TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$')
 
304
305def parse_test_header(lines: LineStream, test: Test) -> bool:
306	"""
307	Parses test header and stores test name in test object.
308	Returns False if fails to parse test header line.
309
310	Accepted format:
311	- '# Subtest: [test name]'
312
313	Parameters:
314	lines - LineStream of KTAP output to parse
315	test - Test object for current test being parsed
316
317	Return:
318	True if successfully parsed test header line
319	"""
320	match = TEST_HEADER.match(lines.peek())
321	if not match:
322		return False
323	test.name = match.group(1)
324	lines.pop()
325	return True
326
327TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)')
328
329def parse_test_plan(lines: LineStream, test: Test) -> bool:
330	"""
331	Parses test plan line and stores the expected number of subtests in
332	test object. Reports an error if expected count is 0.
333	Returns False and sets expected_count to None if there is no valid test
334	plan.
335
336	Accepted format:
337	- '1..[number of subtests]'
338
339	Parameters:
340	lines - LineStream of KTAP output to parse
341	test - Test object for current test being parsed
342
343	Return:
344	True if successfully parsed test plan line
345	"""
346	match = TEST_PLAN.match(lines.peek())
347	if not match:
348		test.expected_count = None
349		return False
350	expected_count = int(match.group(1))
351	test.expected_count = expected_count
352	lines.pop()
353	return True
354
355TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')
356
357TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
358
359def peek_test_name_match(lines: LineStream, test: Test) -> bool:
360	"""
361	Matches current line with the format of a test result line and checks
362	if the name matches the name of the current test.
363	Returns False if fails to match format or name.
364
365	Accepted format:
366	- '[ok|not ok] [test number] [-] [test name] [optional skip
367		directive]'
368
369	Parameters:
370	lines - LineStream of KTAP output to parse
371	test - Test object for current test being parsed
372
373	Return:
374	True if matched a test result line and the name matching the
375		expected test name
376	"""
377	line = lines.peek()
378	match = TEST_RESULT.match(line)
379	if not match:
 
 
 
 
 
 
380		return False
381	name = match.group(4)
382	return name == test.name
383
384def parse_test_result(lines: LineStream, test: Test,
385			expected_num: int, printer: Printer) -> bool:
386	"""
387	Parses test result line and stores the status and name in the test
388	object. Reports an error if the test number does not match expected
389	test number.
390	Returns False if fails to parse test result line.
391
392	Note that the SKIP directive is the only direction that causes a
393	change in status.
394
395	Accepted format:
396	- '[ok|not ok] [test number] [-] [test name] [optional skip
397		directive]'
398
399	Parameters:
400	lines - LineStream of KTAP output to parse
401	test - Test object for current test being parsed
402	expected_num - expected test number for current test
403	printer - Printer object to output results
404
405	Return:
406	True if successfully parsed a test result line.
407	"""
408	line = lines.peek()
409	match = TEST_RESULT.match(line)
410	skip_match = TEST_RESULT_SKIP.match(line)
411
412	# Check if line matches test result line format
413	if not match:
414		return False
415	lines.pop()
416
417	# Set name of test object
418	if skip_match:
419		test.name = skip_match.group(4)
420	else:
421		test.name = match.group(4)
422
423	# Check test num
424	num = int(match.group(2))
425	if num != expected_num:
426		test.add_error(printer, f'Expected test number {expected_num} but found {num}')
427
428	# Set status of test object
429	status = match.group(1)
430	if skip_match:
431		test.status = TestStatus.SKIPPED
432	elif status == 'ok':
433		test.status = TestStatus.SUCCESS
434	else:
435		test.status = TestStatus.FAILURE
436	return True
437
438def parse_diagnostic(lines: LineStream) -> List[str]:
439	"""
440	Parse lines that do not match the format of a test result line or
441	test header line and returns them in list.
442
443	Line formats that are not parsed:
444	- '# Subtest: [test name]'
445	- '[ok|not ok] [test number] [-] [test name] [optional skip
446		directive]'
447	- 'KTAP version [version number]'
448
449	Parameters:
450	lines - LineStream of KTAP output to parse
451
452	Return:
453	Log of diagnostic lines
454	"""
455	log = []  # type: List[str]
456	non_diagnostic_lines = [TEST_RESULT, TEST_HEADER, KTAP_START, TAP_START, TEST_PLAN]
457	while lines and not any(re.match(lines.peek())
458			for re in non_diagnostic_lines):
459		log.append(lines.pop())
460	return log
461
 
462
463# Printing helper methods:
 
 
 
 
 
 
 
464
465DIVIDER = '=' * 60
 
 
 
 
 
 
 
 
 
 
466
467def format_test_divider(message: str, len_message: int) -> str:
468	"""
469	Returns string with message centered in fixed width divider.
470
471	Example:
472	'===================== message example ====================='
473
474	Parameters:
475	message - message to be centered in divider line
476	len_message - length of the message to be printed such that
477		any characters of the color codes are not counted
478
479	Return:
480	String containing message centered in fixed width divider
481	"""
482	default_count = 3  # default number of dashes
483	len_1 = default_count
484	len_2 = default_count
485	difference = len(DIVIDER) - len_message - 2  # 2 spaces added
486	if difference > 0:
487		# calculate number of dashes for each side of the divider
488		len_1 = int(difference / 2)
489		len_2 = difference - len_1
490	return ('=' * len_1) + f' {message} ' + ('=' * len_2)
491
492def print_test_header(test: Test, printer: Printer) -> None:
493	"""
494	Prints test header with test name and optionally the expected number
495	of subtests.
496
497	Example:
498	'=================== example (2 subtests) ==================='
499
500	Parameters:
501	test - Test object representing current test being printed
502	printer - Printer object to output results
503	"""
504	message = test.name
505	if message != "":
506		# Add a leading space before the subtest counts only if a test name
507		# is provided using a "# Subtest" header line.
508		message += " "
509	if test.expected_count:
510		if test.expected_count == 1:
511			message += '(1 subtest)'
512		else:
513			message += f'({test.expected_count} subtests)'
514	printer.print_with_timestamp(format_test_divider(message, len(message)))
 
 
 
 
 
 
 
 
 
 
 
515
516def print_log(log: Iterable[str], printer: Printer) -> None:
517	"""Prints all strings in saved log for test in yellow."""
518	formatted = textwrap.dedent('\n'.join(log))
519	for line in formatted.splitlines():
520		printer.print_with_timestamp(printer.yellow(line))
521
522def format_test_result(test: Test, printer: Printer) -> str:
523	"""
524	Returns string with formatted test result with colored status and test
525	name.
526
527	Example:
528	'[PASSED] example'
529
530	Parameters:
531	test - Test object representing current test being printed
532	printer - Printer object to output results
533
534	Return:
535	String containing formatted test result
536	"""
537	if test.status == TestStatus.SUCCESS:
538		return printer.green('[PASSED] ') + test.name
539	if test.status == TestStatus.SKIPPED:
540		return printer.yellow('[SKIPPED] ') + test.name
541	if test.status == TestStatus.NO_TESTS:
542		return printer.yellow('[NO TESTS RUN] ') + test.name
543	if test.status == TestStatus.TEST_CRASHED:
544		print_log(test.log, printer)
545		return stdout.red('[CRASHED] ') + test.name
546	print_log(test.log, printer)
547	return printer.red('[FAILED] ') + test.name
548
549def print_test_result(test: Test, printer: Printer) -> None:
550	"""
551	Prints result line with status of test.
552
553	Example:
554	'[PASSED] example'
555
556	Parameters:
557	test - Test object representing current test being printed
558	printer - Printer object
559	"""
560	printer.print_with_timestamp(format_test_result(test, printer))
561
562def print_test_footer(test: Test, printer: Printer) -> None:
563	"""
564	Prints test footer with status of test.
565
566	Example:
567	'===================== [PASSED] example ====================='
568
569	Parameters:
570	test - Test object representing current test being printed
571	printer - Printer object to output results
572	"""
573	message = format_test_result(test, printer)
574	printer.print_with_timestamp(format_test_divider(message,
575		len(message) - printer.color_len()))
576
577def print_test(test: Test, failed_only: bool, printer: Printer) -> None:
578	"""
579	Prints Test object to given printer. For a child test, the result line is
580	printed. For a parent test, the test header, all child test results, and
581	the test footer are all printed. If failed_only is true, only failed/crashed
582	tests will be printed.
583
584	Parameters:
585	test - Test object to print
586	failed_only - True if only failed/crashed tests should be printed.
587	printer - Printer object to output results
588	"""
589	if test.name == "main":
590		printer.print_with_timestamp(DIVIDER)
591		for subtest in test.subtests:
592			print_test(subtest, failed_only, printer)
593		printer.print_with_timestamp(DIVIDER)
594	elif test.subtests != []:
595		if not failed_only or not test.ok_status():
596			print_test_header(test, printer)
597			for subtest in test.subtests:
598				print_test(subtest, failed_only, printer)
599			print_test_footer(test, printer)
600	else:
601		if not failed_only or not test.ok_status():
602			print_test_result(test, printer)
603
604def _summarize_failed_tests(test: Test) -> str:
605	"""Tries to summarize all the failing subtests in `test`."""
606
607	def failed_names(test: Test, parent_name: str) -> List[str]:
608		# Note: we use 'main' internally for the top-level test.
609		if not parent_name or parent_name == 'main':
610			full_name = test.name
611		else:
612			full_name = parent_name + '.' + test.name
613
614		if not test.subtests:  # this is a leaf node
615			return [full_name]
 
616
617		# If all the children failed, just say this subtest failed.
618		# Don't summarize it down "the top-level test failed", though.
619		failed_subtests = [sub for sub in test.subtests if not sub.ok_status()]
620		if parent_name and len(failed_subtests) ==  len(test.subtests):
621			return [full_name]
622
623		all_failures = []  # type: List[str]
624		for t in failed_subtests:
625			all_failures.extend(failed_names(t, full_name))
626		return all_failures
627
628	failures = failed_names(test, '')
629	# If there are too many failures, printing them out will just be noisy.
630	if len(failures) > 10:  # this is an arbitrary limit
631		return ''
632
633	return 'Failures: ' + ', '.join(failures)
634
635
636def print_summary_line(test: Test, printer: Printer) -> None:
637	"""
638	Prints summary line of test object. Color of line is dependent on
639	status of test. Color is green if test passes, yellow if test is
640	skipped, and red if the test fails or crashes. Summary line contains
641	counts of the statuses of the tests subtests or the test itself if it
642	has no subtests.
643
644	Example:
645	"Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
646	Errors: 0"
647
648	test - Test object representing current test being printed
649	printer - Printer object to output results
650	"""
651	if test.status == TestStatus.SUCCESS:
652		color = stdout.green
653	elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
654		color = stdout.yellow
655	else:
656		color = stdout.red
657	printer.print_with_timestamp(color(f'Testing complete. {test.counts}'))
658
659	# Summarize failures that might have gone off-screen since we had a lot
660	# of tests (arbitrarily defined as >=100 for now).
661	if test.ok_status() or test.counts.total() < 100:
662		return
663	summarized = _summarize_failed_tests(test)
664	if not summarized:
665		return
666	printer.print_with_timestamp(color(summarized))
667
668# Other methods:
669
670def bubble_up_test_results(test: Test) -> None:
671	"""
672	If the test has subtests, add the test counts of the subtests to the
673	test and check if any of the tests crashed and if so set the test
674	status to crashed. Otherwise if the test has no subtests add the
675	status of the test to the test counts.
676
677	Parameters:
678	test - Test object for current test being parsed
679	"""
680	subtests = test.subtests
681	counts = test.counts
682	status = test.status
683	for t in subtests:
684		counts.add_subtest_counts(t.counts)
685	if counts.total() == 0:
686		counts.add_status(status)
687	elif test.counts.get_status() == TestStatus.TEST_CRASHED:
688		test.status = TestStatus.TEST_CRASHED
689
690def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool, printer: Printer) -> Test:
691	"""
692	Finds next test to parse in LineStream, creates new Test object,
693	parses any subtests of the test, populates Test object with all
694	information (status, name) about the test and the Test objects for
695	any subtests, and then returns the Test object. The method accepts
696	three formats of tests:
697
698	Accepted test formats:
699
700	- Main KTAP/TAP header
701
702	Example:
703
704	KTAP version 1
705	1..4
706	[subtests]
707
708	- Subtest header (must include either the KTAP version line or
709	  "# Subtest" header line)
710
711	Example (preferred format with both KTAP version line and
712	"# Subtest" line):
713
714	KTAP version 1
715	# Subtest: name
716	1..3
717	[subtests]
718	ok 1 name
719
720	Example (only "# Subtest" line):
721
722	# Subtest: name
723	1..3
724	[subtests]
725	ok 1 name
726
727	Example (only KTAP version line, compliant with KTAP v1 spec):
728
729	KTAP version 1
730	1..3
731	[subtests]
732	ok 1 name
733
734	- Test result line
735
736	Example:
737
738	ok 1 - test
739
740	Parameters:
741	lines - LineStream of KTAP output to parse
742	expected_num - expected test number for test to be parsed
743	log - list of strings containing any preceding diagnostic lines
744		corresponding to the current test
745	is_subtest - boolean indicating whether test is a subtest
746	printer - Printer object to output results
747
748	Return:
749	Test object populated with characteristics and any subtests
750	"""
751	test = Test()
752	test.log.extend(log)
753
754	# Parse any errors prior to parsing tests
755	err_log = parse_diagnostic(lines)
756	test.log.extend(err_log)
757
758	if not is_subtest:
759		# If parsing the main/top-level test, parse KTAP version line and
760		# test plan
761		test.name = "main"
762		ktap_line = parse_ktap_header(lines, test, printer)
763		test.log.extend(parse_diagnostic(lines))
764		parse_test_plan(lines, test)
765		parent_test = True
766	else:
767		# If not the main test, attempt to parse a test header containing
768		# the KTAP version line and/or subtest header line
769		ktap_line = parse_ktap_header(lines, test, printer)
770		subtest_line = parse_test_header(lines, test)
771		parent_test = (ktap_line or subtest_line)
772		if parent_test:
773			# If KTAP version line and/or subtest header is found, attempt
774			# to parse test plan and print test header
775			test.log.extend(parse_diagnostic(lines))
776			parse_test_plan(lines, test)
777			print_test_header(test, printer)
778	expected_count = test.expected_count
779	subtests = []
780	test_num = 1
781	while parent_test and (expected_count is None or test_num <= expected_count):
782		# Loop to parse any subtests.
783		# Break after parsing expected number of tests or
784		# if expected number of tests is unknown break when test
785		# result line with matching name to subtest header is found
786		# or no more lines in stream.
787		sub_log = parse_diagnostic(lines)
788		sub_test = Test()
789		if not lines or (peek_test_name_match(lines, test) and
790				is_subtest):
791			if expected_count and test_num <= expected_count:
792				# If parser reaches end of test before
793				# parsing expected number of subtests, print
794				# crashed subtest and record error
795				test.add_error(printer, 'missing expected subtest!')
796				sub_test.log.extend(sub_log)
797				test.counts.add_status(
798					TestStatus.TEST_CRASHED)
799				print_test_result(sub_test, printer)
800			else:
801				test.log.extend(sub_log)
802				break
803		else:
804			sub_test = parse_test(lines, test_num, sub_log, True, printer)
805		subtests.append(sub_test)
806		test_num += 1
807	test.subtests = subtests
808	if is_subtest:
809		# If not main test, look for test result line
810		test.log.extend(parse_diagnostic(lines))
811		if test.name != "" and not peek_test_name_match(lines, test):
812			test.add_error(printer, 'missing subtest result line!')
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
813		else:
814			parse_test_result(lines, test, expected_num, printer)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
815
816	# Check for there being no subtests within parent test
817	if parent_test and len(subtests) == 0:
818		# Don't override a bad status if this test had one reported.
819		# Assumption: no subtests means CRASHED is from Test.__init__()
820		if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
821			print_log(test.log, printer)
822			test.status = TestStatus.NO_TESTS
823			test.add_error(printer, '0 tests run!')
824
825	# Add statuses to TestCounts attribute in Test object
826	bubble_up_test_results(test)
827	if parent_test and is_subtest:
828		# If test has subtests and is not the main test object, print
829		# footer.
830		print_test_footer(test, printer)
831	elif is_subtest:
832		print_test_result(test, printer)
833	return test
834
835def parse_run_tests(kernel_output: Iterable[str], printer: Printer) -> Test:
836	"""
837	Using kernel output, extract KTAP lines, parse the lines for test
838	results and print condensed test results and summary line.
839
840	Parameters:
841	kernel_output - Iterable object contains lines of kernel output
842	printer - Printer object to output results
843
844	Return:
845	Test - the main test object with all subtests.
846	"""
847	printer.print_with_timestamp(DIVIDER)
848	lines = extract_tap_lines(kernel_output)
849	test = Test()
850	if not lines:
851		test.name = '<missing>'
852		test.add_error(printer, 'Could not find any KTAP output. Did any KUnit tests run?')
853		test.status = TestStatus.FAILURE_TO_PARSE_TESTS
854	else:
855		test = parse_test(lines, 0, [], False, printer)
856		if test.status != TestStatus.NO_TESTS:
857			test.status = test.counts.get_status()
858	printer.print_with_timestamp(DIVIDER)
859	return test
 
 
 
 
 
 
 
v5.14.15
  1# SPDX-License-Identifier: GPL-2.0
  2#
  3# Parses test results from a kernel dmesg log.
 
 
  4#
  5# Copyright (C) 2019, Google LLC.
  6# Author: Felix Guo <felixguoxiuping@gmail.com>
  7# Author: Brendan Higgins <brendanhiggins@google.com>
 
  8
 
 
  9import re
 
 10
 11from collections import namedtuple
 12from datetime import datetime
 13from enum import Enum, auto
 14from functools import reduce
 15from typing import Iterable, Iterator, List, Optional, Tuple
 16
 17TestResult = namedtuple('TestResult', ['status','suites','log'])
 18
 19class TestSuite(object):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 20	def __init__(self) -> None:
 21		self.status = TestStatus.SUCCESS
 
 22		self.name = ''
 23		self.cases = []  # type: List[TestCase]
 
 
 
 24
 25	def __str__(self) -> str:
 26		return 'TestSuite(' + str(self.status) + ',' + self.name + ',' + str(self.cases) + ')'
 
 
 27
 28	def __repr__(self) -> str:
 
 29		return str(self)
 30
 31class TestCase(object):
 32	def __init__(self) -> None:
 33		self.status = TestStatus.SUCCESS
 34		self.name = ''
 35		self.log = []  # type: List[str]
 36
 37	def __str__(self) -> str:
 38		return 'TestCase(' + str(self.status) + ',' + self.name + ',' + str(self.log) + ')'
 39
 40	def __repr__(self) -> str:
 41		return str(self)
 42
 43class TestStatus(Enum):
 
 44	SUCCESS = auto()
 45	FAILURE = auto()
 46	SKIPPED = auto()
 47	TEST_CRASHED = auto()
 48	NO_TESTS = auto()
 49	FAILURE_TO_PARSE_TESTS = auto()
 50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 51class LineStream:
 52	"""Provides a peek()/pop() interface over an iterator of (line#, text)."""
 
 
 
 
 53	_lines: Iterator[Tuple[int, str]]
 54	_next: Tuple[int, str]
 
 55	_done: bool
 56
 57	def __init__(self, lines: Iterator[Tuple[int, str]]):
 
 58		self._lines = lines
 59		self._done = False
 
 60		self._next = (0, '')
 61		self._get_next()
 62
 63	def _get_next(self) -> None:
 
 
 
 64		try:
 65			self._next = next(self._lines)
 66		except StopIteration:
 67			self._done = True
 
 
 68
 69	def peek(self) -> str:
 
 
 
 70		return self._next[1]
 71
 72	def pop(self) -> str:
 73		n = self._next
 74		self._get_next()
 75		return n[1]
 
 
 
 
 
 76
 77	def __bool__(self) -> bool:
 
 
 78		return not self._done
 79
 80	# Only used by kunit_tool_test.py.
 81	def __iter__(self) -> Iterator[str]:
 
 
 
 82		while bool(self):
 83			yield self.pop()
 84
 85	def line_number(self) -> int:
 
 
 86		return self._next[0]
 87
 88kunit_start_re = re.compile(r'TAP version [0-9]+$')
 89kunit_end_re = re.compile('(List of all partitions:|'
 90			  'Kernel panic - not syncing: VFS:|reboot: System halted)')
 
 
 
 
 91
 92def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
 93	def isolate_kunit_output(kernel_output: Iterable[str]) -> Iterator[Tuple[int, str]]:
 
 
 94		line_num = 0
 95		started = False
 96		for line in kernel_output:
 97			line_num += 1
 98			line = line.rstrip()  # line always has a trailing \n
 99			if kunit_start_re.search(line):
 
 
 
 
 
 
 
 
 
100				prefix_len = len(line.split('TAP version')[0])
101				started = True
102				yield line_num, line[prefix_len:]
103			elif kunit_end_re.search(line):
 
104				break
105			elif started:
106				yield line_num, line[prefix_len:]
107	return LineStream(lines=isolate_kunit_output(kernel_output))
108
109def raw_output(kernel_output) -> None:
110	for line in kernel_output:
111		print(line.rstrip())
112
113DIVIDER = '=' * 60
114
115RESET = '\033[0;0m'
116
117def red(text) -> str:
118	return '\033[1;31m' + text + RESET
119
120def yellow(text) -> str:
121	return '\033[1;33m' + text + RESET
122
123def green(text) -> str:
124	return '\033[1;32m' + text + RESET
125
126def print_with_timestamp(message) -> None:
127	print('[%s] %s' % (datetime.now().strftime('%H:%M:%S'), message))
128
129def format_suite_divider(message) -> str:
130	return '======== ' + message + ' ========'
131
132def print_suite_divider(message) -> None:
133	print_with_timestamp(DIVIDER)
134	print_with_timestamp(format_suite_divider(message))
135
136def print_log(log) -> None:
137	for m in log:
138		print_with_timestamp(m)
139
140TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*#).*$')
141
142def consume_non_diagnostic(lines: LineStream) -> None:
143	while lines and not TAP_ENTRIES.match(lines.peek()):
144		lines.pop()
145
146def save_non_diagnostic(lines: LineStream, test_case: TestCase) -> None:
147	while lines and not TAP_ENTRIES.match(lines.peek()):
148		test_case.log.append(lines.peek())
149		lines.pop()
150
151OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text'])
152
153OK_NOT_OK_SKIP = re.compile(r'^[\s]*(ok|not ok) [0-9]+ - (.*) # SKIP(.*)$')
154
155OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')
156
157OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$')
158
159def parse_ok_not_ok_test_case(lines: LineStream, test_case: TestCase) -> bool:
160	save_non_diagnostic(lines, test_case)
161	if not lines:
162		test_case.status = TestStatus.TEST_CRASHED
163		return True
164	line = lines.peek()
165	match = OK_NOT_OK_SUBTEST.match(line)
166	while not match and lines:
167		line = lines.pop()
168		match = OK_NOT_OK_SUBTEST.match(line)
169	if match:
170		test_case.log.append(lines.pop())
171		test_case.name = match.group(2)
172		skip_match = OK_NOT_OK_SKIP.match(line)
173		if skip_match:
174			test_case.status = TestStatus.SKIPPED
175			return True
176		if test_case.status == TestStatus.TEST_CRASHED:
177			return True
178		if match.group(1) == 'ok':
179			test_case.status = TestStatus.SUCCESS
180		else:
181			test_case.status = TestStatus.FAILURE
182		return True
183	else:
184		return False
 
 
185
186SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# (.*)$')
187DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^[\s]+# .*?: kunit test case crashed!$')
188
189def parse_diagnostic(lines: LineStream, test_case: TestCase) -> bool:
190	save_non_diagnostic(lines, test_case)
191	if not lines:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
192		return False
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
193	line = lines.peek()
194	match = SUBTEST_DIAGNOSTIC.match(line)
195	if match:
196		test_case.log.append(lines.pop())
197		crash_match = DIAGNOSTIC_CRASH_MESSAGE.match(line)
198		if crash_match:
199			test_case.status = TestStatus.TEST_CRASHED
200		return True
201	else:
202		return False
 
 
203
204def parse_test_case(lines: LineStream) -> Optional[TestCase]:
205	test_case = TestCase()
206	save_non_diagnostic(lines, test_case)
207	while parse_diagnostic(lines, test_case):
208		pass
209	if parse_ok_not_ok_test_case(lines, test_case):
210		return test_case
211	else:
212		return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
213
214SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')
 
 
 
215
216def parse_subtest_header(lines: LineStream) -> Optional[str]:
217	consume_non_diagnostic(lines)
218	if not lines:
219		return None
220	match = SUBTEST_HEADER.match(lines.peek())
221	if match:
222		lines.pop()
223		return match.group(1)
224	else:
225		return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
226
227SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')
228
229def parse_subtest_plan(lines: LineStream) -> Optional[int]:
230	consume_non_diagnostic(lines)
231	match = SUBTEST_PLAN.match(lines.peek())
232	if match:
233		lines.pop()
234		return int(match.group(1))
235	else:
236		return None
237
238def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
239	if left == right:
240		return left
241	elif left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED:
242		return TestStatus.TEST_CRASHED
243	elif left == TestStatus.FAILURE or right == TestStatus.FAILURE:
244		return TestStatus.FAILURE
245	elif left == TestStatus.SKIPPED:
246		return right
247	else:
248		return left
249
250def parse_ok_not_ok_test_suite(lines: LineStream,
251			       test_suite: TestSuite,
252			       expected_suite_index: int) -> bool:
253	consume_non_diagnostic(lines)
254	if not lines:
255		test_suite.status = TestStatus.TEST_CRASHED
256		return False
257	line = lines.peek()
258	match = OK_NOT_OK_MODULE.match(line)
259	if match:
260		lines.pop()
261		if match.group(1) == 'ok':
262			test_suite.status = TestStatus.SUCCESS
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
263		else:
264			test_suite.status = TestStatus.FAILURE
265		skip_match = OK_NOT_OK_SKIP.match(line)
266		if skip_match:
267			test_suite.status = TestStatus.SKIPPED
268		suite_index = int(match.group(2))
269		if suite_index != expected_suite_index:
270			print_with_timestamp(
271				red('[ERROR] ') + 'expected_suite_index ' +
272				str(expected_suite_index) + ', but got ' +
273				str(suite_index))
274		return True
275	else:
276		return False
277
278def bubble_up_errors(status_list: Iterable[TestStatus]) -> TestStatus:
279	return reduce(max_status, status_list, TestStatus.SKIPPED)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
280
281def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
282	max_test_case_status = bubble_up_errors(x.status for x in test_suite.cases)
283	return max_status(max_test_case_status, test_suite.status)
284
285def parse_test_suite(lines: LineStream, expected_suite_index: int) -> Optional[TestSuite]:
286	if not lines:
287		return None
288	consume_non_diagnostic(lines)
289	test_suite = TestSuite()
290	test_suite.status = TestStatus.SUCCESS
291	name = parse_subtest_header(lines)
292	if not name:
293		return None
294	test_suite.name = name
295	expected_test_case_num = parse_subtest_plan(lines)
296	if expected_test_case_num is None:
297		return None
298	while expected_test_case_num > 0:
299		test_case = parse_test_case(lines)
300		if not test_case:
301			break
302		test_suite.cases.append(test_case)
303		expected_test_case_num -= 1
304	if parse_ok_not_ok_test_suite(lines, test_suite, expected_suite_index):
305		test_suite.status = bubble_up_test_case_errors(test_suite)
306		return test_suite
307	elif not lines:
308		print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
309		return test_suite
310	else:
311		print(f'failed to parse end of suite "{name}", at line {lines.line_number()}: {lines.peek()}')
312		return None
313
314TAP_HEADER = re.compile(r'^TAP version 14$')
315
316def parse_tap_header(lines: LineStream) -> bool:
317	consume_non_diagnostic(lines)
318	if TAP_HEADER.match(lines.peek()):
319		lines.pop()
320		return True
321	else:
322		return False
323
324TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)')
325
326def parse_test_plan(lines: LineStream) -> Optional[int]:
327	consume_non_diagnostic(lines)
328	match = TEST_PLAN.match(lines.peek())
329	if match:
330		lines.pop()
331		return int(match.group(1))
332	else:
333		return None
334
335def bubble_up_suite_errors(test_suites: Iterable[TestSuite]) -> TestStatus:
336	return bubble_up_errors(x.status for x in test_suites)
337
338def parse_test_result(lines: LineStream) -> TestResult:
339	consume_non_diagnostic(lines)
340	if not lines or not parse_tap_header(lines):
341		return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
342	expected_test_suite_num = parse_test_plan(lines)
343	if expected_test_suite_num == 0:
344		return TestResult(TestStatus.NO_TESTS, [], lines)
345	elif expected_test_suite_num is None:
346		return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
347	test_suites = []
348	for i in range(1, expected_test_suite_num + 1):
349		test_suite = parse_test_suite(lines, i)
350		if test_suite:
351			test_suites.append(test_suite)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
352		else:
353			print_with_timestamp(
354				red('[ERROR] ') + ' expected ' +
355				str(expected_test_suite_num) +
356				' test suites, but got ' + str(i - 2))
357			break
358	test_suite = parse_test_suite(lines, -1)
359	if test_suite:
360		print_with_timestamp(red('[ERROR] ') +
361			'got unexpected test suite: ' + test_suite.name)
362	if test_suites:
363		return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)
364	else:
365		return TestResult(TestStatus.NO_TESTS, [], lines)
366
367class TestCounts:
368	passed: int
369	failed: int
370	crashed: int
371	skipped: int
372
373	def __init__(self):
374		self.passed = 0
375		self.failed = 0
376		self.crashed = 0
377		self.skipped = 0
378
379	def total(self) -> int:
380		return self.passed + self.failed + self.crashed + self.skipped
381
382def print_and_count_results(test_result: TestResult) -> TestCounts:
383	counts = TestCounts()
384	for test_suite in test_result.suites:
385		if test_suite.status == TestStatus.SUCCESS:
386			print_suite_divider(green('[PASSED] ') + test_suite.name)
387		elif test_suite.status == TestStatus.SKIPPED:
388			print_suite_divider(yellow('[SKIPPED] ') + test_suite.name)
389		elif test_suite.status == TestStatus.TEST_CRASHED:
390			print_suite_divider(red('[CRASHED] ' + test_suite.name))
391		else:
392			print_suite_divider(red('[FAILED] ') + test_suite.name)
393		for test_case in test_suite.cases:
394			if test_case.status == TestStatus.SUCCESS:
395				counts.passed += 1
396				print_with_timestamp(green('[PASSED] ') + test_case.name)
397			elif test_case.status == TestStatus.SKIPPED:
398				counts.skipped += 1
399				print_with_timestamp(yellow('[SKIPPED] ') + test_case.name)
400			elif test_case.status == TestStatus.TEST_CRASHED:
401				counts.crashed += 1
402				print_with_timestamp(red('[CRASHED] ' + test_case.name))
403				print_log(map(yellow, test_case.log))
404				print_with_timestamp('')
405			else:
406				counts.failed += 1
407				print_with_timestamp(red('[FAILED] ') + test_case.name)
408				print_log(map(yellow, test_case.log))
409				print_with_timestamp('')
410	return counts
411
412def parse_run_tests(kernel_output: Iterable[str]) -> TestResult:
413	counts = TestCounts()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
414	lines = extract_tap_lines(kernel_output)
415	test_result = parse_test_result(lines)
416	if test_result.status == TestStatus.NO_TESTS:
417		print(red('[ERROR] ') + yellow('no tests run!'))
418	elif test_result.status == TestStatus.FAILURE_TO_PARSE_TESTS:
419		print(red('[ERROR] ') + yellow('could not parse test results!'))
420	else:
421		counts = print_and_count_results(test_result)
422	print_with_timestamp(DIVIDER)
423	if test_result.status == TestStatus.SUCCESS:
424		fmt = green
425	elif test_result.status == TestStatus.SKIPPED:
426		fmt = yellow
427	else:
428		fmt =red
429	print_with_timestamp(
430		fmt('Testing complete. %d tests run. %d failed. %d crashed. %d skipped.' %
431		    (counts.total(), counts.failed, counts.crashed, counts.skipped)))
432	return test_result