Loading...
1# SPDX-License-Identifier: GPL-2.0
2#
3# Parses KTAP test results from a kernel dmesg log and incrementally prints
4# results with reader-friendly format. Stores and returns test results in a
5# Test object.
6#
7# Copyright (C) 2019, Google LLC.
8# Author: Felix Guo <felixguoxiuping@gmail.com>
9# Author: Brendan Higgins <brendanhiggins@google.com>
10# Author: Rae Moar <rmoar@google.com>
11
12from __future__ import annotations
13from dataclasses import dataclass
14import re
15import textwrap
16
17from enum import Enum, auto
18from typing import Iterable, Iterator, List, Optional, Tuple
19
20from kunit_printer import Printer, stdout
21
22class Test:
23 """
24 A class to represent a test parsed from KTAP results. All KTAP
25 results within a test log are stored in a main Test object as
26 subtests.
27
28 Attributes:
29 status : TestStatus - status of the test
30 name : str - name of the test
31 expected_count : int - expected number of subtests (0 if single
32 test case and None if unknown expected number of subtests)
33 subtests : List[Test] - list of subtests
34 log : List[str] - log of KTAP lines that correspond to the test
35 counts : TestCounts - counts of the test statuses and errors of
36 subtests or of the test itself if the test is a single
37 test case.
38 """
39 def __init__(self) -> None:
40 """Creates Test object with default attributes."""
41 self.status = TestStatus.TEST_CRASHED
42 self.name = ''
43 self.expected_count = 0 # type: Optional[int]
44 self.subtests = [] # type: List[Test]
45 self.log = [] # type: List[str]
46 self.counts = TestCounts()
47
48 def __str__(self) -> str:
49 """Returns string representation of a Test class object."""
50 return (f'Test({self.status}, {self.name}, {self.expected_count}, '
51 f'{self.subtests}, {self.log}, {self.counts})')
52
53 def __repr__(self) -> str:
54 """Returns string representation of a Test class object."""
55 return str(self)
56
57 def add_error(self, printer: Printer, error_message: str) -> None:
58 """Records an error that occurred while parsing this test."""
59 self.counts.errors += 1
60 printer.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')
61
62 def ok_status(self) -> bool:
63 """Returns true if the status was ok, i.e. passed or skipped."""
64 return self.status in (TestStatus.SUCCESS, TestStatus.SKIPPED)
65
66class TestStatus(Enum):
67 """An enumeration class to represent the status of a test."""
68 SUCCESS = auto()
69 FAILURE = auto()
70 SKIPPED = auto()
71 TEST_CRASHED = auto()
72 NO_TESTS = auto()
73 FAILURE_TO_PARSE_TESTS = auto()
74
75@dataclass
76class TestCounts:
77 """
78 Tracks the counts of statuses of all test cases and any errors within
79 a Test.
80 """
81 passed: int = 0
82 failed: int = 0
83 crashed: int = 0
84 skipped: int = 0
85 errors: int = 0
86
87 def __str__(self) -> str:
88 """Returns the string representation of a TestCounts object."""
89 statuses = [('passed', self.passed), ('failed', self.failed),
90 ('crashed', self.crashed), ('skipped', self.skipped),
91 ('errors', self.errors)]
92 return f'Ran {self.total()} tests: ' + \
93 ', '.join(f'{s}: {n}' for s, n in statuses if n > 0)
94
95 def total(self) -> int:
96 """Returns the total number of test cases within a test
97 object, where a test case is a test with no subtests.
98 """
99 return (self.passed + self.failed + self.crashed +
100 self.skipped)
101
102 def add_subtest_counts(self, counts: TestCounts) -> None:
103 """
104 Adds the counts of another TestCounts object to the current
105 TestCounts object. Used to add the counts of a subtest to the
106 parent test.
107
108 Parameters:
109 counts - a different TestCounts object whose counts
110 will be added to the counts of the TestCounts object
111 """
112 self.passed += counts.passed
113 self.failed += counts.failed
114 self.crashed += counts.crashed
115 self.skipped += counts.skipped
116 self.errors += counts.errors
117
118 def get_status(self) -> TestStatus:
119 """Returns the aggregated status of a Test using test
120 counts.
121 """
122 if self.total() == 0:
123 return TestStatus.NO_TESTS
124 if self.crashed:
125 # Crashes should take priority.
126 return TestStatus.TEST_CRASHED
127 if self.failed:
128 return TestStatus.FAILURE
129 if self.passed:
130 # No failures or crashes, looks good!
131 return TestStatus.SUCCESS
132 # We have only skipped tests.
133 return TestStatus.SKIPPED
134
135 def add_status(self, status: TestStatus) -> None:
136 """Increments the count for `status`."""
137 if status == TestStatus.SUCCESS:
138 self.passed += 1
139 elif status == TestStatus.FAILURE:
140 self.failed += 1
141 elif status == TestStatus.SKIPPED:
142 self.skipped += 1
143 elif status != TestStatus.NO_TESTS:
144 self.crashed += 1
145
146class LineStream:
147 """
148 A class to represent the lines of kernel output.
149 Provides a lazy peek()/pop() interface over an iterator of
150 (line#, text).
151 """
152 _lines: Iterator[Tuple[int, str]]
153 _next: Tuple[int, str]
154 _need_next: bool
155 _done: bool
156
157 def __init__(self, lines: Iterator[Tuple[int, str]]):
158 """Creates a new LineStream that wraps the given iterator."""
159 self._lines = lines
160 self._done = False
161 self._need_next = True
162 self._next = (0, '')
163
164 def _get_next(self) -> None:
165 """Advances the LineSteam to the next line, if necessary."""
166 if not self._need_next:
167 return
168 try:
169 self._next = next(self._lines)
170 except StopIteration:
171 self._done = True
172 finally:
173 self._need_next = False
174
175 def peek(self) -> str:
176 """Returns the current line, without advancing the LineStream.
177 """
178 self._get_next()
179 return self._next[1]
180
181 def pop(self) -> str:
182 """Returns the current line and advances the LineStream to
183 the next line.
184 """
185 s = self.peek()
186 if self._done:
187 raise ValueError(f'LineStream: going past EOF, last line was {s}')
188 self._need_next = True
189 return s
190
191 def __bool__(self) -> bool:
192 """Returns True if stream has more lines."""
193 self._get_next()
194 return not self._done
195
196 # Only used by kunit_tool_test.py.
197 def __iter__(self) -> Iterator[str]:
198 """Empties all lines stored in LineStream object into
199 Iterator object and returns the Iterator object.
200 """
201 while bool(self):
202 yield self.pop()
203
204 def line_number(self) -> int:
205 """Returns the line number of the current line."""
206 self._get_next()
207 return self._next[0]
208
209# Parsing helper methods:
210
211KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$')
212TAP_START = re.compile(r'\s*TAP version ([0-9]+)$')
213KTAP_END = re.compile(r'\s*(List of all partitions:|'
214 'Kernel panic - not syncing: VFS:|reboot: System halted)')
215EXECUTOR_ERROR = re.compile(r'\s*kunit executor: (.*)$')
216
217def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
218 """Extracts KTAP lines from the kernel output."""
219 def isolate_ktap_output(kernel_output: Iterable[str]) \
220 -> Iterator[Tuple[int, str]]:
221 line_num = 0
222 started = False
223 for line in kernel_output:
224 line_num += 1
225 line = line.rstrip() # remove trailing \n
226 if not started and KTAP_START.search(line):
227 # start extracting KTAP lines and set prefix
228 # to number of characters before version line
229 prefix_len = len(
230 line.split('KTAP version')[0])
231 started = True
232 yield line_num, line[prefix_len:]
233 elif not started and TAP_START.search(line):
234 # start extracting KTAP lines and set prefix
235 # to number of characters before version line
236 prefix_len = len(line.split('TAP version')[0])
237 started = True
238 yield line_num, line[prefix_len:]
239 elif started and KTAP_END.search(line):
240 # stop extracting KTAP lines
241 break
242 elif started:
243 # remove the prefix, if any.
244 line = line[prefix_len:]
245 yield line_num, line
246 elif EXECUTOR_ERROR.search(line):
247 yield line_num, line
248 return LineStream(lines=isolate_ktap_output(kernel_output))
249
250KTAP_VERSIONS = [1]
251TAP_VERSIONS = [13, 14]
252
253def check_version(version_num: int, accepted_versions: List[int],
254 version_type: str, test: Test, printer: Printer) -> None:
255 """
256 Adds error to test object if version number is too high or too
257 low.
258
259 Parameters:
260 version_num - The inputted version number from the parsed KTAP or TAP
261 header line
262 accepted_version - List of accepted KTAP or TAP versions
263 version_type - 'KTAP' or 'TAP' depending on the type of
264 version line.
265 test - Test object for current test being parsed
266 printer - Printer object to output error
267 """
268 if version_num < min(accepted_versions):
269 test.add_error(printer, f'{version_type} version lower than expected!')
270 elif version_num > max(accepted_versions):
271 test.add_error(printer, f'{version_type} version higer than expected!')
272
273def parse_ktap_header(lines: LineStream, test: Test, printer: Printer) -> bool:
274 """
275 Parses KTAP/TAP header line and checks version number.
276 Returns False if fails to parse KTAP/TAP header line.
277
278 Accepted formats:
279 - 'KTAP version [version number]'
280 - 'TAP version [version number]'
281
282 Parameters:
283 lines - LineStream of KTAP output to parse
284 test - Test object for current test being parsed
285 printer - Printer object to output results
286
287 Return:
288 True if successfully parsed KTAP/TAP header line
289 """
290 ktap_match = KTAP_START.match(lines.peek())
291 tap_match = TAP_START.match(lines.peek())
292 if ktap_match:
293 version_num = int(ktap_match.group(1))
294 check_version(version_num, KTAP_VERSIONS, 'KTAP', test, printer)
295 elif tap_match:
296 version_num = int(tap_match.group(1))
297 check_version(version_num, TAP_VERSIONS, 'TAP', test, printer)
298 else:
299 return False
300 lines.pop()
301 return True
302
303TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$')
304
305def parse_test_header(lines: LineStream, test: Test) -> bool:
306 """
307 Parses test header and stores test name in test object.
308 Returns False if fails to parse test header line.
309
310 Accepted format:
311 - '# Subtest: [test name]'
312
313 Parameters:
314 lines - LineStream of KTAP output to parse
315 test - Test object for current test being parsed
316
317 Return:
318 True if successfully parsed test header line
319 """
320 match = TEST_HEADER.match(lines.peek())
321 if not match:
322 return False
323 test.name = match.group(1)
324 lines.pop()
325 return True
326
327TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)')
328
329def parse_test_plan(lines: LineStream, test: Test) -> bool:
330 """
331 Parses test plan line and stores the expected number of subtests in
332 test object. Reports an error if expected count is 0.
333 Returns False and sets expected_count to None if there is no valid test
334 plan.
335
336 Accepted format:
337 - '1..[number of subtests]'
338
339 Parameters:
340 lines - LineStream of KTAP output to parse
341 test - Test object for current test being parsed
342
343 Return:
344 True if successfully parsed test plan line
345 """
346 match = TEST_PLAN.match(lines.peek())
347 if not match:
348 test.expected_count = None
349 return False
350 expected_count = int(match.group(1))
351 test.expected_count = expected_count
352 lines.pop()
353 return True
354
355TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')
356
357TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
358
359def peek_test_name_match(lines: LineStream, test: Test) -> bool:
360 """
361 Matches current line with the format of a test result line and checks
362 if the name matches the name of the current test.
363 Returns False if fails to match format or name.
364
365 Accepted format:
366 - '[ok|not ok] [test number] [-] [test name] [optional skip
367 directive]'
368
369 Parameters:
370 lines - LineStream of KTAP output to parse
371 test - Test object for current test being parsed
372
373 Return:
374 True if matched a test result line and the name matching the
375 expected test name
376 """
377 line = lines.peek()
378 match = TEST_RESULT.match(line)
379 if not match:
380 return False
381 name = match.group(4)
382 return name == test.name
383
384def parse_test_result(lines: LineStream, test: Test,
385 expected_num: int, printer: Printer) -> bool:
386 """
387 Parses test result line and stores the status and name in the test
388 object. Reports an error if the test number does not match expected
389 test number.
390 Returns False if fails to parse test result line.
391
392 Note that the SKIP directive is the only direction that causes a
393 change in status.
394
395 Accepted format:
396 - '[ok|not ok] [test number] [-] [test name] [optional skip
397 directive]'
398
399 Parameters:
400 lines - LineStream of KTAP output to parse
401 test - Test object for current test being parsed
402 expected_num - expected test number for current test
403 printer - Printer object to output results
404
405 Return:
406 True if successfully parsed a test result line.
407 """
408 line = lines.peek()
409 match = TEST_RESULT.match(line)
410 skip_match = TEST_RESULT_SKIP.match(line)
411
412 # Check if line matches test result line format
413 if not match:
414 return False
415 lines.pop()
416
417 # Set name of test object
418 if skip_match:
419 test.name = skip_match.group(4)
420 else:
421 test.name = match.group(4)
422
423 # Check test num
424 num = int(match.group(2))
425 if num != expected_num:
426 test.add_error(printer, f'Expected test number {expected_num} but found {num}')
427
428 # Set status of test object
429 status = match.group(1)
430 if skip_match:
431 test.status = TestStatus.SKIPPED
432 elif status == 'ok':
433 test.status = TestStatus.SUCCESS
434 else:
435 test.status = TestStatus.FAILURE
436 return True
437
438def parse_diagnostic(lines: LineStream) -> List[str]:
439 """
440 Parse lines that do not match the format of a test result line or
441 test header line and returns them in list.
442
443 Line formats that are not parsed:
444 - '# Subtest: [test name]'
445 - '[ok|not ok] [test number] [-] [test name] [optional skip
446 directive]'
447 - 'KTAP version [version number]'
448
449 Parameters:
450 lines - LineStream of KTAP output to parse
451
452 Return:
453 Log of diagnostic lines
454 """
455 log = [] # type: List[str]
456 non_diagnostic_lines = [TEST_RESULT, TEST_HEADER, KTAP_START, TAP_START, TEST_PLAN]
457 while lines and not any(re.match(lines.peek())
458 for re in non_diagnostic_lines):
459 log.append(lines.pop())
460 return log
461
462
463# Printing helper methods:
464
465DIVIDER = '=' * 60
466
467def format_test_divider(message: str, len_message: int) -> str:
468 """
469 Returns string with message centered in fixed width divider.
470
471 Example:
472 '===================== message example ====================='
473
474 Parameters:
475 message - message to be centered in divider line
476 len_message - length of the message to be printed such that
477 any characters of the color codes are not counted
478
479 Return:
480 String containing message centered in fixed width divider
481 """
482 default_count = 3 # default number of dashes
483 len_1 = default_count
484 len_2 = default_count
485 difference = len(DIVIDER) - len_message - 2 # 2 spaces added
486 if difference > 0:
487 # calculate number of dashes for each side of the divider
488 len_1 = int(difference / 2)
489 len_2 = difference - len_1
490 return ('=' * len_1) + f' {message} ' + ('=' * len_2)
491
492def print_test_header(test: Test, printer: Printer) -> None:
493 """
494 Prints test header with test name and optionally the expected number
495 of subtests.
496
497 Example:
498 '=================== example (2 subtests) ==================='
499
500 Parameters:
501 test - Test object representing current test being printed
502 printer - Printer object to output results
503 """
504 message = test.name
505 if message != "":
506 # Add a leading space before the subtest counts only if a test name
507 # is provided using a "# Subtest" header line.
508 message += " "
509 if test.expected_count:
510 if test.expected_count == 1:
511 message += '(1 subtest)'
512 else:
513 message += f'({test.expected_count} subtests)'
514 printer.print_with_timestamp(format_test_divider(message, len(message)))
515
516def print_log(log: Iterable[str], printer: Printer) -> None:
517 """Prints all strings in saved log for test in yellow."""
518 formatted = textwrap.dedent('\n'.join(log))
519 for line in formatted.splitlines():
520 printer.print_with_timestamp(printer.yellow(line))
521
522def format_test_result(test: Test, printer: Printer) -> str:
523 """
524 Returns string with formatted test result with colored status and test
525 name.
526
527 Example:
528 '[PASSED] example'
529
530 Parameters:
531 test - Test object representing current test being printed
532 printer - Printer object to output results
533
534 Return:
535 String containing formatted test result
536 """
537 if test.status == TestStatus.SUCCESS:
538 return printer.green('[PASSED] ') + test.name
539 if test.status == TestStatus.SKIPPED:
540 return printer.yellow('[SKIPPED] ') + test.name
541 if test.status == TestStatus.NO_TESTS:
542 return printer.yellow('[NO TESTS RUN] ') + test.name
543 if test.status == TestStatus.TEST_CRASHED:
544 print_log(test.log, printer)
545 return stdout.red('[CRASHED] ') + test.name
546 print_log(test.log, printer)
547 return printer.red('[FAILED] ') + test.name
548
549def print_test_result(test: Test, printer: Printer) -> None:
550 """
551 Prints result line with status of test.
552
553 Example:
554 '[PASSED] example'
555
556 Parameters:
557 test - Test object representing current test being printed
558 printer - Printer object
559 """
560 printer.print_with_timestamp(format_test_result(test, printer))
561
562def print_test_footer(test: Test, printer: Printer) -> None:
563 """
564 Prints test footer with status of test.
565
566 Example:
567 '===================== [PASSED] example ====================='
568
569 Parameters:
570 test - Test object representing current test being printed
571 printer - Printer object to output results
572 """
573 message = format_test_result(test, printer)
574 printer.print_with_timestamp(format_test_divider(message,
575 len(message) - printer.color_len()))
576
577def print_test(test: Test, failed_only: bool, printer: Printer) -> None:
578 """
579 Prints Test object to given printer. For a child test, the result line is
580 printed. For a parent test, the test header, all child test results, and
581 the test footer are all printed. If failed_only is true, only failed/crashed
582 tests will be printed.
583
584 Parameters:
585 test - Test object to print
586 failed_only - True if only failed/crashed tests should be printed.
587 printer - Printer object to output results
588 """
589 if test.name == "main":
590 printer.print_with_timestamp(DIVIDER)
591 for subtest in test.subtests:
592 print_test(subtest, failed_only, printer)
593 printer.print_with_timestamp(DIVIDER)
594 elif test.subtests != []:
595 if not failed_only or not test.ok_status():
596 print_test_header(test, printer)
597 for subtest in test.subtests:
598 print_test(subtest, failed_only, printer)
599 print_test_footer(test, printer)
600 else:
601 if not failed_only or not test.ok_status():
602 print_test_result(test, printer)
603
604def _summarize_failed_tests(test: Test) -> str:
605 """Tries to summarize all the failing subtests in `test`."""
606
607 def failed_names(test: Test, parent_name: str) -> List[str]:
608 # Note: we use 'main' internally for the top-level test.
609 if not parent_name or parent_name == 'main':
610 full_name = test.name
611 else:
612 full_name = parent_name + '.' + test.name
613
614 if not test.subtests: # this is a leaf node
615 return [full_name]
616
617 # If all the children failed, just say this subtest failed.
618 # Don't summarize it down "the top-level test failed", though.
619 failed_subtests = [sub for sub in test.subtests if not sub.ok_status()]
620 if parent_name and len(failed_subtests) == len(test.subtests):
621 return [full_name]
622
623 all_failures = [] # type: List[str]
624 for t in failed_subtests:
625 all_failures.extend(failed_names(t, full_name))
626 return all_failures
627
628 failures = failed_names(test, '')
629 # If there are too many failures, printing them out will just be noisy.
630 if len(failures) > 10: # this is an arbitrary limit
631 return ''
632
633 return 'Failures: ' + ', '.join(failures)
634
635
636def print_summary_line(test: Test, printer: Printer) -> None:
637 """
638 Prints summary line of test object. Color of line is dependent on
639 status of test. Color is green if test passes, yellow if test is
640 skipped, and red if the test fails or crashes. Summary line contains
641 counts of the statuses of the tests subtests or the test itself if it
642 has no subtests.
643
644 Example:
645 "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
646 Errors: 0"
647
648 test - Test object representing current test being printed
649 printer - Printer object to output results
650 """
651 if test.status == TestStatus.SUCCESS:
652 color = stdout.green
653 elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
654 color = stdout.yellow
655 else:
656 color = stdout.red
657 printer.print_with_timestamp(color(f'Testing complete. {test.counts}'))
658
659 # Summarize failures that might have gone off-screen since we had a lot
660 # of tests (arbitrarily defined as >=100 for now).
661 if test.ok_status() or test.counts.total() < 100:
662 return
663 summarized = _summarize_failed_tests(test)
664 if not summarized:
665 return
666 printer.print_with_timestamp(color(summarized))
667
668# Other methods:
669
670def bubble_up_test_results(test: Test) -> None:
671 """
672 If the test has subtests, add the test counts of the subtests to the
673 test and check if any of the tests crashed and if so set the test
674 status to crashed. Otherwise if the test has no subtests add the
675 status of the test to the test counts.
676
677 Parameters:
678 test - Test object for current test being parsed
679 """
680 subtests = test.subtests
681 counts = test.counts
682 status = test.status
683 for t in subtests:
684 counts.add_subtest_counts(t.counts)
685 if counts.total() == 0:
686 counts.add_status(status)
687 elif test.counts.get_status() == TestStatus.TEST_CRASHED:
688 test.status = TestStatus.TEST_CRASHED
689
690def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool, printer: Printer) -> Test:
691 """
692 Finds next test to parse in LineStream, creates new Test object,
693 parses any subtests of the test, populates Test object with all
694 information (status, name) about the test and the Test objects for
695 any subtests, and then returns the Test object. The method accepts
696 three formats of tests:
697
698 Accepted test formats:
699
700 - Main KTAP/TAP header
701
702 Example:
703
704 KTAP version 1
705 1..4
706 [subtests]
707
708 - Subtest header (must include either the KTAP version line or
709 "# Subtest" header line)
710
711 Example (preferred format with both KTAP version line and
712 "# Subtest" line):
713
714 KTAP version 1
715 # Subtest: name
716 1..3
717 [subtests]
718 ok 1 name
719
720 Example (only "# Subtest" line):
721
722 # Subtest: name
723 1..3
724 [subtests]
725 ok 1 name
726
727 Example (only KTAP version line, compliant with KTAP v1 spec):
728
729 KTAP version 1
730 1..3
731 [subtests]
732 ok 1 name
733
734 - Test result line
735
736 Example:
737
738 ok 1 - test
739
740 Parameters:
741 lines - LineStream of KTAP output to parse
742 expected_num - expected test number for test to be parsed
743 log - list of strings containing any preceding diagnostic lines
744 corresponding to the current test
745 is_subtest - boolean indicating whether test is a subtest
746 printer - Printer object to output results
747
748 Return:
749 Test object populated with characteristics and any subtests
750 """
751 test = Test()
752 test.log.extend(log)
753
754 # Parse any errors prior to parsing tests
755 err_log = parse_diagnostic(lines)
756 test.log.extend(err_log)
757
758 if not is_subtest:
759 # If parsing the main/top-level test, parse KTAP version line and
760 # test plan
761 test.name = "main"
762 ktap_line = parse_ktap_header(lines, test, printer)
763 test.log.extend(parse_diagnostic(lines))
764 parse_test_plan(lines, test)
765 parent_test = True
766 else:
767 # If not the main test, attempt to parse a test header containing
768 # the KTAP version line and/or subtest header line
769 ktap_line = parse_ktap_header(lines, test, printer)
770 subtest_line = parse_test_header(lines, test)
771 parent_test = (ktap_line or subtest_line)
772 if parent_test:
773 # If KTAP version line and/or subtest header is found, attempt
774 # to parse test plan and print test header
775 test.log.extend(parse_diagnostic(lines))
776 parse_test_plan(lines, test)
777 print_test_header(test, printer)
778 expected_count = test.expected_count
779 subtests = []
780 test_num = 1
781 while parent_test and (expected_count is None or test_num <= expected_count):
782 # Loop to parse any subtests.
783 # Break after parsing expected number of tests or
784 # if expected number of tests is unknown break when test
785 # result line with matching name to subtest header is found
786 # or no more lines in stream.
787 sub_log = parse_diagnostic(lines)
788 sub_test = Test()
789 if not lines or (peek_test_name_match(lines, test) and
790 is_subtest):
791 if expected_count and test_num <= expected_count:
792 # If parser reaches end of test before
793 # parsing expected number of subtests, print
794 # crashed subtest and record error
795 test.add_error(printer, 'missing expected subtest!')
796 sub_test.log.extend(sub_log)
797 test.counts.add_status(
798 TestStatus.TEST_CRASHED)
799 print_test_result(sub_test, printer)
800 else:
801 test.log.extend(sub_log)
802 break
803 else:
804 sub_test = parse_test(lines, test_num, sub_log, True, printer)
805 subtests.append(sub_test)
806 test_num += 1
807 test.subtests = subtests
808 if is_subtest:
809 # If not main test, look for test result line
810 test.log.extend(parse_diagnostic(lines))
811 if test.name != "" and not peek_test_name_match(lines, test):
812 test.add_error(printer, 'missing subtest result line!')
813 else:
814 parse_test_result(lines, test, expected_num, printer)
815
816 # Check for there being no subtests within parent test
817 if parent_test and len(subtests) == 0:
818 # Don't override a bad status if this test had one reported.
819 # Assumption: no subtests means CRASHED is from Test.__init__()
820 if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
821 print_log(test.log, printer)
822 test.status = TestStatus.NO_TESTS
823 test.add_error(printer, '0 tests run!')
824
825 # Add statuses to TestCounts attribute in Test object
826 bubble_up_test_results(test)
827 if parent_test and is_subtest:
828 # If test has subtests and is not the main test object, print
829 # footer.
830 print_test_footer(test, printer)
831 elif is_subtest:
832 print_test_result(test, printer)
833 return test
834
835def parse_run_tests(kernel_output: Iterable[str], printer: Printer) -> Test:
836 """
837 Using kernel output, extract KTAP lines, parse the lines for test
838 results and print condensed test results and summary line.
839
840 Parameters:
841 kernel_output - Iterable object contains lines of kernel output
842 printer - Printer object to output results
843
844 Return:
845 Test - the main test object with all subtests.
846 """
847 printer.print_with_timestamp(DIVIDER)
848 lines = extract_tap_lines(kernel_output)
849 test = Test()
850 if not lines:
851 test.name = '<missing>'
852 test.add_error(printer, 'Could not find any KTAP output. Did any KUnit tests run?')
853 test.status = TestStatus.FAILURE_TO_PARSE_TESTS
854 else:
855 test = parse_test(lines, 0, [], False, printer)
856 if test.status != TestStatus.NO_TESTS:
857 test.status = test.counts.get_status()
858 printer.print_with_timestamp(DIVIDER)
859 return test
1# SPDX-License-Identifier: GPL-2.0
2#
3# Parses KTAP test results from a kernel dmesg log and incrementally prints
4# results with reader-friendly format. Stores and returns test results in a
5# Test object.
6#
7# Copyright (C) 2019, Google LLC.
8# Author: Felix Guo <felixguoxiuping@gmail.com>
9# Author: Brendan Higgins <brendanhiggins@google.com>
10# Author: Rae Moar <rmoar@google.com>
11
12from __future__ import annotations
13from dataclasses import dataclass
14import re
15import sys
16import textwrap
17
18from enum import Enum, auto
19from typing import Iterable, Iterator, List, Optional, Tuple
20
21from kunit_printer import stdout
22
23class Test:
24 """
25 A class to represent a test parsed from KTAP results. All KTAP
26 results within a test log are stored in a main Test object as
27 subtests.
28
29 Attributes:
30 status : TestStatus - status of the test
31 name : str - name of the test
32 expected_count : int - expected number of subtests (0 if single
33 test case and None if unknown expected number of subtests)
34 subtests : List[Test] - list of subtests
35 log : List[str] - log of KTAP lines that correspond to the test
36 counts : TestCounts - counts of the test statuses and errors of
37 subtests or of the test itself if the test is a single
38 test case.
39 """
40 def __init__(self) -> None:
41 """Creates Test object with default attributes."""
42 self.status = TestStatus.TEST_CRASHED
43 self.name = ''
44 self.expected_count = 0 # type: Optional[int]
45 self.subtests = [] # type: List[Test]
46 self.log = [] # type: List[str]
47 self.counts = TestCounts()
48
49 def __str__(self) -> str:
50 """Returns string representation of a Test class object."""
51 return (f'Test({self.status}, {self.name}, {self.expected_count}, '
52 f'{self.subtests}, {self.log}, {self.counts})')
53
54 def __repr__(self) -> str:
55 """Returns string representation of a Test class object."""
56 return str(self)
57
58 def add_error(self, error_message: str) -> None:
59 """Records an error that occurred while parsing this test."""
60 self.counts.errors += 1
61 stdout.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')
62
63 def ok_status(self) -> bool:
64 """Returns true if the status was ok, i.e. passed or skipped."""
65 return self.status in (TestStatus.SUCCESS, TestStatus.SKIPPED)
66
67class TestStatus(Enum):
68 """An enumeration class to represent the status of a test."""
69 SUCCESS = auto()
70 FAILURE = auto()
71 SKIPPED = auto()
72 TEST_CRASHED = auto()
73 NO_TESTS = auto()
74 FAILURE_TO_PARSE_TESTS = auto()
75
76@dataclass
77class TestCounts:
78 """
79 Tracks the counts of statuses of all test cases and any errors within
80 a Test.
81 """
82 passed: int = 0
83 failed: int = 0
84 crashed: int = 0
85 skipped: int = 0
86 errors: int = 0
87
88 def __str__(self) -> str:
89 """Returns the string representation of a TestCounts object."""
90 statuses = [('passed', self.passed), ('failed', self.failed),
91 ('crashed', self.crashed), ('skipped', self.skipped),
92 ('errors', self.errors)]
93 return f'Ran {self.total()} tests: ' + \
94 ', '.join(f'{s}: {n}' for s, n in statuses if n > 0)
95
96 def total(self) -> int:
97 """Returns the total number of test cases within a test
98 object, where a test case is a test with no subtests.
99 """
100 return (self.passed + self.failed + self.crashed +
101 self.skipped)
102
103 def add_subtest_counts(self, counts: TestCounts) -> None:
104 """
105 Adds the counts of another TestCounts object to the current
106 TestCounts object. Used to add the counts of a subtest to the
107 parent test.
108
109 Parameters:
110 counts - a different TestCounts object whose counts
111 will be added to the counts of the TestCounts object
112 """
113 self.passed += counts.passed
114 self.failed += counts.failed
115 self.crashed += counts.crashed
116 self.skipped += counts.skipped
117 self.errors += counts.errors
118
119 def get_status(self) -> TestStatus:
120 """Returns the aggregated status of a Test using test
121 counts.
122 """
123 if self.total() == 0:
124 return TestStatus.NO_TESTS
125 if self.crashed:
126 # Crashes should take priority.
127 return TestStatus.TEST_CRASHED
128 if self.failed:
129 return TestStatus.FAILURE
130 if self.passed:
131 # No failures or crashes, looks good!
132 return TestStatus.SUCCESS
133 # We have only skipped tests.
134 return TestStatus.SKIPPED
135
136 def add_status(self, status: TestStatus) -> None:
137 """Increments the count for `status`."""
138 if status == TestStatus.SUCCESS:
139 self.passed += 1
140 elif status == TestStatus.FAILURE:
141 self.failed += 1
142 elif status == TestStatus.SKIPPED:
143 self.skipped += 1
144 elif status != TestStatus.NO_TESTS:
145 self.crashed += 1
146
147class LineStream:
148 """
149 A class to represent the lines of kernel output.
150 Provides a lazy peek()/pop() interface over an iterator of
151 (line#, text).
152 """
153 _lines: Iterator[Tuple[int, str]]
154 _next: Tuple[int, str]
155 _need_next: bool
156 _done: bool
157
158 def __init__(self, lines: Iterator[Tuple[int, str]]):
159 """Creates a new LineStream that wraps the given iterator."""
160 self._lines = lines
161 self._done = False
162 self._need_next = True
163 self._next = (0, '')
164
165 def _get_next(self) -> None:
166 """Advances the LineSteam to the next line, if necessary."""
167 if not self._need_next:
168 return
169 try:
170 self._next = next(self._lines)
171 except StopIteration:
172 self._done = True
173 finally:
174 self._need_next = False
175
176 def peek(self) -> str:
177 """Returns the current line, without advancing the LineStream.
178 """
179 self._get_next()
180 return self._next[1]
181
182 def pop(self) -> str:
183 """Returns the current line and advances the LineStream to
184 the next line.
185 """
186 s = self.peek()
187 if self._done:
188 raise ValueError(f'LineStream: going past EOF, last line was {s}')
189 self._need_next = True
190 return s
191
192 def __bool__(self) -> bool:
193 """Returns True if stream has more lines."""
194 self._get_next()
195 return not self._done
196
197 # Only used by kunit_tool_test.py.
198 def __iter__(self) -> Iterator[str]:
199 """Empties all lines stored in LineStream object into
200 Iterator object and returns the Iterator object.
201 """
202 while bool(self):
203 yield self.pop()
204
205 def line_number(self) -> int:
206 """Returns the line number of the current line."""
207 self._get_next()
208 return self._next[0]
209
210# Parsing helper methods:
211
212KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$')
213TAP_START = re.compile(r'\s*TAP version ([0-9]+)$')
214KTAP_END = re.compile(r'\s*(List of all partitions:|'
215 'Kernel panic - not syncing: VFS:|reboot: System halted)')
216
217def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
218 """Extracts KTAP lines from the kernel output."""
219 def isolate_ktap_output(kernel_output: Iterable[str]) \
220 -> Iterator[Tuple[int, str]]:
221 line_num = 0
222 started = False
223 for line in kernel_output:
224 line_num += 1
225 line = line.rstrip() # remove trailing \n
226 if not started and KTAP_START.search(line):
227 # start extracting KTAP lines and set prefix
228 # to number of characters before version line
229 prefix_len = len(
230 line.split('KTAP version')[0])
231 started = True
232 yield line_num, line[prefix_len:]
233 elif not started and TAP_START.search(line):
234 # start extracting KTAP lines and set prefix
235 # to number of characters before version line
236 prefix_len = len(line.split('TAP version')[0])
237 started = True
238 yield line_num, line[prefix_len:]
239 elif started and KTAP_END.search(line):
240 # stop extracting KTAP lines
241 break
242 elif started:
243 # remove the prefix, if any.
244 line = line[prefix_len:]
245 yield line_num, line
246 return LineStream(lines=isolate_ktap_output(kernel_output))
247
248KTAP_VERSIONS = [1]
249TAP_VERSIONS = [13, 14]
250
251def check_version(version_num: int, accepted_versions: List[int],
252 version_type: str, test: Test) -> None:
253 """
254 Adds error to test object if version number is too high or too
255 low.
256
257 Parameters:
258 version_num - The inputted version number from the parsed KTAP or TAP
259 header line
260 accepted_version - List of accepted KTAP or TAP versions
261 version_type - 'KTAP' or 'TAP' depending on the type of
262 version line.
263 test - Test object for current test being parsed
264 """
265 if version_num < min(accepted_versions):
266 test.add_error(f'{version_type} version lower than expected!')
267 elif version_num > max(accepted_versions):
268 test.add_error(f'{version_type} version higer than expected!')
269
270def parse_ktap_header(lines: LineStream, test: Test) -> bool:
271 """
272 Parses KTAP/TAP header line and checks version number.
273 Returns False if fails to parse KTAP/TAP header line.
274
275 Accepted formats:
276 - 'KTAP version [version number]'
277 - 'TAP version [version number]'
278
279 Parameters:
280 lines - LineStream of KTAP output to parse
281 test - Test object for current test being parsed
282
283 Return:
284 True if successfully parsed KTAP/TAP header line
285 """
286 ktap_match = KTAP_START.match(lines.peek())
287 tap_match = TAP_START.match(lines.peek())
288 if ktap_match:
289 version_num = int(ktap_match.group(1))
290 check_version(version_num, KTAP_VERSIONS, 'KTAP', test)
291 elif tap_match:
292 version_num = int(tap_match.group(1))
293 check_version(version_num, TAP_VERSIONS, 'TAP', test)
294 else:
295 return False
296 lines.pop()
297 return True
298
299TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$')
300
301def parse_test_header(lines: LineStream, test: Test) -> bool:
302 """
303 Parses test header and stores test name in test object.
304 Returns False if fails to parse test header line.
305
306 Accepted format:
307 - '# Subtest: [test name]'
308
309 Parameters:
310 lines - LineStream of KTAP output to parse
311 test - Test object for current test being parsed
312
313 Return:
314 True if successfully parsed test header line
315 """
316 match = TEST_HEADER.match(lines.peek())
317 if not match:
318 return False
319 test.name = match.group(1)
320 lines.pop()
321 return True
322
323TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)')
324
325def parse_test_plan(lines: LineStream, test: Test) -> bool:
326 """
327 Parses test plan line and stores the expected number of subtests in
328 test object. Reports an error if expected count is 0.
329 Returns False and sets expected_count to None if there is no valid test
330 plan.
331
332 Accepted format:
333 - '1..[number of subtests]'
334
335 Parameters:
336 lines - LineStream of KTAP output to parse
337 test - Test object for current test being parsed
338
339 Return:
340 True if successfully parsed test plan line
341 """
342 match = TEST_PLAN.match(lines.peek())
343 if not match:
344 test.expected_count = None
345 return False
346 expected_count = int(match.group(1))
347 test.expected_count = expected_count
348 lines.pop()
349 return True
350
351TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')
352
353TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
354
355def peek_test_name_match(lines: LineStream, test: Test) -> bool:
356 """
357 Matches current line with the format of a test result line and checks
358 if the name matches the name of the current test.
359 Returns False if fails to match format or name.
360
361 Accepted format:
362 - '[ok|not ok] [test number] [-] [test name] [optional skip
363 directive]'
364
365 Parameters:
366 lines - LineStream of KTAP output to parse
367 test - Test object for current test being parsed
368
369 Return:
370 True if matched a test result line and the name matching the
371 expected test name
372 """
373 line = lines.peek()
374 match = TEST_RESULT.match(line)
375 if not match:
376 return False
377 name = match.group(4)
378 return name == test.name
379
380def parse_test_result(lines: LineStream, test: Test,
381 expected_num: int) -> bool:
382 """
383 Parses test result line and stores the status and name in the test
384 object. Reports an error if the test number does not match expected
385 test number.
386 Returns False if fails to parse test result line.
387
388 Note that the SKIP directive is the only direction that causes a
389 change in status.
390
391 Accepted format:
392 - '[ok|not ok] [test number] [-] [test name] [optional skip
393 directive]'
394
395 Parameters:
396 lines - LineStream of KTAP output to parse
397 test - Test object for current test being parsed
398 expected_num - expected test number for current test
399
400 Return:
401 True if successfully parsed a test result line.
402 """
403 line = lines.peek()
404 match = TEST_RESULT.match(line)
405 skip_match = TEST_RESULT_SKIP.match(line)
406
407 # Check if line matches test result line format
408 if not match:
409 return False
410 lines.pop()
411
412 # Set name of test object
413 if skip_match:
414 test.name = skip_match.group(4)
415 else:
416 test.name = match.group(4)
417
418 # Check test num
419 num = int(match.group(2))
420 if num != expected_num:
421 test.add_error(f'Expected test number {expected_num} but found {num}')
422
423 # Set status of test object
424 status = match.group(1)
425 if skip_match:
426 test.status = TestStatus.SKIPPED
427 elif status == 'ok':
428 test.status = TestStatus.SUCCESS
429 else:
430 test.status = TestStatus.FAILURE
431 return True
432
433def parse_diagnostic(lines: LineStream) -> List[str]:
434 """
435 Parse lines that do not match the format of a test result line or
436 test header line and returns them in list.
437
438 Line formats that are not parsed:
439 - '# Subtest: [test name]'
440 - '[ok|not ok] [test number] [-] [test name] [optional skip
441 directive]'
442 - 'KTAP version [version number]'
443
444 Parameters:
445 lines - LineStream of KTAP output to parse
446
447 Return:
448 Log of diagnostic lines
449 """
450 log = [] # type: List[str]
451 non_diagnostic_lines = [TEST_RESULT, TEST_HEADER, KTAP_START]
452 while lines and not any(re.match(lines.peek())
453 for re in non_diagnostic_lines):
454 log.append(lines.pop())
455 return log
456
457
458# Printing helper methods:
459
460DIVIDER = '=' * 60
461
462def format_test_divider(message: str, len_message: int) -> str:
463 """
464 Returns string with message centered in fixed width divider.
465
466 Example:
467 '===================== message example ====================='
468
469 Parameters:
470 message - message to be centered in divider line
471 len_message - length of the message to be printed such that
472 any characters of the color codes are not counted
473
474 Return:
475 String containing message centered in fixed width divider
476 """
477 default_count = 3 # default number of dashes
478 len_1 = default_count
479 len_2 = default_count
480 difference = len(DIVIDER) - len_message - 2 # 2 spaces added
481 if difference > 0:
482 # calculate number of dashes for each side of the divider
483 len_1 = int(difference / 2)
484 len_2 = difference - len_1
485 return ('=' * len_1) + f' {message} ' + ('=' * len_2)
486
487def print_test_header(test: Test) -> None:
488 """
489 Prints test header with test name and optionally the expected number
490 of subtests.
491
492 Example:
493 '=================== example (2 subtests) ==================='
494
495 Parameters:
496 test - Test object representing current test being printed
497 """
498 message = test.name
499 if message != "":
500 # Add a leading space before the subtest counts only if a test name
501 # is provided using a "# Subtest" header line.
502 message += " "
503 if test.expected_count:
504 if test.expected_count == 1:
505 message += '(1 subtest)'
506 else:
507 message += f'({test.expected_count} subtests)'
508 stdout.print_with_timestamp(format_test_divider(message, len(message)))
509
510def print_log(log: Iterable[str]) -> None:
511 """Prints all strings in saved log for test in yellow."""
512 formatted = textwrap.dedent('\n'.join(log))
513 for line in formatted.splitlines():
514 stdout.print_with_timestamp(stdout.yellow(line))
515
516def format_test_result(test: Test) -> str:
517 """
518 Returns string with formatted test result with colored status and test
519 name.
520
521 Example:
522 '[PASSED] example'
523
524 Parameters:
525 test - Test object representing current test being printed
526
527 Return:
528 String containing formatted test result
529 """
530 if test.status == TestStatus.SUCCESS:
531 return stdout.green('[PASSED] ') + test.name
532 if test.status == TestStatus.SKIPPED:
533 return stdout.yellow('[SKIPPED] ') + test.name
534 if test.status == TestStatus.NO_TESTS:
535 return stdout.yellow('[NO TESTS RUN] ') + test.name
536 if test.status == TestStatus.TEST_CRASHED:
537 print_log(test.log)
538 return stdout.red('[CRASHED] ') + test.name
539 print_log(test.log)
540 return stdout.red('[FAILED] ') + test.name
541
542def print_test_result(test: Test) -> None:
543 """
544 Prints result line with status of test.
545
546 Example:
547 '[PASSED] example'
548
549 Parameters:
550 test - Test object representing current test being printed
551 """
552 stdout.print_with_timestamp(format_test_result(test))
553
554def print_test_footer(test: Test) -> None:
555 """
556 Prints test footer with status of test.
557
558 Example:
559 '===================== [PASSED] example ====================='
560
561 Parameters:
562 test - Test object representing current test being printed
563 """
564 message = format_test_result(test)
565 stdout.print_with_timestamp(format_test_divider(message,
566 len(message) - stdout.color_len()))
567
568
569
570def _summarize_failed_tests(test: Test) -> str:
571 """Tries to summarize all the failing subtests in `test`."""
572
573 def failed_names(test: Test, parent_name: str) -> List[str]:
574 # Note: we use 'main' internally for the top-level test.
575 if not parent_name or parent_name == 'main':
576 full_name = test.name
577 else:
578 full_name = parent_name + '.' + test.name
579
580 if not test.subtests: # this is a leaf node
581 return [full_name]
582
583 # If all the children failed, just say this subtest failed.
584 # Don't summarize it down "the top-level test failed", though.
585 failed_subtests = [sub for sub in test.subtests if not sub.ok_status()]
586 if parent_name and len(failed_subtests) == len(test.subtests):
587 return [full_name]
588
589 all_failures = [] # type: List[str]
590 for t in failed_subtests:
591 all_failures.extend(failed_names(t, full_name))
592 return all_failures
593
594 failures = failed_names(test, '')
595 # If there are too many failures, printing them out will just be noisy.
596 if len(failures) > 10: # this is an arbitrary limit
597 return ''
598
599 return 'Failures: ' + ', '.join(failures)
600
601
602def print_summary_line(test: Test) -> None:
603 """
604 Prints summary line of test object. Color of line is dependent on
605 status of test. Color is green if test passes, yellow if test is
606 skipped, and red if the test fails or crashes. Summary line contains
607 counts of the statuses of the tests subtests or the test itself if it
608 has no subtests.
609
610 Example:
611 "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
612 Errors: 0"
613
614 test - Test object representing current test being printed
615 """
616 if test.status == TestStatus.SUCCESS:
617 color = stdout.green
618 elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
619 color = stdout.yellow
620 else:
621 color = stdout.red
622 stdout.print_with_timestamp(color(f'Testing complete. {test.counts}'))
623
624 # Summarize failures that might have gone off-screen since we had a lot
625 # of tests (arbitrarily defined as >=100 for now).
626 if test.ok_status() or test.counts.total() < 100:
627 return
628 summarized = _summarize_failed_tests(test)
629 if not summarized:
630 return
631 stdout.print_with_timestamp(color(summarized))
632
633# Other methods:
634
635def bubble_up_test_results(test: Test) -> None:
636 """
637 If the test has subtests, add the test counts of the subtests to the
638 test and check if any of the tests crashed and if so set the test
639 status to crashed. Otherwise if the test has no subtests add the
640 status of the test to the test counts.
641
642 Parameters:
643 test - Test object for current test being parsed
644 """
645 subtests = test.subtests
646 counts = test.counts
647 status = test.status
648 for t in subtests:
649 counts.add_subtest_counts(t.counts)
650 if counts.total() == 0:
651 counts.add_status(status)
652 elif test.counts.get_status() == TestStatus.TEST_CRASHED:
653 test.status = TestStatus.TEST_CRASHED
654
655def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool) -> Test:
656 """
657 Finds next test to parse in LineStream, creates new Test object,
658 parses any subtests of the test, populates Test object with all
659 information (status, name) about the test and the Test objects for
660 any subtests, and then returns the Test object. The method accepts
661 three formats of tests:
662
663 Accepted test formats:
664
665 - Main KTAP/TAP header
666
667 Example:
668
669 KTAP version 1
670 1..4
671 [subtests]
672
673 - Subtest header (must include either the KTAP version line or
674 "# Subtest" header line)
675
676 Example (preferred format with both KTAP version line and
677 "# Subtest" line):
678
679 KTAP version 1
680 # Subtest: name
681 1..3
682 [subtests]
683 ok 1 name
684
685 Example (only "# Subtest" line):
686
687 # Subtest: name
688 1..3
689 [subtests]
690 ok 1 name
691
692 Example (only KTAP version line, compliant with KTAP v1 spec):
693
694 KTAP version 1
695 1..3
696 [subtests]
697 ok 1 name
698
699 - Test result line
700
701 Example:
702
703 ok 1 - test
704
705 Parameters:
706 lines - LineStream of KTAP output to parse
707 expected_num - expected test number for test to be parsed
708 log - list of strings containing any preceding diagnostic lines
709 corresponding to the current test
710 is_subtest - boolean indicating whether test is a subtest
711
712 Return:
713 Test object populated with characteristics and any subtests
714 """
715 test = Test()
716 test.log.extend(log)
717 if not is_subtest:
718 # If parsing the main/top-level test, parse KTAP version line and
719 # test plan
720 test.name = "main"
721 ktap_line = parse_ktap_header(lines, test)
722 parse_test_plan(lines, test)
723 parent_test = True
724 else:
725 # If not the main test, attempt to parse a test header containing
726 # the KTAP version line and/or subtest header line
727 ktap_line = parse_ktap_header(lines, test)
728 subtest_line = parse_test_header(lines, test)
729 parent_test = (ktap_line or subtest_line)
730 if parent_test:
731 # If KTAP version line and/or subtest header is found, attempt
732 # to parse test plan and print test header
733 parse_test_plan(lines, test)
734 print_test_header(test)
735 expected_count = test.expected_count
736 subtests = []
737 test_num = 1
738 while parent_test and (expected_count is None or test_num <= expected_count):
739 # Loop to parse any subtests.
740 # Break after parsing expected number of tests or
741 # if expected number of tests is unknown break when test
742 # result line with matching name to subtest header is found
743 # or no more lines in stream.
744 sub_log = parse_diagnostic(lines)
745 sub_test = Test()
746 if not lines or (peek_test_name_match(lines, test) and
747 is_subtest):
748 if expected_count and test_num <= expected_count:
749 # If parser reaches end of test before
750 # parsing expected number of subtests, print
751 # crashed subtest and record error
752 test.add_error('missing expected subtest!')
753 sub_test.log.extend(sub_log)
754 test.counts.add_status(
755 TestStatus.TEST_CRASHED)
756 print_test_result(sub_test)
757 else:
758 test.log.extend(sub_log)
759 break
760 else:
761 sub_test = parse_test(lines, test_num, sub_log, True)
762 subtests.append(sub_test)
763 test_num += 1
764 test.subtests = subtests
765 if is_subtest:
766 # If not main test, look for test result line
767 test.log.extend(parse_diagnostic(lines))
768 if test.name != "" and not peek_test_name_match(lines, test):
769 test.add_error('missing subtest result line!')
770 else:
771 parse_test_result(lines, test, expected_num)
772
773 # Check for there being no subtests within parent test
774 if parent_test and len(subtests) == 0:
775 # Don't override a bad status if this test had one reported.
776 # Assumption: no subtests means CRASHED is from Test.__init__()
777 if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
778 test.status = TestStatus.NO_TESTS
779 test.add_error('0 tests run!')
780
781 # Add statuses to TestCounts attribute in Test object
782 bubble_up_test_results(test)
783 if parent_test and is_subtest:
784 # If test has subtests and is not the main test object, print
785 # footer.
786 print_test_footer(test)
787 elif is_subtest:
788 print_test_result(test)
789 return test
790
791def parse_run_tests(kernel_output: Iterable[str]) -> Test:
792 """
793 Using kernel output, extract KTAP lines, parse the lines for test
794 results and print condensed test results and summary line.
795
796 Parameters:
797 kernel_output - Iterable object contains lines of kernel output
798
799 Return:
800 Test - the main test object with all subtests.
801 """
802 stdout.print_with_timestamp(DIVIDER)
803 lines = extract_tap_lines(kernel_output)
804 test = Test()
805 if not lines:
806 test.name = '<missing>'
807 test.add_error('Could not find any KTAP output. Did any KUnit tests run?')
808 test.status = TestStatus.FAILURE_TO_PARSE_TESTS
809 else:
810 test = parse_test(lines, 0, [], False)
811 if test.status != TestStatus.NO_TESTS:
812 test.status = test.counts.get_status()
813 stdout.print_with_timestamp(DIVIDER)
814 print_summary_line(test)
815 return test