1# SPDX-License-Identifier: GPL-2.0
2#
3# Parses KTAP test results from a kernel dmesg log and incrementally prints
4# results with reader-friendly format. Stores and returns test results in a
5# Test object.
6#
7# Copyright (C) 2019, Google LLC.
8# Author: Felix Guo <felixguoxiuping@gmail.com>
9# Author: Brendan Higgins <brendanhiggins@google.com>
10# Author: Rae Moar <rmoar@google.com>
11
12from __future__ import annotations
13from dataclasses import dataclass
14import re
15import sys
16import textwrap
17
18from enum import Enum, auto
19from typing import Iterable, Iterator, List, Optional, Tuple
20
21from kunit_printer import stdout
22
23class Test:
24 """
25 A class to represent a test parsed from KTAP results. All KTAP
26 results within a test log are stored in a main Test object as
27 subtests.
28
29 Attributes:
30 status : TestStatus - status of the test
31 name : str - name of the test
32 expected_count : Optional[int] - expected number of subtests (0 if single
33 test case and None if unknown expected number of subtests)
34 subtests : List[Test] - list of subtests
35 log : List[str] - log of KTAP lines that correspond to the test
36 counts : TestCounts - counts of the test statuses and errors of
37 subtests or of the test itself if the test is a single
38 test case.
39 """
40 def __init__(self) -> None:
41 """Creates Test object with default attributes."""
42 self.status = TestStatus.TEST_CRASHED
43 self.name = ''
44 self.expected_count = 0 # type: Optional[int]
45 self.subtests = [] # type: List[Test]
46 self.log = [] # type: List[str]
47 self.counts = TestCounts()
48
49 def __str__(self) -> str:
50 """Returns string representation of a Test class object."""
51 return (f'Test({self.status}, {self.name}, {self.expected_count}, '
52 f'{self.subtests}, {self.log}, {self.counts})')
53
54 def __repr__(self) -> str:
55 """Returns string representation of a Test class object."""
56 return str(self)
57
58 def add_error(self, error_message: str) -> None:
59 """Records an error that occurred while parsing this test."""
60 self.counts.errors += 1
61 stdout.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')
62
63 def ok_status(self) -> bool:
64 """Returns true if the status was ok, i.e. passed or skipped."""
65 return self.status in (TestStatus.SUCCESS, TestStatus.SKIPPED)
66
67class TestStatus(Enum):
68 """An enumeration class to represent the status of a test."""
69 SUCCESS = auto()
70 FAILURE = auto()
71 SKIPPED = auto()
72 TEST_CRASHED = auto()
73 NO_TESTS = auto()
74 FAILURE_TO_PARSE_TESTS = auto()
75
76@dataclass
77class TestCounts:
78 """
79 Tracks the counts of statuses of all test cases and any errors within
80 a Test.
81 """
82 passed: int = 0
83 failed: int = 0
84 crashed: int = 0
85 skipped: int = 0
86 errors: int = 0
87
88 def __str__(self) -> str:
89 """Returns the string representation of a TestCounts object."""
90 statuses = [('passed', self.passed), ('failed', self.failed),
91 ('crashed', self.crashed), ('skipped', self.skipped),
92 ('errors', self.errors)]
93 return f'Ran {self.total()} tests: ' + \
94 ', '.join(f'{s}: {n}' for s, n in statuses if n > 0)
95
96 def total(self) -> int:
97 """Returns the total number of test cases within a test
98 object, where a test case is a test with no subtests.
99 """
100 return (self.passed + self.failed + self.crashed +
101 self.skipped)
102
103 def add_subtest_counts(self, counts: TestCounts) -> None:
104 """
105 Adds the counts of another TestCounts object to the current
106 TestCounts object. Used to add the counts of a subtest to the
107 parent test.
108
109 Parameters:
110 counts - another TestCounts object whose counts
111 will be added to this TestCounts object
112 """
113 self.passed += counts.passed
114 self.failed += counts.failed
115 self.crashed += counts.crashed
116 self.skipped += counts.skipped
117 self.errors += counts.errors
118
119 def get_status(self) -> TestStatus:
120 """Returns the aggregated status of a Test using test
121 counts.
122 """
123 if self.total() == 0:
124 return TestStatus.NO_TESTS
125 if self.crashed:
126 # Crashes should take priority.
127 return TestStatus.TEST_CRASHED
128 if self.failed:
129 return TestStatus.FAILURE
130 if self.passed:
131 # No failures or crashes, looks good!
132 return TestStatus.SUCCESS
133 # We have only skipped tests.
134 return TestStatus.SKIPPED
135
136 def add_status(self, status: TestStatus) -> None:
137 """Increments the count for `status`."""
138 if status == TestStatus.SUCCESS:
139 self.passed += 1
140 elif status == TestStatus.FAILURE:
141 self.failed += 1
142 elif status == TestStatus.SKIPPED:
143 self.skipped += 1
144 elif status != TestStatus.NO_TESTS:
145 self.crashed += 1
146
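# Illustrative sketch (not executed): how statuses typically aggregate in a
# TestCounts instance. Two passes and one failure would give:
#
#   counts = TestCounts()
#   counts.add_status(TestStatus.SUCCESS)
#   counts.add_status(TestStatus.SUCCESS)
#   counts.add_status(TestStatus.FAILURE)
#   counts.get_status()  # -> TestStatus.FAILURE (failures outrank passes)
#   str(counts)          # -> 'Ran 3 tests: passed: 2, failed: 1'
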
147class LineStream:
148 """
149 A class to represent the lines of kernel output.
150 Provides a lazy peek()/pop() interface over an iterator of
151 (line#, text).
152 """
153 _lines: Iterator[Tuple[int, str]]
154 _next: Tuple[int, str]
155 _need_next: bool
156 _done: bool
157
158 def __init__(self, lines: Iterator[Tuple[int, str]]):
159 """Creates a new LineStream that wraps the given iterator."""
160 self._lines = lines
161 self._done = False
162 self._need_next = True
163 self._next = (0, '')
164
165 def _get_next(self) -> None:
166 """Advances the LineSteam to the next line, if necessary."""
167 if not self._need_next:
168 return
169 try:
170 self._next = next(self._lines)
171 except StopIteration:
172 self._done = True
173 finally:
174 self._need_next = False
175
176 def peek(self) -> str:
177 """Returns the current line, without advancing the LineStream.
178 """
179 self._get_next()
180 return self._next[1]
181
182 def pop(self) -> str:
183 """Returns the current line and advances the LineStream to
184 the next line.
185 """
186 s = self.peek()
187 if self._done:
188 raise ValueError(f'LineStream: going past EOF, last line was {s}')
189 self._need_next = True
190 return s
191
192 def __bool__(self) -> bool:
193 """Returns True if stream has more lines."""
194 self._get_next()
195 return not self._done
196
197 # Only used by kunit_tool_test.py.
198 def __iter__(self) -> Iterator[str]:
199 """Empties all lines stored in LineStream object into
200 Iterator object and returns the Iterator object.
201 """
202 while bool(self):
203 yield self.pop()
204
205 def line_number(self) -> int:
206 """Returns the line number of the current line."""
207 self._get_next()
208 return self._next[0]
209
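# Illustrative sketch (not executed): callers peek() to decide how to handle
# the current line and pop() to consume it:
#
#   stream = LineStream(iter([(1, 'KTAP version 1'), (2, '1..1')]))
#   stream.peek()         # -> 'KTAP version 1' (stream not advanced)
#   stream.pop()          # -> 'KTAP version 1' (stream advanced)
#   stream.line_number()  # -> 2
#   bool(stream)          # -> True, one line remains
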
210# Parsing helper methods:
211
212KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$')
213TAP_START = re.compile(r'\s*TAP version ([0-9]+)$')
214KTAP_END = re.compile(r'\s*(List of all partitions:|'
215 'Kernel panic - not syncing: VFS:|reboot: System halted)')
216
217def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
218 """Extracts KTAP lines from the kernel output."""
219 def isolate_ktap_output(kernel_output: Iterable[str]) \
220 -> Iterator[Tuple[int, str]]:
221 line_num = 0
222 started = False
223 for line in kernel_output:
224 line_num += 1
225 line = line.rstrip() # remove trailing \n
226 if not started and KTAP_START.search(line):
227 # start extracting KTAP lines and set prefix
228 # to number of characters before version line
229 prefix_len = len(
230 line.split('KTAP version')[0])
231 started = True
232 yield line_num, line[prefix_len:]
233 elif not started and TAP_START.search(line):
234 # start extracting KTAP lines and set prefix
235 # to number of characters before version line
236 prefix_len = len(line.split('TAP version')[0])
237 started = True
238 yield line_num, line[prefix_len:]
239 elif started and KTAP_END.search(line):
240 # stop extracting KTAP lines
241 break
242 elif started:
243 # remove the prefix, if any.
244 line = line[prefix_len:]
245 yield line_num, line
246 return LineStream(lines=isolate_ktap_output(kernel_output))
247
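# For example (illustrative): a console line such as
# '[   12.345678] KTAP version 1' starts extraction; the length of the
# '[   12.345678] ' prefix is remembered and stripped from every later
# line, and extraction stops at a line matching KTAP_END, e.g.
# 'reboot: System halted'.
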
248KTAP_VERSIONS = [1]
249TAP_VERSIONS = [13, 14]
250
251def check_version(version_num: int, accepted_versions: List[int],
252 version_type: str, test: Test) -> None:
253 """
254 Adds error to test object if version number is too high or too
255 low.
256
257 Parameters:
258 version_num - The version number from the parsed KTAP or TAP
259 header line
260 accepted_versions - List of accepted KTAP or TAP versions
261 version_type - 'KTAP' or 'TAP' depending on the type of
262 version line.
263 test - Test object for current test being parsed
264 """
265 if version_num < min(accepted_versions):
266 test.add_error(f'{version_type} version lower than expected!')
267 elif version_num > max(accepted_versions):
268 test.add_error(f'{version_type} version higher than expected!')
269
270def parse_ktap_header(lines: LineStream, test: Test) -> bool:
271 """
272 Parses KTAP/TAP header line and checks version number.
273 Returns False if it fails to parse the KTAP/TAP header line.
274
275 Accepted formats:
276 - 'KTAP version [version number]'
277 - 'TAP version [version number]'
278
279 Parameters:
280 lines - LineStream of KTAP output to parse
281 test - Test object for current test being parsed
282
283 Return:
284 True if successfully parsed KTAP/TAP header line
285 """
286 ktap_match = KTAP_START.match(lines.peek())
287 tap_match = TAP_START.match(lines.peek())
288 if ktap_match:
289 version_num = int(ktap_match.group(1))
290 check_version(version_num, KTAP_VERSIONS, 'KTAP', test)
291 elif tap_match:
292 version_num = int(tap_match.group(1))
293 check_version(version_num, TAP_VERSIONS, 'TAP', test)
294 else:
295 return False
296 lines.pop()
297 return True
298
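# For example (illustrative), assuming `lines` and `test` as in the function
# parameters:
#   current line 'KTAP version 1'  -> consumed, returns True
#   current line 'TAP version 14'  -> consumed, returns True
#   current line 'TAP version 99'  -> consumed, returns True, but
#                                     check_version() records an error
#   any other line                 -> left in place, returns False
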
299TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$')
300
301def parse_test_header(lines: LineStream, test: Test) -> bool:
302 """
303 Parses test header and stores test name in test object.
304 Returns False if it fails to parse the test header line.
305
306 Accepted format:
307 - '# Subtest: [test name]'
308
309 Parameters:
310 lines - LineStream of KTAP output to parse
311 test - Test object for current test being parsed
312
313 Return:
314 True if successfully parsed test header line
315 """
316 match = TEST_HEADER.match(lines.peek())
317 if not match:
318 return False
319 test.name = match.group(1)
320 lines.pop()
321 return True
322
323TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)')
324
325def parse_test_plan(lines: LineStream, test: Test) -> bool:
326 """
327 Parses test plan line and stores the expected number of subtests in
328 test object. Reports an error if expected count is 0.
329 Returns False and sets expected_count to None if there is no valid test
330 plan.
331
332 Accepted format:
333 - '1..[number of subtests]'
334
335 Parameters:
336 lines - LineStream of KTAP output to parse
337 test - Test object for current test being parsed
338
339 Return:
340 True if successfully parsed test plan line
341 """
342 match = TEST_PLAN.match(lines.peek())
343 if not match:
344 test.expected_count = None
345 return False
346 expected_count = int(match.group(1))
347 test.expected_count = expected_count
348 lines.pop()
349 return True
350
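# For example (illustrative): a current line of '1..4' sets
# test.expected_count to 4, consumes the line and returns True; any other
# line sets expected_count to None and returns False without consuming it.
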
351TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')
352
353TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
354
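# For example (illustrative), given the line 'ok 1 - example_test # SKIP reason':
#   TEST_RESULT.match(line).group(4)       # -> 'example_test'
#   TEST_RESULT_SKIP.match(line).group(4)  # -> 'example_test'
#   TEST_RESULT_SKIP.match(line).group(5)  # -> ' reason'
# and given 'not ok 2 failing_test':
#   TEST_RESULT.match(line).group(1)       # -> 'not ok'
#   TEST_RESULT.match(line).group(4)       # -> 'failing_test'
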
355def peek_test_name_match(lines: LineStream, test: Test) -> bool:
356 """
357 Matches current line with the format of a test result line and checks
358 if the name matches the name of the current test.
359 Returns False if it fails to match the format or name.
360
361 Accepted format:
362 - '[ok|not ok] [test number] [-] [test name] [optional skip
363 directive]'
364
365 Parameters:
366 lines - LineStream of KTAP output to parse
367 test - Test object for current test being parsed
368
369 Return:
370 True if the line matches the test result format and its name
371 matches the expected test name
372 """
373 line = lines.peek()
374 match = TEST_RESULT.match(line)
375 if not match:
376 return False
377 name = match.group(4)
378 return name == test.name
379
380def parse_test_result(lines: LineStream, test: Test,
381 expected_num: int) -> bool:
382 """
383 Parses test result line and stores the status and name in the test
384 object. Reports an error if the test number does not match expected
385 test number.
386 Returns False if it fails to parse the test result line.
387
388 Note that the SKIP directive is the only directive that causes a
389 change in status.
390
391 Accepted format:
392 - '[ok|not ok] [test number] [-] [test name] [optional skip
393 directive]'
394
395 Parameters:
396 lines - LineStream of KTAP output to parse
397 test - Test object for current test being parsed
398 expected_num - expected test number for current test
399
400 Return:
401 True if successfully parsed a test result line.
402 """
403 line = lines.peek()
404 match = TEST_RESULT.match(line)
405 skip_match = TEST_RESULT_SKIP.match(line)
406
407 # Check if line matches test result line format
408 if not match:
409 return False
410 lines.pop()
411
412 # Set name of test object
413 if skip_match:
414 test.name = skip_match.group(4)
415 else:
416 test.name = match.group(4)
417
418 # Check test num
419 num = int(match.group(2))
420 if num != expected_num:
421 test.add_error(f'Expected test number {expected_num} but found {num}')
422
423 # Set status of test object
424 status = match.group(1)
425 if skip_match:
426 test.status = TestStatus.SKIPPED
427 elif status == 'ok':
428 test.status = TestStatus.SUCCESS
429 else:
430 test.status = TestStatus.FAILURE
431 return True
432
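# For example (illustrative): 'not ok 1 - example # SKIP disabled' yields
# TestStatus.SKIPPED (the SKIP directive wins over the 'not ok' result),
# while 'not ok 1 - example' yields TestStatus.FAILURE.
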
433def parse_diagnostic(lines: LineStream) -> List[str]:
434 """
435 Parses lines that do not match the format of a test result line or
436 test header line and returns them in a list.
437
438 Line formats that are not parsed:
439 - '# Subtest: [test name]'
440 - '[ok|not ok] [test number] [-] [test name] [optional skip
441 directive]'
442 - 'KTAP version [version number]'
443
444 Parameters:
445 lines - LineStream of KTAP output to parse
446
447 Return:
448 Log of diagnostic lines
449 """
450 log = [] # type: List[str]
451 non_diagnostic_lines = [TEST_RESULT, TEST_HEADER, KTAP_START]
452 while lines and not any(regex.match(lines.peek())
453 for regex in non_diagnostic_lines):
454 log.append(lines.pop())
455 return log
456
457
458# Printing helper methods:
459
460DIVIDER = '=' * 60
461
462def format_test_divider(message: str, len_message: int) -> str:
463 """
464 Returns string with message centered in fixed width divider.
465
466 Example:
467 '===================== message example ====================='
468
469 Parameters:
470 message - message to be centered in divider line
471 len_message - length of the message as printed, excluding any
472 color-code characters
473
474 Return:
475 String containing message centered in fixed width divider
476 """
477 default_count = 3 # default number of '=' characters on each side
478 len_1 = default_count
479 len_2 = default_count
480 difference = len(DIVIDER) - len_message - 2 # 2 spaces added
481 if difference > 0:
482 # calculate number of '=' characters for each side of the divider
483 len_1 = int(difference / 2)
484 len_2 = difference - len_1
485 return ('=' * len_1) + f' {message} ' + ('=' * len_2)
486
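# Worked example (illustrative): with the 60-character DIVIDER and the
# 7-character message 'example', difference = 60 - 7 - 2 = 51, so the
# result is 25 '=' signs + ' example ' + 26 '=' signs (60 characters).
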
487def print_test_header(test: Test) -> None:
488 """
489 Prints test header with test name and optionally the expected number
490 of subtests.
491
492 Example:
493 '=================== example (2 subtests) ==================='
494
495 Parameters:
496 test - Test object representing current test being printed
497 """
498 message = test.name
499 if message != "":
500 # Add a leading space before the subtest counts only if a test name
501 # is provided using a "# Subtest" header line.
502 message += " "
503 if test.expected_count:
504 if test.expected_count == 1:
505 message += '(1 subtest)'
506 else:
507 message += f'({test.expected_count} subtests)'
508 stdout.print_with_timestamp(format_test_divider(message, len(message)))
509
510def print_log(log: Iterable[str]) -> None:
511 """Prints all strings in saved log for test in yellow."""
512 formatted = textwrap.dedent('\n'.join(log))
513 for line in formatted.splitlines():
514 stdout.print_with_timestamp(stdout.yellow(line))
515
516def format_test_result(test: Test) -> str:
517 """
518 Returns string with formatted test result with colored status and test
519 name.
520
521 Example:
522 '[PASSED] example'
523
524 Parameters:
525 test - Test object representing current test being printed
526
527 Return:
528 String containing formatted test result
529 """
530 if test.status == TestStatus.SUCCESS:
531 return stdout.green('[PASSED] ') + test.name
532 if test.status == TestStatus.SKIPPED:
533 return stdout.yellow('[SKIPPED] ') + test.name
534 if test.status == TestStatus.NO_TESTS:
535 return stdout.yellow('[NO TESTS RUN] ') + test.name
536 if test.status == TestStatus.TEST_CRASHED:
537 print_log(test.log)
538 return stdout.red('[CRASHED] ') + test.name
539 print_log(test.log)
540 return stdout.red('[FAILED] ') + test.name
541
542def print_test_result(test: Test) -> None:
543 """
544 Prints result line with status of test.
545
546 Example:
547 '[PASSED] example'
548
549 Parameters:
550 test - Test object representing current test being printed
551 """
552 stdout.print_with_timestamp(format_test_result(test))
553
554def print_test_footer(test: Test) -> None:
555 """
556 Prints test footer with status of test.
557
558 Example:
559 '===================== [PASSED] example ====================='
560
561 Parameters:
562 test - Test object representing current test being printed
563 """
564 message = format_test_result(test)
565 stdout.print_with_timestamp(format_test_divider(message,
566 len(message) - stdout.color_len()))
567
568
569
570def _summarize_failed_tests(test: Test) -> str:
571 """Tries to summarize all the failing subtests in `test`."""
572
573 def failed_names(test: Test, parent_name: str) -> List[str]:
574 # Note: we use 'main' internally for the top-level test.
575 if not parent_name or parent_name == 'main':
576 full_name = test.name
577 else:
578 full_name = parent_name + '.' + test.name
579
580 if not test.subtests: # this is a leaf node
581 return [full_name]
582
583 # If all the children failed, just say this subtest failed.
584 # Don't summarize it down to "the top-level test failed", though.
585 failed_subtests = [sub for sub in test.subtests if not sub.ok_status()]
586 if parent_name and len(failed_subtests) == len(test.subtests):
587 return [full_name]
588
589 all_failures = [] # type: List[str]
590 for t in failed_subtests:
591 all_failures.extend(failed_names(t, full_name))
592 return all_failures
593
594 failures = failed_names(test, '')
595 # If there are too many failures, printing them out will just be noisy.
596 if len(failures) > 10: # this is an arbitrary limit
597 return ''
598
599 return 'Failures: ' + ', '.join(failures)
600
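# For example (illustrative): if suite 'foo' has subtests 'bar' and 'baz'
# and only 'bar' failed, the summary lists 'foo.bar'; if every subtest of
# 'foo' failed, the whole suite is summarized as just 'foo'. More than 10
# reported failures yields an empty summary.
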
601
602def print_summary_line(test: Test) -> None:
603 """
604 Prints summary line of test object. Color of line is dependent on
605 status of test. Color is green if test passes, yellow if test is
606 skipped, and red if the test fails or crashes. Summary line contains
607 counts of the statuses of the test's subtests, or of the test itself if it
608 has no subtests.
609
610 Example:
611 "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
612 Errors: 0"
613
614 test - Test object representing current test being printed
615 """
616 if test.status == TestStatus.SUCCESS:
617 color = stdout.green
618 elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
619 color = stdout.yellow
620 else:
621 color = stdout.red
622 stdout.print_with_timestamp(color(f'Testing complete. {test.counts}'))
623
624 # Summarize failures that might have gone off-screen since we had a lot
625 # of tests (arbitrarily defined as >=100 for now).
626 if test.ok_status() or test.counts.total() < 100:
627 return
628 summarized = _summarize_failed_tests(test)
629 if not summarized:
630 return
631 stdout.print_with_timestamp(color(summarized))
632
633# Other methods:
634
635def bubble_up_test_results(test: Test) -> None:
636 """
637 If the test has subtests, add the counts of the subtests to the
638 test's counts and, if any subtest crashed, set the test status to
639 crashed. Otherwise, if the test has no subtests, add the test's own
640 status to its counts.
641
642 Parameters:
643 test - Test object for current test being parsed
644 """
645 subtests = test.subtests
646 counts = test.counts
647 status = test.status
648 for t in subtests:
649 counts.add_subtest_counts(t.counts)
650 if counts.total() == 0:
651 counts.add_status(status)
652 elif test.counts.get_status() == TestStatus.TEST_CRASHED:
653 test.status = TestStatus.TEST_CRASHED
654
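# For example (illustrative): a leaf test case (no subtests) with status
# SUCCESS ends up with counts of passed = 1, while a parent with a crashed
# subtest has its own status forced to TEST_CRASHED.
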
655def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool) -> Test:
656 """
657 Finds next test to parse in LineStream, creates new Test object,
658 parses any subtests of the test, populates Test object with all
659 information (status, name) about the test and the Test objects for
660 any subtests, and then returns the Test object. The function accepts
661 three test formats:
662
663 Accepted test formats:
664
665 - Main KTAP/TAP header
666
667 Example:
668
669 KTAP version 1
670 1..4
671 [subtests]
672
673 - Subtest header (must include either the KTAP version line or
674 "# Subtest" header line)
675
676 Example (preferred format with both KTAP version line and
677 "# Subtest" line):
678
679 KTAP version 1
680 # Subtest: name
681 1..3
682 [subtests]
683 ok 1 name
684
685 Example (only "# Subtest" line):
686
687 # Subtest: name
688 1..3
689 [subtests]
690 ok 1 name
691
692 Example (only KTAP version line, compliant with KTAP v1 spec):
693
694 KTAP version 1
695 1..3
696 [subtests]
697 ok 1 name
698
699 - Test result line
700
701 Example:
702
703 ok 1 - test
704
705 Parameters:
706 lines - LineStream of KTAP output to parse
707 expected_num - expected test number for test to be parsed
708 log - list of strings containing any preceding diagnostic lines
709 corresponding to the current test
710 is_subtest - boolean indicating whether test is a subtest
711
712 Return:
713 Test object populated with characteristics and any subtests
714 """
715 test = Test()
716 test.log.extend(log)
717 if not is_subtest:
718 # If parsing the main/top-level test, parse KTAP version line and
719 # test plan
720 test.name = "main"
721 ktap_line = parse_ktap_header(lines, test)
722 parse_test_plan(lines, test)
723 parent_test = True
724 else:
725 # If not the main test, attempt to parse a test header containing
726 # the KTAP version line and/or subtest header line
727 ktap_line = parse_ktap_header(lines, test)
728 subtest_line = parse_test_header(lines, test)
729 parent_test = (ktap_line or subtest_line)
730 if parent_test:
731 # If KTAP version line and/or subtest header is found, attempt
732 # to parse test plan and print test header
733 parse_test_plan(lines, test)
734 print_test_header(test)
735 expected_count = test.expected_count
736 subtests = []
737 test_num = 1
738 while parent_test and (expected_count is None or test_num <= expected_count):
739 # Loop to parse any subtests.
740 # Break after parsing the expected number of tests, or, if the
741 # expected number of tests is unknown, break when a test result
742 # line whose name matches the subtest header is found or when
743 # there are no more lines in the stream.
744 sub_log = parse_diagnostic(lines)
745 sub_test = Test()
746 if not lines or (peek_test_name_match(lines, test) and
747 is_subtest):
748 if expected_count and test_num <= expected_count:
749 # If parser reaches end of test before
750 # parsing expected number of subtests, print
751 # crashed subtest and record error
752 test.add_error('missing expected subtest!')
753 sub_test.log.extend(sub_log)
754 test.counts.add_status(
755 TestStatus.TEST_CRASHED)
756 print_test_result(sub_test)
757 else:
758 test.log.extend(sub_log)
759 break
760 else:
761 sub_test = parse_test(lines, test_num, sub_log, True)
762 subtests.append(sub_test)
763 test_num += 1
764 test.subtests = subtests
765 if is_subtest:
766 # If not main test, look for test result line
767 test.log.extend(parse_diagnostic(lines))
768 if test.name != "" and not peek_test_name_match(lines, test):
769 test.add_error('missing subtest result line!')
770 else:
771 parse_test_result(lines, test, expected_num)
772
773 # Check for there being no subtests within parent test
774 if parent_test and len(subtests) == 0:
775 # Don't override a bad status if this test had one reported.
776 # Assumption: no subtests means CRASHED is from Test.__init__()
777 if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
778 test.status = TestStatus.NO_TESTS
779 test.add_error('0 tests run!')
780
781 # Add statuses to TestCounts attribute in Test object
782 bubble_up_test_results(test)
783 if parent_test and is_subtest:
784 # If test has subtests and is not the main test object, print
785 # footer.
786 print_test_footer(test)
787 elif is_subtest:
788 print_test_result(test)
789 return test
790
791def parse_run_tests(kernel_output: Iterable[str]) -> Test:
792 """
793 Using kernel output, extract KTAP lines, parse the lines for test
794 results, and print condensed test results and a summary line.
795
796 Parameters:
797 kernel_output - Iterable object containing lines of kernel output
798
799 Return:
800 Test - the main test object with all subtests.
801 """
802 stdout.print_with_timestamp(DIVIDER)
803 lines = extract_tap_lines(kernel_output)
804 test = Test()
805 if not lines:
806 test.name = '<missing>'
807 test.add_error('Could not find any KTAP output. Did any KUnit tests run?')
808 test.status = TestStatus.FAILURE_TO_PARSE_TESTS
809 else:
810 test = parse_test(lines, 0, [], False)
811 if test.status != TestStatus.NO_TESTS:
812 test.status = test.counts.get_status()
813 stdout.print_with_timestamp(DIVIDER)
814 print_summary_line(test)
815 return test
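
# Illustrative usage sketch (hypothetical file name; the real callers live
# elsewhere in the kunit tooling):
#
#   with open('kunit_output.log') as f:
#       result = parse_run_tests(f)
#   print(result.status, result.counts.total())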