v6.2
# SPDX-License-Identifier: GPL-2.0
#
# Parses KTAP test results from a kernel dmesg log and incrementally prints
# results with reader-friendly format. Stores and returns test results in a
# Test object.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>
# Author: Rae Moar <rmoar@google.com>

from __future__ import annotations
from dataclasses import dataclass
import re
import sys
import textwrap

from enum import Enum, auto
from typing import Iterable, Iterator, List, Optional, Tuple

from kunit_printer import stdout

class Test:
	"""
	A class to represent a test parsed from KTAP results. All KTAP
	results within a test log are stored in a main Test object as
	subtests.

	Attributes:
	status : TestStatus - status of the test
	name : str - name of the test
	expected_count : int - expected number of subtests (0 if single
		test case and None if unknown expected number of subtests)
	subtests : List[Test] - list of subtests
	log : List[str] - log of KTAP lines that correspond to the test
	counts : TestCounts - counts of the test statuses and errors of
		subtests or of the test itself if the test is a single
		test case.
	"""
	def __init__(self) -> None:
		"""Creates Test object with default attributes."""
		self.status = TestStatus.TEST_CRASHED
		self.name = ''
		self.expected_count = 0  # type: Optional[int]
		self.subtests = []  # type: List[Test]
		self.log = []  # type: List[str]
		self.counts = TestCounts()

	def __str__(self) -> str:
		"""Returns string representation of a Test class object."""
		return (f'Test({self.status}, {self.name}, {self.expected_count}, '
			f'{self.subtests}, {self.log}, {self.counts})')

	def __repr__(self) -> str:
		"""Returns string representation of a Test class object."""
		return str(self)

	def add_error(self, error_message: str) -> None:
		"""Records an error that occurred while parsing this test."""
		self.counts.errors += 1
		stdout.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')

	def ok_status(self) -> bool:
		"""Returns true if the status was ok, i.e. passed or skipped."""
		return self.status in (TestStatus.SUCCESS, TestStatus.SKIPPED)

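# Illustrative only, not used by the parser: a hedged sketch of the Test
# defaults above. A fresh Test starts as TEST_CRASHED so that a test which
# never reports a result line is counted as a crash.
def _demo_test_defaults() -> None:
	test = Test()
	assert not test.ok_status()   # TEST_CRASHED is not an ok status
	test.status = TestStatus.SKIPPED
	assert test.ok_status()       # passed and skipped both count as ok
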
class TestStatus(Enum):
	"""An enumeration class to represent the status of a test."""
	SUCCESS = auto()
	FAILURE = auto()
	SKIPPED = auto()
	TEST_CRASHED = auto()
	NO_TESTS = auto()
	FAILURE_TO_PARSE_TESTS = auto()

@dataclass
class TestCounts:
	"""
	Tracks the counts of statuses of all test cases and any errors within
	a Test.
	"""
	passed: int = 0
	failed: int = 0
	crashed: int = 0
	skipped: int = 0
	errors: int = 0

	def __str__(self) -> str:
		"""Returns the string representation of a TestCounts object."""
		statuses = [('passed', self.passed), ('failed', self.failed),
			('crashed', self.crashed), ('skipped', self.skipped),
			('errors', self.errors)]
		return f'Ran {self.total()} tests: ' + \
			', '.join(f'{s}: {n}' for s, n in statuses if n > 0)

	def total(self) -> int:
		"""Returns the total number of test cases within a test
		object, where a test case is a test with no subtests.
		"""
		return (self.passed + self.failed + self.crashed +
			self.skipped)

	def add_subtest_counts(self, counts: TestCounts) -> None:
		"""
		Adds the counts of another TestCounts object to the current
		TestCounts object. Used to add the counts of a subtest to the
		parent test.

		Parameters:
		counts - a different TestCounts object whose counts
			will be added to the counts of the TestCounts object
		"""
		self.passed += counts.passed
		self.failed += counts.failed
		self.crashed += counts.crashed
		self.skipped += counts.skipped
		self.errors += counts.errors

	def get_status(self) -> TestStatus:
		"""Returns the aggregated status of a Test using test
		counts.
		"""
		if self.total() == 0:
			return TestStatus.NO_TESTS
		if self.crashed:
			# Crashes should take priority.
			return TestStatus.TEST_CRASHED
		if self.failed:
			return TestStatus.FAILURE
		if self.passed:
			# No failures or crashes, looks good!
			return TestStatus.SUCCESS
		# We have only skipped tests.
		return TestStatus.SKIPPED

	def add_status(self, status: TestStatus) -> None:
		"""Increments the count for `status`."""
		if status == TestStatus.SUCCESS:
			self.passed += 1
		elif status == TestStatus.FAILURE:
			self.failed += 1
		elif status == TestStatus.SKIPPED:
			self.skipped += 1
		elif status != TestStatus.NO_TESTS:
			self.crashed += 1

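# Illustrative only: a hedged sketch of how add_status() and get_status()
# interact. The sequence of statuses below is hypothetical; the priority
# order (crashed > failed > passed > skipped) comes from get_status() above.
def _demo_test_counts() -> None:
	counts = TestCounts()
	counts.add_status(TestStatus.SUCCESS)
	counts.add_status(TestStatus.FAILURE)
	assert counts.get_status() == TestStatus.FAILURE
	counts.add_status(TestStatus.TEST_CRASHED)
	assert counts.get_status() == TestStatus.TEST_CRASHED  # crashes win
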
class LineStream:
	"""
	A class to represent the lines of kernel output.
	Provides a lazy peek()/pop() interface over an iterator of
	(line#, text).
	"""
	_lines: Iterator[Tuple[int, str]]
	_next: Tuple[int, str]
	_need_next: bool
	_done: bool

	def __init__(self, lines: Iterator[Tuple[int, str]]):
		"""Creates a new LineStream that wraps the given iterator."""
		self._lines = lines
		self._done = False
		self._need_next = True
		self._next = (0, '')

	def _get_next(self) -> None:
		"""Advances the LineStream to the next line, if necessary."""
		if not self._need_next:
			return
		try:
			self._next = next(self._lines)
		except StopIteration:
			self._done = True
		finally:
			self._need_next = False

	def peek(self) -> str:
		"""Returns the current line, without advancing the LineStream.
		"""
		self._get_next()
		return self._next[1]

	def pop(self) -> str:
		"""Returns the current line and advances the LineStream to
		the next line.
		"""
		s = self.peek()
		if self._done:
			raise ValueError(f'LineStream: going past EOF, last line was {s}')
		self._need_next = True
		return s

	def __bool__(self) -> bool:
		"""Returns True if stream has more lines."""
		self._get_next()
		return not self._done

	# Only used by kunit_tool_test.py.
	def __iter__(self) -> Iterator[str]:
		"""Consumes and yields all remaining lines in the stream."""
		while bool(self):
			yield self.pop()

	def line_number(self) -> int:
		"""Returns the line number of the current line."""
		self._get_next()
		return self._next[0]

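# Illustrative only: a hedged sketch of the lazy peek()/pop() contract.
# The (line#, text) tuples below are hypothetical.
def _demo_line_stream() -> None:
	stream = LineStream(iter([(1, 'KTAP version 1'), (2, '1..1')]))
	assert stream.peek() == 'KTAP version 1'  # peek() does not advance
	assert stream.pop() == 'KTAP version 1'   # pop() advances
	assert stream.line_number() == 2
	stream.pop()
	assert not stream  # all lines consumed
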
# Parsing helper methods:

KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$')
TAP_START = re.compile(r'\s*TAP version ([0-9]+)$')
KTAP_END = re.compile(r'\s*(List of all partitions:|'
	'Kernel panic - not syncing: VFS:|reboot: System halted)')

def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
	"""Extracts KTAP lines from the kernel output."""
	def isolate_ktap_output(kernel_output: Iterable[str]) \
			-> Iterator[Tuple[int, str]]:
		line_num = 0
		started = False
		for line in kernel_output:
			line_num += 1
			line = line.rstrip()  # remove trailing \n
			if not started and KTAP_START.search(line):
				# start extracting KTAP lines and set prefix
				# to number of characters before version line
				prefix_len = len(
					line.split('KTAP version')[0])
				started = True
				yield line_num, line[prefix_len:]
			elif not started and TAP_START.search(line):
				# start extracting KTAP lines and set prefix
				# to number of characters before version line
				prefix_len = len(line.split('TAP version')[0])
				started = True
				yield line_num, line[prefix_len:]
			elif started and KTAP_END.search(line):
				# stop extracting KTAP lines
				break
			elif started:
				# remove the prefix, if any.
				line = line[prefix_len:]
				yield line_num, line
	return LineStream(lines=isolate_ktap_output(kernel_output))

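# Illustrative only: a hedged sketch of prefix stripping. The dmesg-style
# timestamps below are hypothetical; everything before 'KTAP version' on
# the start line becomes the prefix that is stripped from later lines.
def _demo_extract_tap_lines() -> None:
	output = ['[    1.234] KTAP version 1', '[    1.235] 1..1']
	lines = extract_tap_lines(output)
	assert lines.pop() == 'KTAP version 1'
	assert lines.pop() == '1..1'
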
KTAP_VERSIONS = [1]
TAP_VERSIONS = [13, 14]

def check_version(version_num: int, accepted_versions: List[int],
			version_type: str, test: Test) -> None:
	"""
	Adds error to test object if version number is too high or too
	low.

	Parameters:
	version_num - The inputted version number from the parsed KTAP or TAP
		header line
	accepted_versions - List of accepted KTAP or TAP versions
	version_type - 'KTAP' or 'TAP' depending on the type of
		version line.
	test - Test object for current test being parsed
	"""
	if version_num < min(accepted_versions):
		test.add_error(f'{version_type} version lower than expected!')
	elif version_num > max(accepted_versions):
		test.add_error(f'{version_type} version higher than expected!')

def parse_ktap_header(lines: LineStream, test: Test) -> bool:
	"""
	Parses KTAP/TAP header line and checks version number.
	Returns False if it fails to parse the KTAP/TAP header line.

	Accepted formats:
	- 'KTAP version [version number]'
	- 'TAP version [version number]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if successfully parsed KTAP/TAP header line
	"""
	ktap_match = KTAP_START.match(lines.peek())
	tap_match = TAP_START.match(lines.peek())
	if ktap_match:
		version_num = int(ktap_match.group(1))
		check_version(version_num, KTAP_VERSIONS, 'KTAP', test)
	elif tap_match:
		version_num = int(tap_match.group(1))
		check_version(version_num, TAP_VERSIONS, 'TAP', test)
	else:
		return False
	lines.pop()
	return True

TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$')

def parse_test_header(lines: LineStream, test: Test) -> bool:
	"""
	Parses test header and stores test name in test object.
	Returns False if it fails to parse the test header line.

	Accepted format:
	- '# Subtest: [test name]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if successfully parsed test header line
	"""
	match = TEST_HEADER.match(lines.peek())
	if not match:
		return False
	test.name = match.group(1)
	lines.pop()
	return True

TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)')

def parse_test_plan(lines: LineStream, test: Test) -> bool:
	"""
	Parses test plan line and stores the expected number of subtests in
	test object.
	Returns False and sets expected_count to None if there is no valid test
	plan.

	Accepted format:
	- '1..[number of subtests]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if successfully parsed test plan line
	"""
	match = TEST_PLAN.match(lines.peek())
	if not match:
		test.expected_count = None
		return False
	expected_count = int(match.group(1))
	test.expected_count = expected_count
	lines.pop()
	return True

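# Illustrative only: the plan regex applied to hypothetical lines. '1..4'
# declares four subtests; a non-plan line leaves expected_count as None.
def _demo_test_plan_regex() -> None:
	match = TEST_PLAN.match('  1..4')
	assert match and int(match.group(1)) == 4
	assert TEST_PLAN.match('# not a plan') is None
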
TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')

TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')

def peek_test_name_match(lines: LineStream, test: Test) -> bool:
	"""
	Matches the current line against the format of a test result line and
	checks if the name matches the name of the current test.
	Returns False if it fails to match the format or name.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if the line matches a test result line and the name matches
		the expected test name
	"""
	line = lines.peek()
	match = TEST_RESULT.match(line)
	if not match:
		return False
	name = match.group(4)
	return name == test.name

def parse_test_result(lines: LineStream, test: Test,
			expected_num: int) -> bool:
	"""
	Parses test result line and stores the status and name in the test
	object. Reports an error if the test number does not match the
	expected test number.
	Returns False if it fails to parse the test result line.

	Note that the SKIP directive is the only directive that causes a
	change in status.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed
	expected_num - expected test number for current test

	Return:
	True if successfully parsed a test result line.
	"""
	line = lines.peek()
	match = TEST_RESULT.match(line)
	skip_match = TEST_RESULT_SKIP.match(line)

	# Check if line matches test result line format
	if not match:
		return False
	lines.pop()

	# Set name of test object
	if skip_match:
		test.name = skip_match.group(4)
	else:
		test.name = match.group(4)

	# Check test num
	num = int(match.group(2))
	if num != expected_num:
		test.add_error(f'Expected test number {expected_num} but found {num}')

	# Set status of test object
	status = match.group(1)
	if skip_match:
		test.status = TestStatus.SKIPPED
	elif status == 'ok':
		test.status = TestStatus.SUCCESS
	else:
		test.status = TestStatus.FAILURE
	return True

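# Illustrative only: a hedged sketch of how one hypothetical result line
# decomposes under the two regexes above.
def _demo_test_result_regex() -> None:
	line = 'ok 1 - example_test # SKIP reason'
	match = TEST_RESULT.match(line)
	assert match is not None
	assert match.group(1) == 'ok'            # status token
	assert match.group(2) == '1'             # test number
	assert match.group(4) == 'example_test'  # test name
	assert TEST_RESULT_SKIP.match(line) is not None  # SKIP directive
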
def parse_diagnostic(lines: LineStream) -> List[str]:
	"""
	Parses lines that do not match the format of a test result line or
	test header line and returns them in a list.

	Line formats that are not parsed:
	- '# Subtest: [test name]'
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'
	- 'KTAP version [version number]'

	Parameters:
	lines - LineStream of KTAP output to parse

	Return:
	Log of diagnostic lines
	"""
	log = []  # type: List[str]
	non_diagnostic_lines = [TEST_RESULT, TEST_HEADER, KTAP_START]
	while lines and not any(regex.match(lines.peek())
			for regex in non_diagnostic_lines):
		log.append(lines.pop())
	return log


# Printing helper methods:

DIVIDER = '=' * 60

def format_test_divider(message: str, len_message: int) -> str:
	"""
	Returns string with message centered in fixed width divider.

	Example:
	'===================== message example ====================='

	Parameters:
	message - message to be centered in divider line
	len_message - length of the message to be printed such that
		any characters of the color codes are not counted

	Return:
	String containing message centered in fixed width divider
	"""
	default_count = 3  # default number of dashes
	len_1 = default_count
	len_2 = default_count
	difference = len(DIVIDER) - len_message - 2  # 2 spaces added
	if difference > 0:
		# calculate number of dashes for each side of the divider
		len_1 = int(difference / 2)
		len_2 = difference - len_1
	return ('=' * len_1) + f' {message} ' + ('=' * len_2)

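# Illustrative only: with DIVIDER fixed at 60 characters, a 7-character
# message leaves 60 - 7 - 2 = 51 '=' characters, split 25/26 between the
# two sides, so the divider keeps its fixed width.
def _demo_divider_width() -> None:
	line = format_test_divider('example', len('example'))
	assert len(line) == len(DIVIDER)
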
def print_test_header(test: Test) -> None:
	"""
	Prints test header with test name and optionally the expected number
	of subtests.

	Example:
	'=================== example (2 subtests) ==================='

	Parameters:
	test - Test object representing current test being printed
	"""
	message = test.name
	if message != "":
		# Add a leading space before the subtest counts only if a test name
		# is provided using a "# Subtest" header line.
		message += " "
	if test.expected_count:
		if test.expected_count == 1:
			message += '(1 subtest)'
		else:
			message += f'({test.expected_count} subtests)'
	stdout.print_with_timestamp(format_test_divider(message, len(message)))

def print_log(log: Iterable[str]) -> None:
	"""Prints all strings in saved log for test in yellow."""
	formatted = textwrap.dedent('\n'.join(log))
	for line in formatted.splitlines():
		stdout.print_with_timestamp(stdout.yellow(line))

def format_test_result(test: Test) -> str:
	"""
	Returns string with formatted test result with colored status and test
	name.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed

	Return:
	String containing formatted test result
	"""
	if test.status == TestStatus.SUCCESS:
		return stdout.green('[PASSED] ') + test.name
	if test.status == TestStatus.SKIPPED:
		return stdout.yellow('[SKIPPED] ') + test.name
	if test.status == TestStatus.NO_TESTS:
		return stdout.yellow('[NO TESTS RUN] ') + test.name
	if test.status == TestStatus.TEST_CRASHED:
		print_log(test.log)
		return stdout.red('[CRASHED] ') + test.name
	print_log(test.log)
	return stdout.red('[FAILED] ') + test.name

def print_test_result(test: Test) -> None:
	"""
	Prints result line with status of test.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed
	"""
	stdout.print_with_timestamp(format_test_result(test))

def print_test_footer(test: Test) -> None:
	"""
	Prints test footer with status of test.

	Example:
	'===================== [PASSED] example ====================='

	Parameters:
	test - Test object representing current test being printed
	"""
	message = format_test_result(test)
	stdout.print_with_timestamp(format_test_divider(message,
		len(message) - stdout.color_len()))


def _summarize_failed_tests(test: Test) -> str:
	"""Tries to summarize all the failing subtests in `test`."""

	def failed_names(test: Test, parent_name: str) -> List[str]:
		# Note: we use 'main' internally for the top-level test.
		if not parent_name or parent_name == 'main':
			full_name = test.name
		else:
			full_name = parent_name + '.' + test.name

		if not test.subtests:  # this is a leaf node
			return [full_name]

		# If all the children failed, just say this subtest failed.
		# Don't summarize it down to "the top-level test failed", though.
		failed_subtests = [sub for sub in test.subtests if not sub.ok_status()]
		if parent_name and len(failed_subtests) == len(test.subtests):
			return [full_name]

		all_failures = []  # type: List[str]
		for t in failed_subtests:
			all_failures.extend(failed_names(t, full_name))
		return all_failures

	failures = failed_names(test, '')
	# If there are too many failures, printing them out will just be noisy.
	if len(failures) > 10:  # this is an arbitrary limit
		return ''

	return 'Failures: ' + ', '.join(failures)

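# Illustrative only: a hedged sketch of the collapsing rule above. The
# names are hypothetical. When every subtest of 'suite' fails, the summary
# names the suite once instead of listing each failing case.
def _demo_summarize_failed_tests() -> None:
	parent, suite = Test(), Test()
	parent.name, suite.name = 'main', 'suite'
	for case_name in ('case_a', 'case_b'):
		case = Test()
		case.name = case_name
		case.status = TestStatus.FAILURE
		suite.subtests.append(case)
	parent.subtests.append(suite)
	assert _summarize_failed_tests(parent) == 'Failures: suite'
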

def print_summary_line(test: Test) -> None:
	"""
	Prints summary line of test object. Color of line is dependent on
	status of test. Color is green if test passes, yellow if test is
	skipped, and red if the test fails or crashes. Summary line contains
	counts of the statuses of the test's subtests, or of the test itself
	if it has no subtests.

	Example:
	"Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
	Errors: 0"

	Parameters:
	test - Test object representing current test being printed
	"""
	if test.status == TestStatus.SUCCESS:
		color = stdout.green
	elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
		color = stdout.yellow
	else:
		color = stdout.red
	stdout.print_with_timestamp(color(f'Testing complete. {test.counts}'))

	# Summarize failures that might have gone off-screen since we had a lot
	# of tests (arbitrarily defined as >=100 for now).
	if test.ok_status() or test.counts.total() < 100:
		return
	summarized = _summarize_failed_tests(test)
	if not summarized:
		return
	stdout.print_with_timestamp(color(summarized))

# Other methods:

def bubble_up_test_results(test: Test) -> None:
	"""
	If the test has subtests, add the test counts of the subtests to the
	test and check if any of the subtests crashed; if so, set the test
	status to crashed. Otherwise, if the test has no subtests, add the
	status of the test to the test counts.

	Parameters:
	test - Test object for current test being parsed
	"""
	subtests = test.subtests
	counts = test.counts
	status = test.status
	for t in subtests:
		counts.add_subtest_counts(t.counts)
	if counts.total() == 0:
		counts.add_status(status)
	elif test.counts.get_status() == TestStatus.TEST_CRASHED:
		test.status = TestStatus.TEST_CRASHED

def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool) -> Test:
	"""
	Finds the next test to parse in LineStream, creates a new Test object,
	parses any subtests of the test, populates the Test object with all
	information (status, name) about the test and the Test objects for
	any subtests, and then returns the Test object. The method accepts
	three formats of tests:

	Accepted test formats:

	- Main KTAP/TAP header

	Example:

	KTAP version 1
	1..4
	[subtests]

	- Subtest header (must include either the KTAP version line or
	  "# Subtest" header line)

	Example (preferred format with both KTAP version line and
	"# Subtest" line):

	KTAP version 1
	# Subtest: name
	1..3
	[subtests]
	ok 1 name

	Example (only "# Subtest" line):

	# Subtest: name
	1..3
	[subtests]
	ok 1 name

	Example (only KTAP version line, compliant with KTAP v1 spec):

	KTAP version 1
	1..3
	[subtests]
	ok 1 name

	- Test result line

	Example:

	ok 1 - test

	Parameters:
	lines - LineStream of KTAP output to parse
	expected_num - expected test number for test to be parsed
	log - list of strings containing any preceding diagnostic lines
		corresponding to the current test
	is_subtest - boolean indicating whether test is a subtest

	Return:
	Test object populated with characteristics and any subtests
	"""
	test = Test()
	test.log.extend(log)
	if not is_subtest:
		# If parsing the main/top-level test, parse KTAP version line and
		# test plan
		test.name = "main"
		ktap_line = parse_ktap_header(lines, test)
		parse_test_plan(lines, test)
		parent_test = True
	else:
		# If not the main test, attempt to parse a test header containing
		# the KTAP version line and/or subtest header line
		ktap_line = parse_ktap_header(lines, test)
		subtest_line = parse_test_header(lines, test)
		parent_test = (ktap_line or subtest_line)
		if parent_test:
			# If KTAP version line and/or subtest header is found, attempt
			# to parse test plan and print test header
			parse_test_plan(lines, test)
			print_test_header(test)
	expected_count = test.expected_count
	subtests = []
	test_num = 1
	while parent_test and (expected_count is None or test_num <= expected_count):
		# Loop to parse any subtests.
		# Break after parsing the expected number of tests; if the
		# expected number of tests is unknown, break when a test
		# result line with a name matching the subtest header is
		# found, or when there are no more lines in the stream.
		sub_log = parse_diagnostic(lines)
		sub_test = Test()
		if not lines or (peek_test_name_match(lines, test) and
				is_subtest):
			if expected_count and test_num <= expected_count:
				# If parser reaches end of test before
				# parsing expected number of subtests, print
				# crashed subtest and record error
				test.add_error('missing expected subtest!')
				sub_test.log.extend(sub_log)
				test.counts.add_status(
					TestStatus.TEST_CRASHED)
				print_test_result(sub_test)
			else:
				test.log.extend(sub_log)
				break
		else:
			sub_test = parse_test(lines, test_num, sub_log, True)
		subtests.append(sub_test)
		test_num += 1
	test.subtests = subtests
	if is_subtest:
		# If not the main test, look for the test result line
		test.log.extend(parse_diagnostic(lines))
		if test.name != "" and not peek_test_name_match(lines, test):
			test.add_error('missing subtest result line!')
		else:
			parse_test_result(lines, test, expected_num)

	# Check for there being no subtests within parent test
	if parent_test and len(subtests) == 0:
		# Don't override a bad status if this test had one reported.
		# Assumption: no subtests means CRASHED is from Test.__init__()
		if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
			test.status = TestStatus.NO_TESTS
			test.add_error('0 tests run!')

	# Add statuses to TestCounts attribute in Test object
	bubble_up_test_results(test)
	if parent_test and is_subtest:
		# If test has subtests and is not the main test object, print
		# footer.
		print_test_footer(test)
	elif is_subtest:
		print_test_result(test)
	return test

def parse_run_tests(kernel_output: Iterable[str]) -> Test:
	"""
	Using kernel output, extract KTAP lines, parse the lines for test
	results and print condensed test results and summary line.

	Parameters:
	kernel_output - Iterable object containing lines of kernel output

	Return:
	Test - the main test object with all subtests.
	"""
	stdout.print_with_timestamp(DIVIDER)
	lines = extract_tap_lines(kernel_output)
	test = Test()
	if not lines:
		test.name = '<missing>'
		test.add_error('Could not find any KTAP output. Did any KUnit tests run?')
		test.status = TestStatus.FAILURE_TO_PARSE_TESTS
	else:
		test = parse_test(lines, 0, [], False)
		if test.status != TestStatus.NO_TESTS:
			test.status = test.counts.get_status()
	stdout.print_with_timestamp(DIVIDER)
	print_summary_line(test)
	return test
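
# Illustrative only: a hedged, end-to-end sketch on a minimal hypothetical
# KTAP log. parse_run_tests() prints the formatted results as a side effect
# and returns the populated main Test object.
def _demo_parse_run_tests() -> None:
	ktap = ['KTAP version 1', '1..1', 'ok 1 example_test']
	result = parse_run_tests(ktap)
	assert result.status == TestStatus.SUCCESS
	assert result.counts.passed == 1
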
v5.14.15
# SPDX-License-Identifier: GPL-2.0
#
# Parses test results from a kernel dmesg log.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>

import re

from collections import namedtuple
from datetime import datetime
from enum import Enum, auto
from functools import reduce
from typing import Iterable, Iterator, List, Optional, Tuple

TestResult = namedtuple('TestResult', ['status', 'suites', 'log'])

class TestSuite(object):
	def __init__(self) -> None:
		self.status = TestStatus.SUCCESS
		self.name = ''
		self.cases = []  # type: List[TestCase]

	def __str__(self) -> str:
		return 'TestSuite(' + str(self.status) + ',' + self.name + ',' + str(self.cases) + ')'

	def __repr__(self) -> str:
		return str(self)

class TestCase(object):
	def __init__(self) -> None:
		self.status = TestStatus.SUCCESS
		self.name = ''
		self.log = []  # type: List[str]

	def __str__(self) -> str:
		return 'TestCase(' + str(self.status) + ',' + self.name + ',' + str(self.log) + ')'

	def __repr__(self) -> str:
		return str(self)

class TestStatus(Enum):
	SUCCESS = auto()
	FAILURE = auto()
	SKIPPED = auto()
	TEST_CRASHED = auto()
	NO_TESTS = auto()
	FAILURE_TO_PARSE_TESTS = auto()

class LineStream:
	"""Provides a peek()/pop() interface over an iterator of (line#, text)."""
	_lines: Iterator[Tuple[int, str]]
	_next: Tuple[int, str]
	_done: bool

	def __init__(self, lines: Iterator[Tuple[int, str]]):
		self._lines = lines
		self._done = False
		self._next = (0, '')
		self._get_next()

	def _get_next(self) -> None:
		try:
			self._next = next(self._lines)
		except StopIteration:
			self._done = True

	def peek(self) -> str:
		return self._next[1]

	def pop(self) -> str:
		n = self._next
		self._get_next()
		return n[1]

	def __bool__(self) -> bool:
		return not self._done

	# Only used by kunit_tool_test.py.
	def __iter__(self) -> Iterator[str]:
		while bool(self):
			yield self.pop()

	def line_number(self) -> int:
		return self._next[0]

kunit_start_re = re.compile(r'TAP version [0-9]+$')
kunit_end_re = re.compile('(List of all partitions:|'
			  'Kernel panic - not syncing: VFS:|reboot: System halted)')

def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
	def isolate_kunit_output(kernel_output: Iterable[str]) -> Iterator[Tuple[int, str]]:
		line_num = 0
		started = False
		for line in kernel_output:
			line_num += 1
			line = line.rstrip()  # line always has a trailing \n
			if kunit_start_re.search(line):
				prefix_len = len(line.split('TAP version')[0])
				started = True
				yield line_num, line[prefix_len:]
			elif kunit_end_re.search(line):
				break
			elif started:
				yield line_num, line[prefix_len:]
	return LineStream(lines=isolate_kunit_output(kernel_output))

def raw_output(kernel_output) -> None:
	for line in kernel_output:
		print(line.rstrip())

DIVIDER = '=' * 60

RESET = '\033[0;0m'

def red(text) -> str:
	return '\033[1;31m' + text + RESET

def yellow(text) -> str:
	return '\033[1;33m' + text + RESET

def green(text) -> str:
	return '\033[1;32m' + text + RESET

def print_with_timestamp(message) -> None:
	print('[%s] %s' % (datetime.now().strftime('%H:%M:%S'), message))

def format_suite_divider(message) -> str:
	return '======== ' + message + ' ========'

def print_suite_divider(message) -> None:
	print_with_timestamp(DIVIDER)
	print_with_timestamp(format_suite_divider(message))

def print_log(log) -> None:
	for m in log:
		print_with_timestamp(m)

TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*#).*$')

def consume_non_diagnostic(lines: LineStream) -> None:
	while lines and not TAP_ENTRIES.match(lines.peek()):
		lines.pop()

def save_non_diagnostic(lines: LineStream, test_case: TestCase) -> None:
	while lines and not TAP_ENTRIES.match(lines.peek()):
		test_case.log.append(lines.peek())
		lines.pop()

OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok', 'description', 'text'])

OK_NOT_OK_SKIP = re.compile(r'^[\s]*(ok|not ok) [0-9]+ - (.*) # SKIP(.*)$')

OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')

OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$')

def parse_ok_not_ok_test_case(lines: LineStream, test_case: TestCase) -> bool:
	save_non_diagnostic(lines, test_case)
	if not lines:
		test_case.status = TestStatus.TEST_CRASHED
		return True
	line = lines.peek()
	match = OK_NOT_OK_SUBTEST.match(line)
	while not match and lines:
		line = lines.pop()
		match = OK_NOT_OK_SUBTEST.match(line)
	if match:
		test_case.log.append(lines.pop())
		test_case.name = match.group(2)
		skip_match = OK_NOT_OK_SKIP.match(line)
		if skip_match:
			test_case.status = TestStatus.SKIPPED
			return True
		if test_case.status == TestStatus.TEST_CRASHED:
			return True
		if match.group(1) == 'ok':
			test_case.status = TestStatus.SUCCESS
		else:
			test_case.status = TestStatus.FAILURE
		return True
	else:
		return False

SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# (.*)$')
DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^[\s]+# .*?: kunit test case crashed!$')

def parse_diagnostic(lines: LineStream, test_case: TestCase) -> bool:
	save_non_diagnostic(lines, test_case)
	if not lines:
		return False
	line = lines.peek()
	match = SUBTEST_DIAGNOSTIC.match(line)
	if match:
		test_case.log.append(lines.pop())
		crash_match = DIAGNOSTIC_CRASH_MESSAGE.match(line)
		if crash_match:
			test_case.status = TestStatus.TEST_CRASHED
		return True
	else:
		return False

def parse_test_case(lines: LineStream) -> Optional[TestCase]:
	test_case = TestCase()
	save_non_diagnostic(lines, test_case)
	while parse_diagnostic(lines, test_case):
		pass
	if parse_ok_not_ok_test_case(lines, test_case):
		return test_case
	else:
		return None

SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')

def parse_subtest_header(lines: LineStream) -> Optional[str]:
	consume_non_diagnostic(lines)
	if not lines:
		return None
	match = SUBTEST_HEADER.match(lines.peek())
	if match:
		lines.pop()
		return match.group(1)
	else:
		return None

SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')

def parse_subtest_plan(lines: LineStream) -> Optional[int]:
	consume_non_diagnostic(lines)
	match = SUBTEST_PLAN.match(lines.peek())
	if match:
		lines.pop()
		return int(match.group(1))
	else:
		return None

def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
	if left == right:
		return left
	elif left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED:
		return TestStatus.TEST_CRASHED
	elif left == TestStatus.FAILURE or right == TestStatus.FAILURE:
		return TestStatus.FAILURE
	elif left == TestStatus.SKIPPED:
		return right
	else:
		return left

def parse_ok_not_ok_test_suite(lines: LineStream,
			       test_suite: TestSuite,
			       expected_suite_index: int) -> bool:
	consume_non_diagnostic(lines)
	if not lines:
		test_suite.status = TestStatus.TEST_CRASHED
		return False
	line = lines.peek()
	match = OK_NOT_OK_MODULE.match(line)
	if match:
		lines.pop()
		if match.group(1) == 'ok':
			test_suite.status = TestStatus.SUCCESS
		else:
			test_suite.status = TestStatus.FAILURE
		skip_match = OK_NOT_OK_SKIP.match(line)
		if skip_match:
			test_suite.status = TestStatus.SKIPPED
		suite_index = int(match.group(2))
		if suite_index != expected_suite_index:
			print_with_timestamp(
				red('[ERROR] ') + 'expected_suite_index ' +
				str(expected_suite_index) + ', but got ' +
				str(suite_index))
		return True
	else:
		return False

def bubble_up_errors(status_list: Iterable[TestStatus]) -> TestStatus:
	return reduce(max_status, status_list, TestStatus.SKIPPED)
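
# Illustrative only (hedged sketch): status severity under max_status().
# TEST_CRASHED outranks FAILURE, which outranks SUCCESS; SKIPPED defers to
# the other side, making it the identity element for bubble_up_errors().
def _demo_bubble_up_errors() -> None:
	statuses = [TestStatus.SUCCESS, TestStatus.FAILURE, TestStatus.SKIPPED]
	assert bubble_up_errors(statuses) == TestStatus.FAILURE
	assert max_status(TestStatus.SKIPPED, TestStatus.SUCCESS) == TestStatus.SUCCESS
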

def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
	max_test_case_status = bubble_up_errors(x.status for x in test_suite.cases)
	return max_status(max_test_case_status, test_suite.status)

def parse_test_suite(lines: LineStream, expected_suite_index: int) -> Optional[TestSuite]:
	if not lines:
		return None
	consume_non_diagnostic(lines)
	test_suite = TestSuite()
	test_suite.status = TestStatus.SUCCESS
	name = parse_subtest_header(lines)
	if not name:
		return None
	test_suite.name = name
	expected_test_case_num = parse_subtest_plan(lines)
	if expected_test_case_num is None:
		return None
	while expected_test_case_num > 0:
		test_case = parse_test_case(lines)
		if not test_case:
			break
		test_suite.cases.append(test_case)
		expected_test_case_num -= 1
	if parse_ok_not_ok_test_suite(lines, test_suite, expected_suite_index):
		test_suite.status = bubble_up_test_case_errors(test_suite)
		return test_suite
	elif not lines:
		print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
		return test_suite
	else:
		print(f'failed to parse end of suite "{name}", at line {lines.line_number()}: {lines.peek()}')
		return None

TAP_HEADER = re.compile(r'^TAP version 14$')

def parse_tap_header(lines: LineStream) -> bool:
	consume_non_diagnostic(lines)
	if TAP_HEADER.match(lines.peek()):
		lines.pop()
		return True
	else:
		return False

TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)')

def parse_test_plan(lines: LineStream) -> Optional[int]:
	consume_non_diagnostic(lines)
	match = TEST_PLAN.match(lines.peek())
	if match:
		lines.pop()
		return int(match.group(1))
	else:
		return None

def bubble_up_suite_errors(test_suites: Iterable[TestSuite]) -> TestStatus:
	return bubble_up_errors(x.status for x in test_suites)

def parse_test_result(lines: LineStream) -> TestResult:
	consume_non_diagnostic(lines)
	if not lines or not parse_tap_header(lines):
		return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
	expected_test_suite_num = parse_test_plan(lines)
	if expected_test_suite_num == 0:
		return TestResult(TestStatus.NO_TESTS, [], lines)
	elif expected_test_suite_num is None:
		return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
	test_suites = []
	for i in range(1, expected_test_suite_num + 1):
		test_suite = parse_test_suite(lines, i)
		if test_suite:
			test_suites.append(test_suite)
		else:
			print_with_timestamp(
				red('[ERROR] ') + ' expected ' +
				str(expected_test_suite_num) +
				' test suites, but got ' + str(i - 1))
			break
	test_suite = parse_test_suite(lines, -1)
	if test_suite:
		print_with_timestamp(red('[ERROR] ') +
			'got unexpected test suite: ' + test_suite.name)
	if test_suites:
		return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)
	else:
		return TestResult(TestStatus.NO_TESTS, [], lines)

class TestCounts:
	passed: int
	failed: int
	crashed: int
	skipped: int

	def __init__(self):
		self.passed = 0
		self.failed = 0
		self.crashed = 0
		self.skipped = 0

	def total(self) -> int:
		return self.passed + self.failed + self.crashed + self.skipped

def print_and_count_results(test_result: TestResult) -> TestCounts:
	counts = TestCounts()
	for test_suite in test_result.suites:
		if test_suite.status == TestStatus.SUCCESS:
			print_suite_divider(green('[PASSED] ') + test_suite.name)
		elif test_suite.status == TestStatus.SKIPPED:
			print_suite_divider(yellow('[SKIPPED] ') + test_suite.name)
		elif test_suite.status == TestStatus.TEST_CRASHED:
			print_suite_divider(red('[CRASHED] ' + test_suite.name))
		else:
			print_suite_divider(red('[FAILED] ') + test_suite.name)
		for test_case in test_suite.cases:
			if test_case.status == TestStatus.SUCCESS:
				counts.passed += 1
				print_with_timestamp(green('[PASSED] ') + test_case.name)
			elif test_case.status == TestStatus.SKIPPED:
				counts.skipped += 1
				print_with_timestamp(yellow('[SKIPPED] ') + test_case.name)
			elif test_case.status == TestStatus.TEST_CRASHED:
				counts.crashed += 1
				print_with_timestamp(red('[CRASHED] ' + test_case.name))
				print_log(map(yellow, test_case.log))
				print_with_timestamp('')
			else:
				counts.failed += 1
				print_with_timestamp(red('[FAILED] ') + test_case.name)
				print_log(map(yellow, test_case.log))
				print_with_timestamp('')
	return counts

def parse_run_tests(kernel_output: Iterable[str]) -> TestResult:
	counts = TestCounts()
	lines = extract_tap_lines(kernel_output)
	test_result = parse_test_result(lines)
	if test_result.status == TestStatus.NO_TESTS:
		print(red('[ERROR] ') + yellow('no tests run!'))
	elif test_result.status == TestStatus.FAILURE_TO_PARSE_TESTS:
		print(red('[ERROR] ') + yellow('could not parse test results!'))
	else:
		counts = print_and_count_results(test_result)
	print_with_timestamp(DIVIDER)
	if test_result.status == TestStatus.SUCCESS:
		fmt = green
	elif test_result.status == TestStatus.SKIPPED:
		fmt = yellow
	else:
		fmt = red
	print_with_timestamp(
		fmt('Testing complete. %d tests run. %d failed. %d crashed. %d skipped.' %
		    (counts.total(), counts.failed, counts.crashed, counts.skipped)))
	return test_result