Loading...
1# SPDX-License-Identifier: GPL-2.0
2#
3# Parses KTAP test results from a kernel dmesg log and incrementally prints
4# results with reader-friendly format. Stores and returns test results in a
5# Test object.
6#
7# Copyright (C) 2019, Google LLC.
8# Author: Felix Guo <felixguoxiuping@gmail.com>
9# Author: Brendan Higgins <brendanhiggins@google.com>
10# Author: Rae Moar <rmoar@google.com>
11
12from __future__ import annotations
13from dataclasses import dataclass
14import re
15import textwrap
16
17from enum import Enum, auto
18from typing import Iterable, Iterator, List, Optional, Tuple
19
20from kunit_printer import Printer, stdout
21
22class Test:
23 """
24 A class to represent a test parsed from KTAP results. All KTAP
25 results within a test log are stored in a main Test object as
26 subtests.
27
28 Attributes:
29 status : TestStatus - status of the test
30 name : str - name of the test
31 expected_count : int - expected number of subtests (0 if single
32 test case and None if unknown expected number of subtests)
33 subtests : List[Test] - list of subtests
34 log : List[str] - log of KTAP lines that correspond to the test
35 counts : TestCounts - counts of the test statuses and errors of
36 subtests or of the test itself if the test is a single
37 test case.
38 """
39 def __init__(self) -> None:
40 """Creates Test object with default attributes."""
41 self.status = TestStatus.TEST_CRASHED
42 self.name = ''
43 self.expected_count = 0 # type: Optional[int]
44 self.subtests = [] # type: List[Test]
45 self.log = [] # type: List[str]
46 self.counts = TestCounts()
47
48 def __str__(self) -> str:
49 """Returns string representation of a Test class object."""
50 return (f'Test({self.status}, {self.name}, {self.expected_count}, '
51 f'{self.subtests}, {self.log}, {self.counts})')
52
53 def __repr__(self) -> str:
54 """Returns string representation of a Test class object."""
55 return str(self)
56
57 def add_error(self, printer: Printer, error_message: str) -> None:
58 """Records an error that occurred while parsing this test."""
59 self.counts.errors += 1
60 printer.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')
61
62 def ok_status(self) -> bool:
63 """Returns true if the status was ok, i.e. passed or skipped."""
64 return self.status in (TestStatus.SUCCESS, TestStatus.SKIPPED)
65
66class TestStatus(Enum):
67 """An enumeration class to represent the status of a test."""
68 SUCCESS = auto()
69 FAILURE = auto()
70 SKIPPED = auto()
71 TEST_CRASHED = auto()
72 NO_TESTS = auto()
73 FAILURE_TO_PARSE_TESTS = auto()
74
75@dataclass
76class TestCounts:
77 """
78 Tracks the counts of statuses of all test cases and any errors within
79 a Test.
80 """
81 passed: int = 0
82 failed: int = 0
83 crashed: int = 0
84 skipped: int = 0
85 errors: int = 0
86
87 def __str__(self) -> str:
88 """Returns the string representation of a TestCounts object."""
89 statuses = [('passed', self.passed), ('failed', self.failed),
90 ('crashed', self.crashed), ('skipped', self.skipped),
91 ('errors', self.errors)]
92 return f'Ran {self.total()} tests: ' + \
93 ', '.join(f'{s}: {n}' for s, n in statuses if n > 0)
94
95 def total(self) -> int:
96 """Returns the total number of test cases within a test
97 object, where a test case is a test with no subtests.
98 """
99 return (self.passed + self.failed + self.crashed +
100 self.skipped)
101
102 def add_subtest_counts(self, counts: TestCounts) -> None:
103 """
104 Adds the counts of another TestCounts object to the current
105 TestCounts object. Used to add the counts of a subtest to the
106 parent test.
107
108 Parameters:
109 counts - a different TestCounts object whose counts
110 will be added to the counts of the TestCounts object
111 """
112 self.passed += counts.passed
113 self.failed += counts.failed
114 self.crashed += counts.crashed
115 self.skipped += counts.skipped
116 self.errors += counts.errors
117
118 def get_status(self) -> TestStatus:
119 """Returns the aggregated status of a Test using test
120 counts.
121 """
122 if self.total() == 0:
123 return TestStatus.NO_TESTS
124 if self.crashed:
125 # Crashes should take priority.
126 return TestStatus.TEST_CRASHED
127 if self.failed:
128 return TestStatus.FAILURE
129 if self.passed:
130 # No failures or crashes, looks good!
131 return TestStatus.SUCCESS
132 # We have only skipped tests.
133 return TestStatus.SKIPPED
134
135 def add_status(self, status: TestStatus) -> None:
136 """Increments the count for `status`."""
137 if status == TestStatus.SUCCESS:
138 self.passed += 1
139 elif status == TestStatus.FAILURE:
140 self.failed += 1
141 elif status == TestStatus.SKIPPED:
142 self.skipped += 1
143 elif status != TestStatus.NO_TESTS:
144 self.crashed += 1
145
146class LineStream:
147 """
148 A class to represent the lines of kernel output.
149 Provides a lazy peek()/pop() interface over an iterator of
150 (line#, text).
151 """
152 _lines: Iterator[Tuple[int, str]]
153 _next: Tuple[int, str]
154 _need_next: bool
155 _done: bool
156
157 def __init__(self, lines: Iterator[Tuple[int, str]]):
158 """Creates a new LineStream that wraps the given iterator."""
159 self._lines = lines
160 self._done = False
161 self._need_next = True
162 self._next = (0, '')
163
164 def _get_next(self) -> None:
165 """Advances the LineSteam to the next line, if necessary."""
166 if not self._need_next:
167 return
168 try:
169 self._next = next(self._lines)
170 except StopIteration:
171 self._done = True
172 finally:
173 self._need_next = False
174
175 def peek(self) -> str:
176 """Returns the current line, without advancing the LineStream.
177 """
178 self._get_next()
179 return self._next[1]
180
181 def pop(self) -> str:
182 """Returns the current line and advances the LineStream to
183 the next line.
184 """
185 s = self.peek()
186 if self._done:
187 raise ValueError(f'LineStream: going past EOF, last line was {s}')
188 self._need_next = True
189 return s
190
191 def __bool__(self) -> bool:
192 """Returns True if stream has more lines."""
193 self._get_next()
194 return not self._done
195
196 # Only used by kunit_tool_test.py.
197 def __iter__(self) -> Iterator[str]:
198 """Empties all lines stored in LineStream object into
199 Iterator object and returns the Iterator object.
200 """
201 while bool(self):
202 yield self.pop()
203
204 def line_number(self) -> int:
205 """Returns the line number of the current line."""
206 self._get_next()
207 return self._next[0]
208
209# Parsing helper methods:
210
211KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$')
212TAP_START = re.compile(r'\s*TAP version ([0-9]+)$')
213KTAP_END = re.compile(r'\s*(List of all partitions:|'
214 'Kernel panic - not syncing: VFS:|reboot: System halted)')
215EXECUTOR_ERROR = re.compile(r'\s*kunit executor: (.*)$')
216
217def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
218 """Extracts KTAP lines from the kernel output."""
219 def isolate_ktap_output(kernel_output: Iterable[str]) \
220 -> Iterator[Tuple[int, str]]:
221 line_num = 0
222 started = False
223 for line in kernel_output:
224 line_num += 1
225 line = line.rstrip() # remove trailing \n
226 if not started and KTAP_START.search(line):
227 # start extracting KTAP lines and set prefix
228 # to number of characters before version line
229 prefix_len = len(
230 line.split('KTAP version')[0])
231 started = True
232 yield line_num, line[prefix_len:]
233 elif not started and TAP_START.search(line):
234 # start extracting KTAP lines and set prefix
235 # to number of characters before version line
236 prefix_len = len(line.split('TAP version')[0])
237 started = True
238 yield line_num, line[prefix_len:]
239 elif started and KTAP_END.search(line):
240 # stop extracting KTAP lines
241 break
242 elif started:
243 # remove the prefix, if any.
244 line = line[prefix_len:]
245 yield line_num, line
246 elif EXECUTOR_ERROR.search(line):
247 yield line_num, line
248 return LineStream(lines=isolate_ktap_output(kernel_output))
249
250KTAP_VERSIONS = [1]
251TAP_VERSIONS = [13, 14]
252
253def check_version(version_num: int, accepted_versions: List[int],
254 version_type: str, test: Test, printer: Printer) -> None:
255 """
256 Adds error to test object if version number is too high or too
257 low.
258
259 Parameters:
260 version_num - The inputted version number from the parsed KTAP or TAP
261 header line
262 accepted_version - List of accepted KTAP or TAP versions
263 version_type - 'KTAP' or 'TAP' depending on the type of
264 version line.
265 test - Test object for current test being parsed
266 printer - Printer object to output error
267 """
268 if version_num < min(accepted_versions):
269 test.add_error(printer, f'{version_type} version lower than expected!')
270 elif version_num > max(accepted_versions):
271 test.add_error(printer, f'{version_type} version higer than expected!')
272
273def parse_ktap_header(lines: LineStream, test: Test, printer: Printer) -> bool:
274 """
275 Parses KTAP/TAP header line and checks version number.
276 Returns False if fails to parse KTAP/TAP header line.
277
278 Accepted formats:
279 - 'KTAP version [version number]'
280 - 'TAP version [version number]'
281
282 Parameters:
283 lines - LineStream of KTAP output to parse
284 test - Test object for current test being parsed
285 printer - Printer object to output results
286
287 Return:
288 True if successfully parsed KTAP/TAP header line
289 """
290 ktap_match = KTAP_START.match(lines.peek())
291 tap_match = TAP_START.match(lines.peek())
292 if ktap_match:
293 version_num = int(ktap_match.group(1))
294 check_version(version_num, KTAP_VERSIONS, 'KTAP', test, printer)
295 elif tap_match:
296 version_num = int(tap_match.group(1))
297 check_version(version_num, TAP_VERSIONS, 'TAP', test, printer)
298 else:
299 return False
300 lines.pop()
301 return True
302
303TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$')
304
305def parse_test_header(lines: LineStream, test: Test) -> bool:
306 """
307 Parses test header and stores test name in test object.
308 Returns False if fails to parse test header line.
309
310 Accepted format:
311 - '# Subtest: [test name]'
312
313 Parameters:
314 lines - LineStream of KTAP output to parse
315 test - Test object for current test being parsed
316
317 Return:
318 True if successfully parsed test header line
319 """
320 match = TEST_HEADER.match(lines.peek())
321 if not match:
322 return False
323 test.name = match.group(1)
324 lines.pop()
325 return True
326
327TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)')
328
329def parse_test_plan(lines: LineStream, test: Test) -> bool:
330 """
331 Parses test plan line and stores the expected number of subtests in
332 test object. Reports an error if expected count is 0.
333 Returns False and sets expected_count to None if there is no valid test
334 plan.
335
336 Accepted format:
337 - '1..[number of subtests]'
338
339 Parameters:
340 lines - LineStream of KTAP output to parse
341 test - Test object for current test being parsed
342
343 Return:
344 True if successfully parsed test plan line
345 """
346 match = TEST_PLAN.match(lines.peek())
347 if not match:
348 test.expected_count = None
349 return False
350 expected_count = int(match.group(1))
351 test.expected_count = expected_count
352 lines.pop()
353 return True
354
355TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')
356
357TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
358
359def peek_test_name_match(lines: LineStream, test: Test) -> bool:
360 """
361 Matches current line with the format of a test result line and checks
362 if the name matches the name of the current test.
363 Returns False if fails to match format or name.
364
365 Accepted format:
366 - '[ok|not ok] [test number] [-] [test name] [optional skip
367 directive]'
368
369 Parameters:
370 lines - LineStream of KTAP output to parse
371 test - Test object for current test being parsed
372
373 Return:
374 True if matched a test result line and the name matching the
375 expected test name
376 """
377 line = lines.peek()
378 match = TEST_RESULT.match(line)
379 if not match:
380 return False
381 name = match.group(4)
382 return name == test.name
383
384def parse_test_result(lines: LineStream, test: Test,
385 expected_num: int, printer: Printer) -> bool:
386 """
387 Parses test result line and stores the status and name in the test
388 object. Reports an error if the test number does not match expected
389 test number.
390 Returns False if fails to parse test result line.
391
392 Note that the SKIP directive is the only direction that causes a
393 change in status.
394
395 Accepted format:
396 - '[ok|not ok] [test number] [-] [test name] [optional skip
397 directive]'
398
399 Parameters:
400 lines - LineStream of KTAP output to parse
401 test - Test object for current test being parsed
402 expected_num - expected test number for current test
403 printer - Printer object to output results
404
405 Return:
406 True if successfully parsed a test result line.
407 """
408 line = lines.peek()
409 match = TEST_RESULT.match(line)
410 skip_match = TEST_RESULT_SKIP.match(line)
411
412 # Check if line matches test result line format
413 if not match:
414 return False
415 lines.pop()
416
417 # Set name of test object
418 if skip_match:
419 test.name = skip_match.group(4)
420 else:
421 test.name = match.group(4)
422
423 # Check test num
424 num = int(match.group(2))
425 if num != expected_num:
426 test.add_error(printer, f'Expected test number {expected_num} but found {num}')
427
428 # Set status of test object
429 status = match.group(1)
430 if skip_match:
431 test.status = TestStatus.SKIPPED
432 elif status == 'ok':
433 test.status = TestStatus.SUCCESS
434 else:
435 test.status = TestStatus.FAILURE
436 return True
437
438def parse_diagnostic(lines: LineStream) -> List[str]:
439 """
440 Parse lines that do not match the format of a test result line or
441 test header line and returns them in list.
442
443 Line formats that are not parsed:
444 - '# Subtest: [test name]'
445 - '[ok|not ok] [test number] [-] [test name] [optional skip
446 directive]'
447 - 'KTAP version [version number]'
448
449 Parameters:
450 lines - LineStream of KTAP output to parse
451
452 Return:
453 Log of diagnostic lines
454 """
455 log = [] # type: List[str]
456 non_diagnostic_lines = [TEST_RESULT, TEST_HEADER, KTAP_START, TAP_START, TEST_PLAN]
457 while lines and not any(re.match(lines.peek())
458 for re in non_diagnostic_lines):
459 log.append(lines.pop())
460 return log
461
462
463# Printing helper methods:
464
465DIVIDER = '=' * 60
466
467def format_test_divider(message: str, len_message: int) -> str:
468 """
469 Returns string with message centered in fixed width divider.
470
471 Example:
472 '===================== message example ====================='
473
474 Parameters:
475 message - message to be centered in divider line
476 len_message - length of the message to be printed such that
477 any characters of the color codes are not counted
478
479 Return:
480 String containing message centered in fixed width divider
481 """
482 default_count = 3 # default number of dashes
483 len_1 = default_count
484 len_2 = default_count
485 difference = len(DIVIDER) - len_message - 2 # 2 spaces added
486 if difference > 0:
487 # calculate number of dashes for each side of the divider
488 len_1 = int(difference / 2)
489 len_2 = difference - len_1
490 return ('=' * len_1) + f' {message} ' + ('=' * len_2)
491
492def print_test_header(test: Test, printer: Printer) -> None:
493 """
494 Prints test header with test name and optionally the expected number
495 of subtests.
496
497 Example:
498 '=================== example (2 subtests) ==================='
499
500 Parameters:
501 test - Test object representing current test being printed
502 printer - Printer object to output results
503 """
504 message = test.name
505 if message != "":
506 # Add a leading space before the subtest counts only if a test name
507 # is provided using a "# Subtest" header line.
508 message += " "
509 if test.expected_count:
510 if test.expected_count == 1:
511 message += '(1 subtest)'
512 else:
513 message += f'({test.expected_count} subtests)'
514 printer.print_with_timestamp(format_test_divider(message, len(message)))
515
516def print_log(log: Iterable[str], printer: Printer) -> None:
517 """Prints all strings in saved log for test in yellow."""
518 formatted = textwrap.dedent('\n'.join(log))
519 for line in formatted.splitlines():
520 printer.print_with_timestamp(printer.yellow(line))
521
522def format_test_result(test: Test, printer: Printer) -> str:
523 """
524 Returns string with formatted test result with colored status and test
525 name.
526
527 Example:
528 '[PASSED] example'
529
530 Parameters:
531 test - Test object representing current test being printed
532 printer - Printer object to output results
533
534 Return:
535 String containing formatted test result
536 """
537 if test.status == TestStatus.SUCCESS:
538 return printer.green('[PASSED] ') + test.name
539 if test.status == TestStatus.SKIPPED:
540 return printer.yellow('[SKIPPED] ') + test.name
541 if test.status == TestStatus.NO_TESTS:
542 return printer.yellow('[NO TESTS RUN] ') + test.name
543 if test.status == TestStatus.TEST_CRASHED:
544 print_log(test.log, printer)
545 return stdout.red('[CRASHED] ') + test.name
546 print_log(test.log, printer)
547 return printer.red('[FAILED] ') + test.name
548
549def print_test_result(test: Test, printer: Printer) -> None:
550 """
551 Prints result line with status of test.
552
553 Example:
554 '[PASSED] example'
555
556 Parameters:
557 test - Test object representing current test being printed
558 printer - Printer object
559 """
560 printer.print_with_timestamp(format_test_result(test, printer))
561
562def print_test_footer(test: Test, printer: Printer) -> None:
563 """
564 Prints test footer with status of test.
565
566 Example:
567 '===================== [PASSED] example ====================='
568
569 Parameters:
570 test - Test object representing current test being printed
571 printer - Printer object to output results
572 """
573 message = format_test_result(test, printer)
574 printer.print_with_timestamp(format_test_divider(message,
575 len(message) - printer.color_len()))
576
577def print_test(test: Test, failed_only: bool, printer: Printer) -> None:
578 """
579 Prints Test object to given printer. For a child test, the result line is
580 printed. For a parent test, the test header, all child test results, and
581 the test footer are all printed. If failed_only is true, only failed/crashed
582 tests will be printed.
583
584 Parameters:
585 test - Test object to print
586 failed_only - True if only failed/crashed tests should be printed.
587 printer - Printer object to output results
588 """
589 if test.name == "main":
590 printer.print_with_timestamp(DIVIDER)
591 for subtest in test.subtests:
592 print_test(subtest, failed_only, printer)
593 printer.print_with_timestamp(DIVIDER)
594 elif test.subtests != []:
595 if not failed_only or not test.ok_status():
596 print_test_header(test, printer)
597 for subtest in test.subtests:
598 print_test(subtest, failed_only, printer)
599 print_test_footer(test, printer)
600 else:
601 if not failed_only or not test.ok_status():
602 print_test_result(test, printer)
603
604def _summarize_failed_tests(test: Test) -> str:
605 """Tries to summarize all the failing subtests in `test`."""
606
607 def failed_names(test: Test, parent_name: str) -> List[str]:
608 # Note: we use 'main' internally for the top-level test.
609 if not parent_name or parent_name == 'main':
610 full_name = test.name
611 else:
612 full_name = parent_name + '.' + test.name
613
614 if not test.subtests: # this is a leaf node
615 return [full_name]
616
617 # If all the children failed, just say this subtest failed.
618 # Don't summarize it down "the top-level test failed", though.
619 failed_subtests = [sub for sub in test.subtests if not sub.ok_status()]
620 if parent_name and len(failed_subtests) == len(test.subtests):
621 return [full_name]
622
623 all_failures = [] # type: List[str]
624 for t in failed_subtests:
625 all_failures.extend(failed_names(t, full_name))
626 return all_failures
627
628 failures = failed_names(test, '')
629 # If there are too many failures, printing them out will just be noisy.
630 if len(failures) > 10: # this is an arbitrary limit
631 return ''
632
633 return 'Failures: ' + ', '.join(failures)
634
635
636def print_summary_line(test: Test, printer: Printer) -> None:
637 """
638 Prints summary line of test object. Color of line is dependent on
639 status of test. Color is green if test passes, yellow if test is
640 skipped, and red if the test fails or crashes. Summary line contains
641 counts of the statuses of the tests subtests or the test itself if it
642 has no subtests.
643
644 Example:
645 "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
646 Errors: 0"
647
648 test - Test object representing current test being printed
649 printer - Printer object to output results
650 """
651 if test.status == TestStatus.SUCCESS:
652 color = stdout.green
653 elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
654 color = stdout.yellow
655 else:
656 color = stdout.red
657 printer.print_with_timestamp(color(f'Testing complete. {test.counts}'))
658
659 # Summarize failures that might have gone off-screen since we had a lot
660 # of tests (arbitrarily defined as >=100 for now).
661 if test.ok_status() or test.counts.total() < 100:
662 return
663 summarized = _summarize_failed_tests(test)
664 if not summarized:
665 return
666 printer.print_with_timestamp(color(summarized))
667
668# Other methods:
669
670def bubble_up_test_results(test: Test) -> None:
671 """
672 If the test has subtests, add the test counts of the subtests to the
673 test and check if any of the tests crashed and if so set the test
674 status to crashed. Otherwise if the test has no subtests add the
675 status of the test to the test counts.
676
677 Parameters:
678 test - Test object for current test being parsed
679 """
680 subtests = test.subtests
681 counts = test.counts
682 status = test.status
683 for t in subtests:
684 counts.add_subtest_counts(t.counts)
685 if counts.total() == 0:
686 counts.add_status(status)
687 elif test.counts.get_status() == TestStatus.TEST_CRASHED:
688 test.status = TestStatus.TEST_CRASHED
689
690def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool, printer: Printer) -> Test:
691 """
692 Finds next test to parse in LineStream, creates new Test object,
693 parses any subtests of the test, populates Test object with all
694 information (status, name) about the test and the Test objects for
695 any subtests, and then returns the Test object. The method accepts
696 three formats of tests:
697
698 Accepted test formats:
699
700 - Main KTAP/TAP header
701
702 Example:
703
704 KTAP version 1
705 1..4
706 [subtests]
707
708 - Subtest header (must include either the KTAP version line or
709 "# Subtest" header line)
710
711 Example (preferred format with both KTAP version line and
712 "# Subtest" line):
713
714 KTAP version 1
715 # Subtest: name
716 1..3
717 [subtests]
718 ok 1 name
719
720 Example (only "# Subtest" line):
721
722 # Subtest: name
723 1..3
724 [subtests]
725 ok 1 name
726
727 Example (only KTAP version line, compliant with KTAP v1 spec):
728
729 KTAP version 1
730 1..3
731 [subtests]
732 ok 1 name
733
734 - Test result line
735
736 Example:
737
738 ok 1 - test
739
740 Parameters:
741 lines - LineStream of KTAP output to parse
742 expected_num - expected test number for test to be parsed
743 log - list of strings containing any preceding diagnostic lines
744 corresponding to the current test
745 is_subtest - boolean indicating whether test is a subtest
746 printer - Printer object to output results
747
748 Return:
749 Test object populated with characteristics and any subtests
750 """
751 test = Test()
752 test.log.extend(log)
753
754 # Parse any errors prior to parsing tests
755 err_log = parse_diagnostic(lines)
756 test.log.extend(err_log)
757
758 if not is_subtest:
759 # If parsing the main/top-level test, parse KTAP version line and
760 # test plan
761 test.name = "main"
762 ktap_line = parse_ktap_header(lines, test, printer)
763 test.log.extend(parse_diagnostic(lines))
764 parse_test_plan(lines, test)
765 parent_test = True
766 else:
767 # If not the main test, attempt to parse a test header containing
768 # the KTAP version line and/or subtest header line
769 ktap_line = parse_ktap_header(lines, test, printer)
770 subtest_line = parse_test_header(lines, test)
771 parent_test = (ktap_line or subtest_line)
772 if parent_test:
773 # If KTAP version line and/or subtest header is found, attempt
774 # to parse test plan and print test header
775 test.log.extend(parse_diagnostic(lines))
776 parse_test_plan(lines, test)
777 print_test_header(test, printer)
778 expected_count = test.expected_count
779 subtests = []
780 test_num = 1
781 while parent_test and (expected_count is None or test_num <= expected_count):
782 # Loop to parse any subtests.
783 # Break after parsing expected number of tests or
784 # if expected number of tests is unknown break when test
785 # result line with matching name to subtest header is found
786 # or no more lines in stream.
787 sub_log = parse_diagnostic(lines)
788 sub_test = Test()
789 if not lines or (peek_test_name_match(lines, test) and
790 is_subtest):
791 if expected_count and test_num <= expected_count:
792 # If parser reaches end of test before
793 # parsing expected number of subtests, print
794 # crashed subtest and record error
795 test.add_error(printer, 'missing expected subtest!')
796 sub_test.log.extend(sub_log)
797 test.counts.add_status(
798 TestStatus.TEST_CRASHED)
799 print_test_result(sub_test, printer)
800 else:
801 test.log.extend(sub_log)
802 break
803 else:
804 sub_test = parse_test(lines, test_num, sub_log, True, printer)
805 subtests.append(sub_test)
806 test_num += 1
807 test.subtests = subtests
808 if is_subtest:
809 # If not main test, look for test result line
810 test.log.extend(parse_diagnostic(lines))
811 if test.name != "" and not peek_test_name_match(lines, test):
812 test.add_error(printer, 'missing subtest result line!')
813 else:
814 parse_test_result(lines, test, expected_num, printer)
815
816 # Check for there being no subtests within parent test
817 if parent_test and len(subtests) == 0:
818 # Don't override a bad status if this test had one reported.
819 # Assumption: no subtests means CRASHED is from Test.__init__()
820 if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
821 print_log(test.log, printer)
822 test.status = TestStatus.NO_TESTS
823 test.add_error(printer, '0 tests run!')
824
825 # Add statuses to TestCounts attribute in Test object
826 bubble_up_test_results(test)
827 if parent_test and is_subtest:
828 # If test has subtests and is not the main test object, print
829 # footer.
830 print_test_footer(test, printer)
831 elif is_subtest:
832 print_test_result(test, printer)
833 return test
834
835def parse_run_tests(kernel_output: Iterable[str], printer: Printer) -> Test:
836 """
837 Using kernel output, extract KTAP lines, parse the lines for test
838 results and print condensed test results and summary line.
839
840 Parameters:
841 kernel_output - Iterable object contains lines of kernel output
842 printer - Printer object to output results
843
844 Return:
845 Test - the main test object with all subtests.
846 """
847 printer.print_with_timestamp(DIVIDER)
848 lines = extract_tap_lines(kernel_output)
849 test = Test()
850 if not lines:
851 test.name = '<missing>'
852 test.add_error(printer, 'Could not find any KTAP output. Did any KUnit tests run?')
853 test.status = TestStatus.FAILURE_TO_PARSE_TESTS
854 else:
855 test = parse_test(lines, 0, [], False, printer)
856 if test.status != TestStatus.NO_TESTS:
857 test.status = test.counts.get_status()
858 printer.print_with_timestamp(DIVIDER)
859 return test