# Linux v6.13.7
# SPDX-License-Identifier: GPL-2.0
#
# Parses KTAP test results from a kernel dmesg log and incrementally prints
# results with reader-friendly format. Stores and returns test results in a
# Test object.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>
# Author: Rae Moar <rmoar@google.com>

from __future__ import annotations
from dataclasses import dataclass
import re
import textwrap

from enum import Enum, auto
from typing import Iterable, Iterator, List, Optional, Tuple

from kunit_printer import Printer, stdout

class Test:
	"""
	A class to represent a test parsed from KTAP results. All KTAP
	results within a test log are stored in a main Test object as
	subtests.

	Attributes:
	status : TestStatus - status of the test
	name : str - name of the test
	expected_count : int - expected number of subtests (0 if single
		test case and None if unknown expected number of subtests)
	subtests : List[Test] - list of subtests
	log : List[str] - log of KTAP lines that correspond to the test
	counts : TestCounts - counts of the test statuses and errors of
		subtests or of the test itself if the test is a single
		test case.
	"""
	def __init__(self) -> None:
		"""Creates Test object with default attributes."""
		self.status = TestStatus.TEST_CRASHED
		self.name = ''
		self.expected_count = 0  # type: Optional[int]
		self.subtests = []  # type: List[Test]
		self.log = []  # type: List[str]
		self.counts = TestCounts()

	def __str__(self) -> str:
		"""Returns string representation of a Test class object."""
		return (f'Test({self.status}, {self.name}, {self.expected_count}, '
			f'{self.subtests}, {self.log}, {self.counts})')

	def __repr__(self) -> str:
		"""Returns string representation of a Test class object."""
		return str(self)

	def add_error(self, printer: Printer, error_message: str) -> None:
		"""Records an error that occurred while parsing this test."""
		self.counts.errors += 1
		printer.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')

	def ok_status(self) -> bool:
		"""Returns true if the status was ok, i.e. passed or skipped."""
		return self.status in (TestStatus.SUCCESS, TestStatus.SKIPPED)

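# Editorial example, not part of the upstream parser: a minimal sketch of
# the Test defaults above. A freshly created Test reports TEST_CRASHED
# until a result line or subtest counts say otherwise, which is how tests
# that never finish surface as crashes. The helper name
# _example_test_defaults is hypothetical.
def _example_test_defaults() -> None:
	test = Test()  # defaults from Test.__init__()
	assert test.status == TestStatus.TEST_CRASHED
	assert not test.ok_status()
	assert test.counts.total() == 0
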
class TestStatus(Enum):
	"""An enumeration class to represent the status of a test."""
	SUCCESS = auto()
	FAILURE = auto()
	SKIPPED = auto()
	TEST_CRASHED = auto()
	NO_TESTS = auto()
	FAILURE_TO_PARSE_TESTS = auto()

@dataclass
class TestCounts:
	"""
	Tracks the counts of statuses of all test cases and any errors within
	a Test.
	"""
	passed: int = 0
	failed: int = 0
	crashed: int = 0
	skipped: int = 0
	errors: int = 0

	def __str__(self) -> str:
		"""Returns the string representation of a TestCounts object."""
		statuses = [('passed', self.passed), ('failed', self.failed),
			('crashed', self.crashed), ('skipped', self.skipped),
			('errors', self.errors)]
		return f'Ran {self.total()} tests: ' + \
			', '.join(f'{s}: {n}' for s, n in statuses if n > 0)

	def total(self) -> int:
		"""Returns the total number of test cases within a test
		object, where a test case is a test with no subtests.
		"""
		return (self.passed + self.failed + self.crashed +
			self.skipped)

	def add_subtest_counts(self, counts: TestCounts) -> None:
		"""
		Adds the counts of another TestCounts object to the current
		TestCounts object. Used to add the counts of a subtest to the
		parent test.

		Parameters:
		counts - a different TestCounts object whose counts
			will be added to the counts of the TestCounts object
		"""
		self.passed += counts.passed
		self.failed += counts.failed
		self.crashed += counts.crashed
		self.skipped += counts.skipped
		self.errors += counts.errors

	def get_status(self) -> TestStatus:
		"""Returns the aggregated status of a Test using test
		counts.
		"""
		if self.total() == 0:
			return TestStatus.NO_TESTS
		if self.crashed:
			# Crashes should take priority.
			return TestStatus.TEST_CRASHED
		if self.failed:
			return TestStatus.FAILURE
		if self.passed:
			# No failures or crashes, looks good!
			return TestStatus.SUCCESS
		# We have only skipped tests.
		return TestStatus.SKIPPED

	def add_status(self, status: TestStatus) -> None:
		"""Increments the count for `status`."""
		if status == TestStatus.SUCCESS:
			self.passed += 1
		elif status == TestStatus.FAILURE:
			self.failed += 1
		elif status == TestStatus.SKIPPED:
			self.skipped += 1
		elif status != TestStatus.NO_TESTS:
			self.crashed += 1

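# Editorial example, not part of the upstream parser: a minimal sketch of
# how TestCounts folds individual statuses into one aggregate result.
# Crashes dominate failures, failures dominate passes, and passes dominate
# skips, per get_status() above. The name _example_test_counts is
# hypothetical.
def _example_test_counts() -> None:
	counts = TestCounts()
	counts.add_status(TestStatus.SUCCESS)
	counts.add_status(TestStatus.SKIPPED)
	assert counts.get_status() == TestStatus.SUCCESS  # pass outranks skip
	counts.add_status(TestStatus.FAILURE)
	assert counts.get_status() == TestStatus.FAILURE  # failure outranks pass
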
class LineStream:
	"""
	A class to represent the lines of kernel output.
	Provides a lazy peek()/pop() interface over an iterator of
	(line#, text).
	"""
	_lines: Iterator[Tuple[int, str]]
	_next: Tuple[int, str]
	_need_next: bool
	_done: bool

	def __init__(self, lines: Iterator[Tuple[int, str]]):
		"""Creates a new LineStream that wraps the given iterator."""
		self._lines = lines
		self._done = False
		self._need_next = True
		self._next = (0, '')

	def _get_next(self) -> None:
		"""Advances the LineStream to the next line, if necessary."""
		if not self._need_next:
			return
		try:
			self._next = next(self._lines)
		except StopIteration:
			self._done = True
		finally:
			self._need_next = False

	def peek(self) -> str:
		"""Returns the current line, without advancing the LineStream.
		"""
		self._get_next()
		return self._next[1]

	def pop(self) -> str:
		"""Returns the current line and advances the LineStream to
		the next line.
		"""
		s = self.peek()
		if self._done:
			raise ValueError(f'LineStream: going past EOF, last line was {s}')
		self._need_next = True
		return s

	def __bool__(self) -> bool:
		"""Returns True if stream has more lines."""
		self._get_next()
		return not self._done

	# Only used by kunit_tool_test.py.
	def __iter__(self) -> Iterator[str]:
		"""Empties all lines stored in LineStream object into
		Iterator object and returns the Iterator object.
		"""
		while bool(self):
			yield self.pop()

	def line_number(self) -> int:
		"""Returns the line number of the current line."""
		self._get_next()
		return self._next[0]

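# Editorial example, not part of the upstream parser: a sketch of the lazy
# peek()/pop() contract. peek() never consumes a line; pop() consumes one
# and raises ValueError past EOF. The name _example_line_stream is
# hypothetical.
def _example_line_stream() -> None:
	stream = LineStream(iter(enumerate(['first', 'second'], start=1)))
	assert stream.peek() == 'first'   # repeatable; does not consume
	assert stream.peek() == 'first'
	assert stream.pop() == 'first'    # consumes the line
	assert stream.line_number() == 2
	assert stream.pop() == 'second'
	assert not stream                 # exhausted; a further pop() raises
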
# Parsing helper methods:

KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$')
TAP_START = re.compile(r'\s*TAP version ([0-9]+)$')
KTAP_END = re.compile(r'\s*(List of all partitions:|'
	'Kernel panic - not syncing: VFS:|reboot: System halted)')
EXECUTOR_ERROR = re.compile(r'\s*kunit executor: (.*)$')

def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
	"""Extracts KTAP lines from the kernel output."""
	def isolate_ktap_output(kernel_output: Iterable[str]) \
			-> Iterator[Tuple[int, str]]:
		line_num = 0
		started = False
		for line in kernel_output:
			line_num += 1
			line = line.rstrip()  # remove trailing \n
			if not started and KTAP_START.search(line):
				# start extracting KTAP lines and set prefix
				# to number of characters before version line
				prefix_len = len(
					line.split('KTAP version')[0])
				started = True
				yield line_num, line[prefix_len:]
			elif not started and TAP_START.search(line):
				# start extracting KTAP lines and set prefix
				# to number of characters before version line
				prefix_len = len(line.split('TAP version')[0])
				started = True
				yield line_num, line[prefix_len:]
			elif started and KTAP_END.search(line):
				# stop extracting KTAP lines
				break
			elif started:
				# remove the prefix, if any.
				line = line[prefix_len:]
				yield line_num, line
			elif EXECUTOR_ERROR.search(line):
				yield line_num, line
	return LineStream(lines=isolate_ktap_output(kernel_output))

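# Editorial example, not part of the upstream parser: a sketch of how
# extract_tap_lines() finds the '(K)TAP version' banner in raw dmesg and
# strips the console-timestamp prefix from every following line. The name
# _example_extract_tap_lines is hypothetical.
def _example_extract_tap_lines() -> None:
	dmesg = [
		'[ 0.100000] random console noise\n',
		'[ 0.200000] KTAP version 1\n',
		'[ 0.300000] 1..1\n',
		'[ 0.400000] ok 1 example\n',
	]
	ktap = extract_tap_lines(dmesg)
	assert list(ktap) == ['KTAP version 1', '1..1', 'ok 1 example']
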
KTAP_VERSIONS = [1]
TAP_VERSIONS = [13, 14]

def check_version(version_num: int, accepted_versions: List[int],
			version_type: str, test: Test, printer: Printer) -> None:
	"""
	Adds error to test object if version number is too high or too
	low.

	Parameters:
	version_num - The inputted version number from the parsed KTAP or TAP
		header line
	accepted_versions - List of accepted KTAP or TAP versions
	version_type - 'KTAP' or 'TAP' depending on the type of
		version line.
	test - Test object for current test being parsed
	printer - Printer object to output error
	"""
	if version_num < min(accepted_versions):
		test.add_error(printer, f'{version_type} version lower than expected!')
	elif version_num > max(accepted_versions):
		test.add_error(printer, f'{version_type} version higher than expected!')

def parse_ktap_header(lines: LineStream, test: Test, printer: Printer) -> bool:
	"""
	Parses KTAP/TAP header line and checks version number.
	Returns False if fails to parse KTAP/TAP header line.

	Accepted formats:
	- 'KTAP version [version number]'
	- 'TAP version [version number]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed
	printer - Printer object to output results

	Return:
	True if successfully parsed KTAP/TAP header line
	"""
	ktap_match = KTAP_START.match(lines.peek())
	tap_match = TAP_START.match(lines.peek())
	if ktap_match:
		version_num = int(ktap_match.group(1))
		check_version(version_num, KTAP_VERSIONS, 'KTAP', test, printer)
	elif tap_match:
		version_num = int(tap_match.group(1))
		check_version(version_num, TAP_VERSIONS, 'TAP', test, printer)
	else:
		return False
	lines.pop()
	return True

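# Editorial example, not part of the upstream parser: parsing a KTAP
# banner. `stdout` is the module-level printer imported above; assuming it
# satisfies the Printer interface, nothing is printed here because
# version 1 is accepted. The name _example_parse_ktap_header is
# hypothetical.
def _example_parse_ktap_header() -> None:
	lines = LineStream(iter([(1, 'KTAP version 1')]))
	test = Test()
	assert parse_ktap_header(lines, test, stdout)  # consumes the banner
	assert not lines                               # stream is now empty
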
TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$')

def parse_test_header(lines: LineStream, test: Test) -> bool:
	"""
	Parses test header and stores test name in test object.
	Returns False if fails to parse test header line.

	Accepted format:
	- '# Subtest: [test name]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if successfully parsed test header line
	"""
	match = TEST_HEADER.match(lines.peek())
	if not match:
		return False
	test.name = match.group(1)
	lines.pop()
	return True

TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)')

def parse_test_plan(lines: LineStream, test: Test) -> bool:
	"""
	Parses test plan line and stores the expected number of subtests in
	test object. (An expected count of 0 is not treated as an error here;
	parse_test reports it later as '0 tests run!'.)
	Returns False and sets expected_count to None if there is no valid test
	plan.

	Accepted format:
	- '1..[number of subtests]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if successfully parsed test plan line
	"""
	match = TEST_PLAN.match(lines.peek())
	if not match:
		test.expected_count = None
		return False
	expected_count = int(match.group(1))
	test.expected_count = expected_count
	lines.pop()
	return True

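# Editorial example, not part of the upstream parser: parsing a test plan.
# A '1..N' line records N as the expected subtest count; anything else
# leaves expected_count as None. The name _example_parse_test_plan is
# hypothetical.
def _example_parse_test_plan() -> None:
	test = Test()
	assert parse_test_plan(LineStream(iter([(1, '1..4')])), test)
	assert test.expected_count == 4
	assert not parse_test_plan(LineStream(iter([(1, 'ok 1 foo')])), test)
	assert test.expected_count is None
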
TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')

TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')

def peek_test_name_match(lines: LineStream, test: Test) -> bool:
	"""
	Matches current line with the format of a test result line and checks
	if the name matches the name of the current test.
	Returns False if fails to match format or name.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if matched a test result line and the name matching the
		expected test name
	"""
	line = lines.peek()
	match = TEST_RESULT.match(line)
	if not match:
		return False
	name = match.group(4)
	return name == test.name

def parse_test_result(lines: LineStream, test: Test,
			expected_num: int, printer: Printer) -> bool:
	"""
	Parses test result line and stores the status and name in the test
	object. Reports an error if the test number does not match expected
	test number.
	Returns False if fails to parse test result line.

	Note that the SKIP directive is the only directive that causes a
	change in status.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed
	expected_num - expected test number for current test
	printer - Printer object to output results

	Return:
	True if successfully parsed a test result line.
	"""
	line = lines.peek()
	match = TEST_RESULT.match(line)
	skip_match = TEST_RESULT_SKIP.match(line)

	# Check if line matches test result line format
	if not match:
		return False
	lines.pop()

	# Set name of test object
	if skip_match:
		test.name = skip_match.group(4)
	else:
		test.name = match.group(4)

	# Check test num
	num = int(match.group(2))
	if num != expected_num:
		test.add_error(printer, f'Expected test number {expected_num} but found {num}')

	# Set status of test object
	status = match.group(1)
	if skip_match:
		test.status = TestStatus.SKIPPED
	elif status == 'ok':
		test.status = TestStatus.SUCCESS
	else:
		test.status = TestStatus.FAILURE
	return True

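# Editorial example, not part of the upstream parser: parsing a result
# line carrying a SKIP directive, the one directive that changes the
# status. Uses the module-level `stdout` printer; no error is printed
# since the test number matches. The name _example_parse_test_result is
# hypothetical.
def _example_parse_test_result() -> None:
	test = Test()
	lines = LineStream(iter([(1, 'ok 1 - example # SKIP not implemented')]))
	assert parse_test_result(lines, test, 1, stdout)
	assert test.name == 'example'
	assert test.status == TestStatus.SKIPPED
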
def parse_diagnostic(lines: LineStream) -> List[str]:
	"""
	Parses lines that do not match the format of a test result line or
	test header line and returns them in a list.

	Line formats that are not parsed:
	- '# Subtest: [test name]'
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'
	- 'KTAP version [version number]'

	Parameters:
	lines - LineStream of KTAP output to parse

	Return:
	Log of diagnostic lines
	"""
	log = []  # type: List[str]
	non_diagnostic_lines = [TEST_RESULT, TEST_HEADER, KTAP_START, TAP_START, TEST_PLAN]
	while lines and not any(regex.match(lines.peek())
			for regex in non_diagnostic_lines):
		log.append(lines.pop())
	return log


# Printing helper methods:

DIVIDER = '=' * 60

def format_test_divider(message: str, len_message: int) -> str:
	"""
	Returns string with message centered in fixed width divider.

	Example:
	'===================== message example ====================='

	Parameters:
	message - message to be centered in divider line
	len_message - length of the message to be printed such that
		any characters of the color codes are not counted

	Return:
	String containing message centered in fixed width divider
	"""
	default_count = 3  # default number of dashes
	len_1 = default_count
	len_2 = default_count
	difference = len(DIVIDER) - len_message - 2  # 2 spaces added
	if difference > 0:
		# calculate number of dashes for each side of the divider
		len_1 = int(difference / 2)
		len_2 = difference - len_1
	return ('=' * len_1) + f' {message} ' + ('=' * len_2)

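# Editorial example, not part of the upstream parser: the divider is
# always len(DIVIDER) characters wide, with the message centered and any
# extra '=' going to the right-hand side. The name
# _example_format_test_divider is hypothetical.
def _example_format_test_divider() -> None:
	line = format_test_divider('example', len('example'))
	assert line == '=' * 25 + ' example ' + '=' * 26
	assert len(line) == len(DIVIDER)
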
def print_test_header(test: Test, printer: Printer) -> None:
	"""
	Prints test header with test name and optionally the expected number
	of subtests.

	Example:
	'=================== example (2 subtests) ==================='

	Parameters:
	test - Test object representing current test being printed
	printer - Printer object to output results
	"""
	message = test.name
	if message != "":
		# Add a leading space before the subtest counts only if a test name
		# is provided using a "# Subtest" header line.
		message += " "
	if test.expected_count:
		if test.expected_count == 1:
			message += '(1 subtest)'
		else:
			message += f'({test.expected_count} subtests)'
	printer.print_with_timestamp(format_test_divider(message, len(message)))

def print_log(log: Iterable[str], printer: Printer) -> None:
	"""Prints all strings in saved log for test in yellow."""
	formatted = textwrap.dedent('\n'.join(log))
	for line in formatted.splitlines():
		printer.print_with_timestamp(printer.yellow(line))

def format_test_result(test: Test, printer: Printer) -> str:
	"""
	Returns string with formatted test result with colored status and test
	name.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed
	printer - Printer object to output results

	Return:
	String containing formatted test result
	"""
	if test.status == TestStatus.SUCCESS:
		return printer.green('[PASSED] ') + test.name
	if test.status == TestStatus.SKIPPED:
		return printer.yellow('[SKIPPED] ') + test.name
	if test.status == TestStatus.NO_TESTS:
		return printer.yellow('[NO TESTS RUN] ') + test.name
	if test.status == TestStatus.TEST_CRASHED:
		print_log(test.log, printer)
		return stdout.red('[CRASHED] ') + test.name
	print_log(test.log, printer)
	return printer.red('[FAILED] ') + test.name

def print_test_result(test: Test, printer: Printer) -> None:
	"""
	Prints result line with status of test.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed
	printer - Printer object
	"""
	printer.print_with_timestamp(format_test_result(test, printer))

def print_test_footer(test: Test, printer: Printer) -> None:
	"""
	Prints test footer with status of test.

	Example:
	'===================== [PASSED] example ====================='

	Parameters:
	test - Test object representing current test being printed
	printer - Printer object to output results
	"""
	message = format_test_result(test, printer)
	printer.print_with_timestamp(format_test_divider(message,
		len(message) - printer.color_len()))

def print_test(test: Test, failed_only: bool, printer: Printer) -> None:
	"""
	Prints Test object to given printer. For a child test, the result line is
	printed. For a parent test, the test header, all child test results, and
	the test footer are all printed. If failed_only is true, only failed/crashed
	tests will be printed.

	Parameters:
	test - Test object to print
	failed_only - True if only failed/crashed tests should be printed.
	printer - Printer object to output results
	"""
	if test.name == "main":
		printer.print_with_timestamp(DIVIDER)
		for subtest in test.subtests:
			print_test(subtest, failed_only, printer)
		printer.print_with_timestamp(DIVIDER)
	elif test.subtests != []:
		if not failed_only or not test.ok_status():
			print_test_header(test, printer)
			for subtest in test.subtests:
				print_test(subtest, failed_only, printer)
			print_test_footer(test, printer)
	else:
		if not failed_only or not test.ok_status():
			print_test_result(test, printer)

def _summarize_failed_tests(test: Test) -> str:
	"""Tries to summarize all the failing subtests in `test`."""

	def failed_names(test: Test, parent_name: str) -> List[str]:
		# Note: we use 'main' internally for the top-level test.
		if not parent_name or parent_name == 'main':
			full_name = test.name
		else:
			full_name = parent_name + '.' + test.name

		if not test.subtests:  # this is a leaf node
			return [full_name]

		# If all the children failed, just say this subtest failed.
		# Don't summarize it down "the top-level test failed", though.
		failed_subtests = [sub for sub in test.subtests if not sub.ok_status()]
		if parent_name and len(failed_subtests) == len(test.subtests):
			return [full_name]

		all_failures = []  # type: List[str]
		for t in failed_subtests:
			all_failures.extend(failed_names(t, full_name))
		return all_failures

	failures = failed_names(test, '')
	# If there are too many failures, printing them out will just be noisy.
	if len(failures) > 10:  # this is an arbitrary limit
		return ''

	return 'Failures: ' + ', '.join(failures)

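# Editorial example, not part of the upstream parser: summarizing a tree
# with one failing leaf. Names are joined with '.', and the internal
# 'main' name is dropped from the prefix. All Test objects here are
# hypothetical fixtures.
def _example_summarize_failed_tests() -> None:
	case, other, suite, main = Test(), Test(), Test(), Test()
	case.name, case.status = 'case', TestStatus.FAILURE
	other.name, other.status = 'other', TestStatus.SUCCESS
	suite.name, suite.subtests = 'suite', [case, other]
	main.name, main.subtests = 'main', [suite]
	assert _summarize_failed_tests(main) == 'Failures: suite.case'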

def print_summary_line(test: Test, printer: Printer) -> None:
	"""
	Prints summary line of test object. Color of line is dependent on
	status of test. Color is green if test passes, yellow if test is
	skipped, and red if the test fails or crashes. Summary line contains
	counts of the statuses of the test's subtests or the test itself if it
	has no subtests.

	Example:
	"Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
	Errors: 0"

	Parameters:
	test - Test object representing current test being printed
	printer - Printer object to output results
	"""
	if test.status == TestStatus.SUCCESS:
		color = stdout.green
	elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
		color = stdout.yellow
	else:
		color = stdout.red
	printer.print_with_timestamp(color(f'Testing complete. {test.counts}'))

	# Summarize failures that might have gone off-screen since we had a lot
	# of tests (arbitrarily defined as >=100 for now).
	if test.ok_status() or test.counts.total() < 100:
		return
	summarized = _summarize_failed_tests(test)
	if not summarized:
		return
	printer.print_with_timestamp(color(summarized))

# Other methods:

def bubble_up_test_results(test: Test) -> None:
	"""
	If the test has subtests, add the test counts of the subtests to the
	test and check if any of the tests crashed and if so set the test
	status to crashed. Otherwise if the test has no subtests add the
	status of the test to the test counts.

	Parameters:
	test - Test object for current test being parsed
	"""
	subtests = test.subtests
	counts = test.counts
	status = test.status
	for t in subtests:
		counts.add_subtest_counts(t.counts)
	if counts.total() == 0:
		counts.add_status(status)
	elif test.counts.get_status() == TestStatus.TEST_CRASHED:
		test.status = TestStatus.TEST_CRASHED

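# Editorial example, not part of the upstream parser: bubbling one
# passing subtest up into its parent. The parent's own status is set
# later (e.g. in parse_run_tests) from the aggregated counts. The name
# _example_bubble_up is hypothetical.
def _example_bubble_up() -> None:
	parent, child = Test(), Test()
	child.status = TestStatus.SUCCESS
	child.counts.add_status(TestStatus.SUCCESS)
	parent.subtests = [child]
	bubble_up_test_results(parent)
	assert parent.counts.passed == 1
	assert parent.counts.get_status() == TestStatus.SUCCESS
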
def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool, printer: Printer) -> Test:
	"""
	Finds next test to parse in LineStream, creates new Test object,
	parses any subtests of the test, populates Test object with all
	information (status, name) about the test and the Test objects for
	any subtests, and then returns the Test object. The method accepts
	three formats of tests:

	Accepted test formats:

	- Main KTAP/TAP header

	Example:

	KTAP version 1
	1..4
	[subtests]

	- Subtest header (must include either the KTAP version line or
	  "# Subtest" header line)

	Example (preferred format with both KTAP version line and
	"# Subtest" line):

	KTAP version 1
	# Subtest: name
	1..3
	[subtests]
	ok 1 name

	Example (only "# Subtest" line):

	# Subtest: name
	1..3
	[subtests]
	ok 1 name

	Example (only KTAP version line, compliant with KTAP v1 spec):

	KTAP version 1
	1..3
	[subtests]
	ok 1 name

	- Test result line

	Example:

	ok 1 - test

	Parameters:
	lines - LineStream of KTAP output to parse
	expected_num - expected test number for test to be parsed
	log - list of strings containing any preceding diagnostic lines
		corresponding to the current test
	is_subtest - boolean indicating whether test is a subtest
	printer - Printer object to output results

	Return:
	Test object populated with characteristics and any subtests
	"""
	test = Test()
	test.log.extend(log)

	# Parse any errors prior to parsing tests
	err_log = parse_diagnostic(lines)
	test.log.extend(err_log)

	if not is_subtest:
		# If parsing the main/top-level test, parse KTAP version line and
		# test plan
		test.name = "main"
		ktap_line = parse_ktap_header(lines, test, printer)
		test.log.extend(parse_diagnostic(lines))
		parse_test_plan(lines, test)
		parent_test = True
	else:
		# If not the main test, attempt to parse a test header containing
		# the KTAP version line and/or subtest header line
		ktap_line = parse_ktap_header(lines, test, printer)
		subtest_line = parse_test_header(lines, test)
		parent_test = (ktap_line or subtest_line)
		if parent_test:
			# If KTAP version line and/or subtest header is found, attempt
			# to parse test plan and print test header
			test.log.extend(parse_diagnostic(lines))
			parse_test_plan(lines, test)
			print_test_header(test, printer)
	expected_count = test.expected_count
	subtests = []
	test_num = 1
	while parent_test and (expected_count is None or test_num <= expected_count):
		# Loop to parse any subtests.
		# Break after parsing expected number of tests or
		# if expected number of tests is unknown break when test
		# result line with matching name to subtest header is found
		# or no more lines in stream.
		sub_log = parse_diagnostic(lines)
		sub_test = Test()
		if not lines or (peek_test_name_match(lines, test) and
				is_subtest):
			if expected_count and test_num <= expected_count:
				# If parser reaches end of test before
				# parsing expected number of subtests, print
				# crashed subtest and record error
				test.add_error(printer, 'missing expected subtest!')
				sub_test.log.extend(sub_log)
				test.counts.add_status(
					TestStatus.TEST_CRASHED)
				print_test_result(sub_test, printer)
			else:
				test.log.extend(sub_log)
				break
		else:
			sub_test = parse_test(lines, test_num, sub_log, True, printer)
		subtests.append(sub_test)
		test_num += 1
	test.subtests = subtests
	if is_subtest:
		# If not main test, look for test result line
		test.log.extend(parse_diagnostic(lines))
		if test.name != "" and not peek_test_name_match(lines, test):
			test.add_error(printer, 'missing subtest result line!')
		else:
			parse_test_result(lines, test, expected_num, printer)

	# Check for there being no subtests within parent test
	if parent_test and len(subtests) == 0:
		# Don't override a bad status if this test had one reported.
		# Assumption: no subtests means CRASHED is from Test.__init__()
		if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
			print_log(test.log, printer)
			test.status = TestStatus.NO_TESTS
			test.add_error(printer, '0 tests run!')

	# Add statuses to TestCounts attribute in Test object
	bubble_up_test_results(test)
	if parent_test and is_subtest:
		# If test has subtests and is not the main test object, print
		# footer.
		print_test_footer(test, printer)
	elif is_subtest:
		print_test_result(test, printer)
	return test

def parse_run_tests(kernel_output: Iterable[str], printer: Printer) -> Test:
	"""
	Using kernel output, extract KTAP lines, parse the lines for test
	results and print condensed test results and summary line.

	Parameters:
	kernel_output - Iterable object containing lines of kernel output
	printer - Printer object to output results

	Return:
	Test - the main test object with all subtests.
	"""
	printer.print_with_timestamp(DIVIDER)
	lines = extract_tap_lines(kernel_output)
	test = Test()
	if not lines:
		test.name = '<missing>'
		test.add_error(printer, 'Could not find any KTAP output. Did any KUnit tests run?')
		test.status = TestStatus.FAILURE_TO_PARSE_TESTS
	else:
		test = parse_test(lines, 0, [], False, printer)
		if test.status != TestStatus.NO_TESTS:
			test.status = test.counts.get_status()
	printer.print_with_timestamp(DIVIDER)
	print_summary_line(test, printer)
	return test
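
# Editorial example, not part of the upstream tool: a minimal end-to-end
# sketch feeding a captured KTAP fragment through parse_run_tests(),
# assuming the module-level `stdout` printer. The demo output below is a
# hypothetical fixture.
if __name__ == '__main__':
	demo_output = [
		'KTAP version 1',
		'1..1',
		'ok 1 example_test',
	]
	result = parse_run_tests(demo_output, stdout)
	assert result.status == TestStatus.SUCCESS
	assert result.counts.passed == 1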