Loading...
1# SPDX-License-Identifier: GPL-2.0
2#
3# Parses KTAP test results from a kernel dmesg log and incrementally prints
4# results with reader-friendly format. Stores and returns test results in a
5# Test object.
6#
7# Copyright (C) 2019, Google LLC.
8# Author: Felix Guo <felixguoxiuping@gmail.com>
9# Author: Brendan Higgins <brendanhiggins@google.com>
10# Author: Rae Moar <rmoar@google.com>
11
12from __future__ import annotations
13from dataclasses import dataclass
14import re
15import textwrap
16
17from enum import Enum, auto
18from typing import Iterable, Iterator, List, Optional, Tuple
19
20from kunit_printer import Printer, stdout
21
22class Test:
23 """
24 A class to represent a test parsed from KTAP results. All KTAP
25 results within a test log are stored in a main Test object as
26 subtests.
27
28 Attributes:
29 status : TestStatus - status of the test
30 name : str - name of the test
31 expected_count : int - expected number of subtests (0 if single
32 test case and None if unknown expected number of subtests)
33 subtests : List[Test] - list of subtests
34 log : List[str] - log of KTAP lines that correspond to the test
35 counts : TestCounts - counts of the test statuses and errors of
36 subtests or of the test itself if the test is a single
37 test case.
38 """
39 def __init__(self) -> None:
40 """Creates Test object with default attributes."""
41 self.status = TestStatus.TEST_CRASHED
42 self.name = ''
43 self.expected_count = 0 # type: Optional[int]
44 self.subtests = [] # type: List[Test]
45 self.log = [] # type: List[str]
46 self.counts = TestCounts()
47
48 def __str__(self) -> str:
49 """Returns string representation of a Test class object."""
50 return (f'Test({self.status}, {self.name}, {self.expected_count}, '
51 f'{self.subtests}, {self.log}, {self.counts})')
52
53 def __repr__(self) -> str:
54 """Returns string representation of a Test class object."""
55 return str(self)
56
57 def add_error(self, printer: Printer, error_message: str) -> None:
58 """Records an error that occurred while parsing this test."""
59 self.counts.errors += 1
60 printer.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')
61
62 def ok_status(self) -> bool:
63 """Returns true if the status was ok, i.e. passed or skipped."""
64 return self.status in (TestStatus.SUCCESS, TestStatus.SKIPPED)
65
66class TestStatus(Enum):
67 """An enumeration class to represent the status of a test."""
68 SUCCESS = auto()
69 FAILURE = auto()
70 SKIPPED = auto()
71 TEST_CRASHED = auto()
72 NO_TESTS = auto()
73 FAILURE_TO_PARSE_TESTS = auto()
74
75@dataclass
76class TestCounts:
77 """
78 Tracks the counts of statuses of all test cases and any errors within
79 a Test.
80 """
81 passed: int = 0
82 failed: int = 0
83 crashed: int = 0
84 skipped: int = 0
85 errors: int = 0
86
87 def __str__(self) -> str:
88 """Returns the string representation of a TestCounts object."""
89 statuses = [('passed', self.passed), ('failed', self.failed),
90 ('crashed', self.crashed), ('skipped', self.skipped),
91 ('errors', self.errors)]
92 return f'Ran {self.total()} tests: ' + \
93 ', '.join(f'{s}: {n}' for s, n in statuses if n > 0)
94
95 def total(self) -> int:
96 """Returns the total number of test cases within a test
97 object, where a test case is a test with no subtests.
98 """
99 return (self.passed + self.failed + self.crashed +
100 self.skipped)
101
102 def add_subtest_counts(self, counts: TestCounts) -> None:
103 """
104 Adds the counts of another TestCounts object to the current
105 TestCounts object. Used to add the counts of a subtest to the
106 parent test.
107
108 Parameters:
109 counts - a different TestCounts object whose counts
110 will be added to the counts of the TestCounts object
111 """
112 self.passed += counts.passed
113 self.failed += counts.failed
114 self.crashed += counts.crashed
115 self.skipped += counts.skipped
116 self.errors += counts.errors
117
118 def get_status(self) -> TestStatus:
119 """Returns the aggregated status of a Test using test
120 counts.
121 """
122 if self.total() == 0:
123 return TestStatus.NO_TESTS
124 if self.crashed:
125 # Crashes should take priority.
126 return TestStatus.TEST_CRASHED
127 if self.failed:
128 return TestStatus.FAILURE
129 if self.passed:
130 # No failures or crashes, looks good!
131 return TestStatus.SUCCESS
132 # We have only skipped tests.
133 return TestStatus.SKIPPED
134
135 def add_status(self, status: TestStatus) -> None:
136 """Increments the count for `status`."""
137 if status == TestStatus.SUCCESS:
138 self.passed += 1
139 elif status == TestStatus.FAILURE:
140 self.failed += 1
141 elif status == TestStatus.SKIPPED:
142 self.skipped += 1
143 elif status != TestStatus.NO_TESTS:
144 self.crashed += 1
145
146class LineStream:
147 """
148 A class to represent the lines of kernel output.
149 Provides a lazy peek()/pop() interface over an iterator of
150 (line#, text).
151 """
152 _lines: Iterator[Tuple[int, str]]
153 _next: Tuple[int, str]
154 _need_next: bool
155 _done: bool
156
157 def __init__(self, lines: Iterator[Tuple[int, str]]):
158 """Creates a new LineStream that wraps the given iterator."""
159 self._lines = lines
160 self._done = False
161 self._need_next = True
162 self._next = (0, '')
163
164 def _get_next(self) -> None:
165 """Advances the LineSteam to the next line, if necessary."""
166 if not self._need_next:
167 return
168 try:
169 self._next = next(self._lines)
170 except StopIteration:
171 self._done = True
172 finally:
173 self._need_next = False
174
175 def peek(self) -> str:
176 """Returns the current line, without advancing the LineStream.
177 """
178 self._get_next()
179 return self._next[1]
180
181 def pop(self) -> str:
182 """Returns the current line and advances the LineStream to
183 the next line.
184 """
185 s = self.peek()
186 if self._done:
187 raise ValueError(f'LineStream: going past EOF, last line was {s}')
188 self._need_next = True
189 return s
190
191 def __bool__(self) -> bool:
192 """Returns True if stream has more lines."""
193 self._get_next()
194 return not self._done
195
196 # Only used by kunit_tool_test.py.
197 def __iter__(self) -> Iterator[str]:
198 """Empties all lines stored in LineStream object into
199 Iterator object and returns the Iterator object.
200 """
201 while bool(self):
202 yield self.pop()
203
204 def line_number(self) -> int:
205 """Returns the line number of the current line."""
206 self._get_next()
207 return self._next[0]
208
209# Parsing helper methods:
210
211KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$')
212TAP_START = re.compile(r'\s*TAP version ([0-9]+)$')
213KTAP_END = re.compile(r'\s*(List of all partitions:|'
214 'Kernel panic - not syncing: VFS:|reboot: System halted)')
215EXECUTOR_ERROR = re.compile(r'\s*kunit executor: (.*)$')
216
217def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
218 """Extracts KTAP lines from the kernel output."""
219 def isolate_ktap_output(kernel_output: Iterable[str]) \
220 -> Iterator[Tuple[int, str]]:
221 line_num = 0
222 started = False
223 for line in kernel_output:
224 line_num += 1
225 line = line.rstrip() # remove trailing \n
226 if not started and KTAP_START.search(line):
227 # start extracting KTAP lines and set prefix
228 # to number of characters before version line
229 prefix_len = len(
230 line.split('KTAP version')[0])
231 started = True
232 yield line_num, line[prefix_len:]
233 elif not started and TAP_START.search(line):
234 # start extracting KTAP lines and set prefix
235 # to number of characters before version line
236 prefix_len = len(line.split('TAP version')[0])
237 started = True
238 yield line_num, line[prefix_len:]
239 elif started and KTAP_END.search(line):
240 # stop extracting KTAP lines
241 break
242 elif started:
243 # remove the prefix, if any.
244 line = line[prefix_len:]
245 yield line_num, line
246 elif EXECUTOR_ERROR.search(line):
247 yield line_num, line
248 return LineStream(lines=isolate_ktap_output(kernel_output))
249
250KTAP_VERSIONS = [1]
251TAP_VERSIONS = [13, 14]
252
253def check_version(version_num: int, accepted_versions: List[int],
254 version_type: str, test: Test, printer: Printer) -> None:
255 """
256 Adds error to test object if version number is too high or too
257 low.
258
259 Parameters:
260 version_num - The inputted version number from the parsed KTAP or TAP
261 header line
262 accepted_version - List of accepted KTAP or TAP versions
263 version_type - 'KTAP' or 'TAP' depending on the type of
264 version line.
265 test - Test object for current test being parsed
266 printer - Printer object to output error
267 """
268 if version_num < min(accepted_versions):
269 test.add_error(printer, f'{version_type} version lower than expected!')
270 elif version_num > max(accepted_versions):
271 test.add_error(printer, f'{version_type} version higer than expected!')
272
273def parse_ktap_header(lines: LineStream, test: Test, printer: Printer) -> bool:
274 """
275 Parses KTAP/TAP header line and checks version number.
276 Returns False if fails to parse KTAP/TAP header line.
277
278 Accepted formats:
279 - 'KTAP version [version number]'
280 - 'TAP version [version number]'
281
282 Parameters:
283 lines - LineStream of KTAP output to parse
284 test - Test object for current test being parsed
285 printer - Printer object to output results
286
287 Return:
288 True if successfully parsed KTAP/TAP header line
289 """
290 ktap_match = KTAP_START.match(lines.peek())
291 tap_match = TAP_START.match(lines.peek())
292 if ktap_match:
293 version_num = int(ktap_match.group(1))
294 check_version(version_num, KTAP_VERSIONS, 'KTAP', test, printer)
295 elif tap_match:
296 version_num = int(tap_match.group(1))
297 check_version(version_num, TAP_VERSIONS, 'TAP', test, printer)
298 else:
299 return False
300 lines.pop()
301 return True
302
303TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$')
304
305def parse_test_header(lines: LineStream, test: Test) -> bool:
306 """
307 Parses test header and stores test name in test object.
308 Returns False if fails to parse test header line.
309
310 Accepted format:
311 - '# Subtest: [test name]'
312
313 Parameters:
314 lines - LineStream of KTAP output to parse
315 test - Test object for current test being parsed
316
317 Return:
318 True if successfully parsed test header line
319 """
320 match = TEST_HEADER.match(lines.peek())
321 if not match:
322 return False
323 test.name = match.group(1)
324 lines.pop()
325 return True
326
327TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)')
328
329def parse_test_plan(lines: LineStream, test: Test) -> bool:
330 """
331 Parses test plan line and stores the expected number of subtests in
332 test object. Reports an error if expected count is 0.
333 Returns False and sets expected_count to None if there is no valid test
334 plan.
335
336 Accepted format:
337 - '1..[number of subtests]'
338
339 Parameters:
340 lines - LineStream of KTAP output to parse
341 test - Test object for current test being parsed
342
343 Return:
344 True if successfully parsed test plan line
345 """
346 match = TEST_PLAN.match(lines.peek())
347 if not match:
348 test.expected_count = None
349 return False
350 expected_count = int(match.group(1))
351 test.expected_count = expected_count
352 lines.pop()
353 return True
354
355TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')
356
357TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
358
359def peek_test_name_match(lines: LineStream, test: Test) -> bool:
360 """
361 Matches current line with the format of a test result line and checks
362 if the name matches the name of the current test.
363 Returns False if fails to match format or name.
364
365 Accepted format:
366 - '[ok|not ok] [test number] [-] [test name] [optional skip
367 directive]'
368
369 Parameters:
370 lines - LineStream of KTAP output to parse
371 test - Test object for current test being parsed
372
373 Return:
374 True if matched a test result line and the name matching the
375 expected test name
376 """
377 line = lines.peek()
378 match = TEST_RESULT.match(line)
379 if not match:
380 return False
381 name = match.group(4)
382 return name == test.name
383
384def parse_test_result(lines: LineStream, test: Test,
385 expected_num: int, printer: Printer) -> bool:
386 """
387 Parses test result line and stores the status and name in the test
388 object. Reports an error if the test number does not match expected
389 test number.
390 Returns False if fails to parse test result line.
391
392 Note that the SKIP directive is the only direction that causes a
393 change in status.
394
395 Accepted format:
396 - '[ok|not ok] [test number] [-] [test name] [optional skip
397 directive]'
398
399 Parameters:
400 lines - LineStream of KTAP output to parse
401 test - Test object for current test being parsed
402 expected_num - expected test number for current test
403 printer - Printer object to output results
404
405 Return:
406 True if successfully parsed a test result line.
407 """
408 line = lines.peek()
409 match = TEST_RESULT.match(line)
410 skip_match = TEST_RESULT_SKIP.match(line)
411
412 # Check if line matches test result line format
413 if not match:
414 return False
415 lines.pop()
416
417 # Set name of test object
418 if skip_match:
419 test.name = skip_match.group(4)
420 else:
421 test.name = match.group(4)
422
423 # Check test num
424 num = int(match.group(2))
425 if num != expected_num:
426 test.add_error(printer, f'Expected test number {expected_num} but found {num}')
427
428 # Set status of test object
429 status = match.group(1)
430 if skip_match:
431 test.status = TestStatus.SKIPPED
432 elif status == 'ok':
433 test.status = TestStatus.SUCCESS
434 else:
435 test.status = TestStatus.FAILURE
436 return True
437
438def parse_diagnostic(lines: LineStream) -> List[str]:
439 """
440 Parse lines that do not match the format of a test result line or
441 test header line and returns them in list.
442
443 Line formats that are not parsed:
444 - '# Subtest: [test name]'
445 - '[ok|not ok] [test number] [-] [test name] [optional skip
446 directive]'
447 - 'KTAP version [version number]'
448
449 Parameters:
450 lines - LineStream of KTAP output to parse
451
452 Return:
453 Log of diagnostic lines
454 """
455 log = [] # type: List[str]
456 non_diagnostic_lines = [TEST_RESULT, TEST_HEADER, KTAP_START, TAP_START, TEST_PLAN]
457 while lines and not any(re.match(lines.peek())
458 for re in non_diagnostic_lines):
459 log.append(lines.pop())
460 return log
461
462
463# Printing helper methods:
464
465DIVIDER = '=' * 60
466
467def format_test_divider(message: str, len_message: int) -> str:
468 """
469 Returns string with message centered in fixed width divider.
470
471 Example:
472 '===================== message example ====================='
473
474 Parameters:
475 message - message to be centered in divider line
476 len_message - length of the message to be printed such that
477 any characters of the color codes are not counted
478
479 Return:
480 String containing message centered in fixed width divider
481 """
482 default_count = 3 # default number of dashes
483 len_1 = default_count
484 len_2 = default_count
485 difference = len(DIVIDER) - len_message - 2 # 2 spaces added
486 if difference > 0:
487 # calculate number of dashes for each side of the divider
488 len_1 = int(difference / 2)
489 len_2 = difference - len_1
490 return ('=' * len_1) + f' {message} ' + ('=' * len_2)
491
492def print_test_header(test: Test, printer: Printer) -> None:
493 """
494 Prints test header with test name and optionally the expected number
495 of subtests.
496
497 Example:
498 '=================== example (2 subtests) ==================='
499
500 Parameters:
501 test - Test object representing current test being printed
502 printer - Printer object to output results
503 """
504 message = test.name
505 if message != "":
506 # Add a leading space before the subtest counts only if a test name
507 # is provided using a "# Subtest" header line.
508 message += " "
509 if test.expected_count:
510 if test.expected_count == 1:
511 message += '(1 subtest)'
512 else:
513 message += f'({test.expected_count} subtests)'
514 printer.print_with_timestamp(format_test_divider(message, len(message)))
515
516def print_log(log: Iterable[str], printer: Printer) -> None:
517 """Prints all strings in saved log for test in yellow."""
518 formatted = textwrap.dedent('\n'.join(log))
519 for line in formatted.splitlines():
520 printer.print_with_timestamp(printer.yellow(line))
521
522def format_test_result(test: Test, printer: Printer) -> str:
523 """
524 Returns string with formatted test result with colored status and test
525 name.
526
527 Example:
528 '[PASSED] example'
529
530 Parameters:
531 test - Test object representing current test being printed
532 printer - Printer object to output results
533
534 Return:
535 String containing formatted test result
536 """
537 if test.status == TestStatus.SUCCESS:
538 return printer.green('[PASSED] ') + test.name
539 if test.status == TestStatus.SKIPPED:
540 return printer.yellow('[SKIPPED] ') + test.name
541 if test.status == TestStatus.NO_TESTS:
542 return printer.yellow('[NO TESTS RUN] ') + test.name
543 if test.status == TestStatus.TEST_CRASHED:
544 print_log(test.log, printer)
545 return stdout.red('[CRASHED] ') + test.name
546 print_log(test.log, printer)
547 return printer.red('[FAILED] ') + test.name
548
549def print_test_result(test: Test, printer: Printer) -> None:
550 """
551 Prints result line with status of test.
552
553 Example:
554 '[PASSED] example'
555
556 Parameters:
557 test - Test object representing current test being printed
558 printer - Printer object
559 """
560 printer.print_with_timestamp(format_test_result(test, printer))
561
562def print_test_footer(test: Test, printer: Printer) -> None:
563 """
564 Prints test footer with status of test.
565
566 Example:
567 '===================== [PASSED] example ====================='
568
569 Parameters:
570 test - Test object representing current test being printed
571 printer - Printer object to output results
572 """
573 message = format_test_result(test, printer)
574 printer.print_with_timestamp(format_test_divider(message,
575 len(message) - printer.color_len()))
576
577def print_test(test: Test, failed_only: bool, printer: Printer) -> None:
578 """
579 Prints Test object to given printer. For a child test, the result line is
580 printed. For a parent test, the test header, all child test results, and
581 the test footer are all printed. If failed_only is true, only failed/crashed
582 tests will be printed.
583
584 Parameters:
585 test - Test object to print
586 failed_only - True if only failed/crashed tests should be printed.
587 printer - Printer object to output results
588 """
589 if test.name == "main":
590 printer.print_with_timestamp(DIVIDER)
591 for subtest in test.subtests:
592 print_test(subtest, failed_only, printer)
593 printer.print_with_timestamp(DIVIDER)
594 elif test.subtests != []:
595 if not failed_only or not test.ok_status():
596 print_test_header(test, printer)
597 for subtest in test.subtests:
598 print_test(subtest, failed_only, printer)
599 print_test_footer(test, printer)
600 else:
601 if not failed_only or not test.ok_status():
602 print_test_result(test, printer)
603
604def _summarize_failed_tests(test: Test) -> str:
605 """Tries to summarize all the failing subtests in `test`."""
606
607 def failed_names(test: Test, parent_name: str) -> List[str]:
608 # Note: we use 'main' internally for the top-level test.
609 if not parent_name or parent_name == 'main':
610 full_name = test.name
611 else:
612 full_name = parent_name + '.' + test.name
613
614 if not test.subtests: # this is a leaf node
615 return [full_name]
616
617 # If all the children failed, just say this subtest failed.
618 # Don't summarize it down "the top-level test failed", though.
619 failed_subtests = [sub for sub in test.subtests if not sub.ok_status()]
620 if parent_name and len(failed_subtests) == len(test.subtests):
621 return [full_name]
622
623 all_failures = [] # type: List[str]
624 for t in failed_subtests:
625 all_failures.extend(failed_names(t, full_name))
626 return all_failures
627
628 failures = failed_names(test, '')
629 # If there are too many failures, printing them out will just be noisy.
630 if len(failures) > 10: # this is an arbitrary limit
631 return ''
632
633 return 'Failures: ' + ', '.join(failures)
634
635
636def print_summary_line(test: Test, printer: Printer) -> None:
637 """
638 Prints summary line of test object. Color of line is dependent on
639 status of test. Color is green if test passes, yellow if test is
640 skipped, and red if the test fails or crashes. Summary line contains
641 counts of the statuses of the tests subtests or the test itself if it
642 has no subtests.
643
644 Example:
645 "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
646 Errors: 0"
647
648 test - Test object representing current test being printed
649 printer - Printer object to output results
650 """
651 if test.status == TestStatus.SUCCESS:
652 color = stdout.green
653 elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
654 color = stdout.yellow
655 else:
656 color = stdout.red
657 printer.print_with_timestamp(color(f'Testing complete. {test.counts}'))
658
659 # Summarize failures that might have gone off-screen since we had a lot
660 # of tests (arbitrarily defined as >=100 for now).
661 if test.ok_status() or test.counts.total() < 100:
662 return
663 summarized = _summarize_failed_tests(test)
664 if not summarized:
665 return
666 printer.print_with_timestamp(color(summarized))
667
668# Other methods:
669
670def bubble_up_test_results(test: Test) -> None:
671 """
672 If the test has subtests, add the test counts of the subtests to the
673 test and check if any of the tests crashed and if so set the test
674 status to crashed. Otherwise if the test has no subtests add the
675 status of the test to the test counts.
676
677 Parameters:
678 test - Test object for current test being parsed
679 """
680 subtests = test.subtests
681 counts = test.counts
682 status = test.status
683 for t in subtests:
684 counts.add_subtest_counts(t.counts)
685 if counts.total() == 0:
686 counts.add_status(status)
687 elif test.counts.get_status() == TestStatus.TEST_CRASHED:
688 test.status = TestStatus.TEST_CRASHED
689
690def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool, printer: Printer) -> Test:
691 """
692 Finds next test to parse in LineStream, creates new Test object,
693 parses any subtests of the test, populates Test object with all
694 information (status, name) about the test and the Test objects for
695 any subtests, and then returns the Test object. The method accepts
696 three formats of tests:
697
698 Accepted test formats:
699
700 - Main KTAP/TAP header
701
702 Example:
703
704 KTAP version 1
705 1..4
706 [subtests]
707
708 - Subtest header (must include either the KTAP version line or
709 "# Subtest" header line)
710
711 Example (preferred format with both KTAP version line and
712 "# Subtest" line):
713
714 KTAP version 1
715 # Subtest: name
716 1..3
717 [subtests]
718 ok 1 name
719
720 Example (only "# Subtest" line):
721
722 # Subtest: name
723 1..3
724 [subtests]
725 ok 1 name
726
727 Example (only KTAP version line, compliant with KTAP v1 spec):
728
729 KTAP version 1
730 1..3
731 [subtests]
732 ok 1 name
733
734 - Test result line
735
736 Example:
737
738 ok 1 - test
739
740 Parameters:
741 lines - LineStream of KTAP output to parse
742 expected_num - expected test number for test to be parsed
743 log - list of strings containing any preceding diagnostic lines
744 corresponding to the current test
745 is_subtest - boolean indicating whether test is a subtest
746 printer - Printer object to output results
747
748 Return:
749 Test object populated with characteristics and any subtests
750 """
751 test = Test()
752 test.log.extend(log)
753
754 # Parse any errors prior to parsing tests
755 err_log = parse_diagnostic(lines)
756 test.log.extend(err_log)
757
758 if not is_subtest:
759 # If parsing the main/top-level test, parse KTAP version line and
760 # test plan
761 test.name = "main"
762 ktap_line = parse_ktap_header(lines, test, printer)
763 test.log.extend(parse_diagnostic(lines))
764 parse_test_plan(lines, test)
765 parent_test = True
766 else:
767 # If not the main test, attempt to parse a test header containing
768 # the KTAP version line and/or subtest header line
769 ktap_line = parse_ktap_header(lines, test, printer)
770 subtest_line = parse_test_header(lines, test)
771 parent_test = (ktap_line or subtest_line)
772 if parent_test:
773 # If KTAP version line and/or subtest header is found, attempt
774 # to parse test plan and print test header
775 test.log.extend(parse_diagnostic(lines))
776 parse_test_plan(lines, test)
777 print_test_header(test, printer)
778 expected_count = test.expected_count
779 subtests = []
780 test_num = 1
781 while parent_test and (expected_count is None or test_num <= expected_count):
782 # Loop to parse any subtests.
783 # Break after parsing expected number of tests or
784 # if expected number of tests is unknown break when test
785 # result line with matching name to subtest header is found
786 # or no more lines in stream.
787 sub_log = parse_diagnostic(lines)
788 sub_test = Test()
789 if not lines or (peek_test_name_match(lines, test) and
790 is_subtest):
791 if expected_count and test_num <= expected_count:
792 # If parser reaches end of test before
793 # parsing expected number of subtests, print
794 # crashed subtest and record error
795 test.add_error(printer, 'missing expected subtest!')
796 sub_test.log.extend(sub_log)
797 test.counts.add_status(
798 TestStatus.TEST_CRASHED)
799 print_test_result(sub_test, printer)
800 else:
801 test.log.extend(sub_log)
802 break
803 else:
804 sub_test = parse_test(lines, test_num, sub_log, True, printer)
805 subtests.append(sub_test)
806 test_num += 1
807 test.subtests = subtests
808 if is_subtest:
809 # If not main test, look for test result line
810 test.log.extend(parse_diagnostic(lines))
811 if test.name != "" and not peek_test_name_match(lines, test):
812 test.add_error(printer, 'missing subtest result line!')
813 else:
814 parse_test_result(lines, test, expected_num, printer)
815
816 # Check for there being no subtests within parent test
817 if parent_test and len(subtests) == 0:
818 # Don't override a bad status if this test had one reported.
819 # Assumption: no subtests means CRASHED is from Test.__init__()
820 if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
821 print_log(test.log, printer)
822 test.status = TestStatus.NO_TESTS
823 test.add_error(printer, '0 tests run!')
824
825 # Add statuses to TestCounts attribute in Test object
826 bubble_up_test_results(test)
827 if parent_test and is_subtest:
828 # If test has subtests and is not the main test object, print
829 # footer.
830 print_test_footer(test, printer)
831 elif is_subtest:
832 print_test_result(test, printer)
833 return test
834
835def parse_run_tests(kernel_output: Iterable[str], printer: Printer) -> Test:
836 """
837 Using kernel output, extract KTAP lines, parse the lines for test
838 results and print condensed test results and summary line.
839
840 Parameters:
841 kernel_output - Iterable object contains lines of kernel output
842 printer - Printer object to output results
843
844 Return:
845 Test - the main test object with all subtests.
846 """
847 printer.print_with_timestamp(DIVIDER)
848 lines = extract_tap_lines(kernel_output)
849 test = Test()
850 if not lines:
851 test.name = '<missing>'
852 test.add_error(printer, 'Could not find any KTAP output. Did any KUnit tests run?')
853 test.status = TestStatus.FAILURE_TO_PARSE_TESTS
854 else:
855 test = parse_test(lines, 0, [], False, printer)
856 if test.status != TestStatus.NO_TESTS:
857 test.status = test.counts.get_status()
858 printer.print_with_timestamp(DIVIDER)
859 return test
1# SPDX-License-Identifier: GPL-2.0
2#
3# Parses KTAP test results from a kernel dmesg log and incrementally prints
4# results with reader-friendly format. Stores and returns test results in a
5# Test object.
6#
7# Copyright (C) 2019, Google LLC.
8# Author: Felix Guo <felixguoxiuping@gmail.com>
9# Author: Brendan Higgins <brendanhiggins@google.com>
10# Author: Rae Moar <rmoar@google.com>
11
12from __future__ import annotations
13from dataclasses import dataclass
14import re
15import textwrap
16
17from enum import Enum, auto
18from typing import Iterable, Iterator, List, Optional, Tuple
19
20from kunit_printer import stdout
21
22class Test:
23 """
24 A class to represent a test parsed from KTAP results. All KTAP
25 results within a test log are stored in a main Test object as
26 subtests.
27
28 Attributes:
29 status : TestStatus - status of the test
30 name : str - name of the test
31 expected_count : int - expected number of subtests (0 if single
32 test case and None if unknown expected number of subtests)
33 subtests : List[Test] - list of subtests
34 log : List[str] - log of KTAP lines that correspond to the test
35 counts : TestCounts - counts of the test statuses and errors of
36 subtests or of the test itself if the test is a single
37 test case.
38 """
39 def __init__(self) -> None:
40 """Creates Test object with default attributes."""
41 self.status = TestStatus.TEST_CRASHED
42 self.name = ''
43 self.expected_count = 0 # type: Optional[int]
44 self.subtests = [] # type: List[Test]
45 self.log = [] # type: List[str]
46 self.counts = TestCounts()
47
48 def __str__(self) -> str:
49 """Returns string representation of a Test class object."""
50 return (f'Test({self.status}, {self.name}, {self.expected_count}, '
51 f'{self.subtests}, {self.log}, {self.counts})')
52
53 def __repr__(self) -> str:
54 """Returns string representation of a Test class object."""
55 return str(self)
56
57 def add_error(self, error_message: str) -> None:
58 """Records an error that occurred while parsing this test."""
59 self.counts.errors += 1
60 stdout.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')
61
62 def ok_status(self) -> bool:
63 """Returns true if the status was ok, i.e. passed or skipped."""
64 return self.status in (TestStatus.SUCCESS, TestStatus.SKIPPED)
65
66class TestStatus(Enum):
67 """An enumeration class to represent the status of a test."""
68 SUCCESS = auto()
69 FAILURE = auto()
70 SKIPPED = auto()
71 TEST_CRASHED = auto()
72 NO_TESTS = auto()
73 FAILURE_TO_PARSE_TESTS = auto()
74
75@dataclass
76class TestCounts:
77 """
78 Tracks the counts of statuses of all test cases and any errors within
79 a Test.
80 """
81 passed: int = 0
82 failed: int = 0
83 crashed: int = 0
84 skipped: int = 0
85 errors: int = 0
86
87 def __str__(self) -> str:
88 """Returns the string representation of a TestCounts object."""
89 statuses = [('passed', self.passed), ('failed', self.failed),
90 ('crashed', self.crashed), ('skipped', self.skipped),
91 ('errors', self.errors)]
92 return f'Ran {self.total()} tests: ' + \
93 ', '.join(f'{s}: {n}' for s, n in statuses if n > 0)
94
95 def total(self) -> int:
96 """Returns the total number of test cases within a test
97 object, where a test case is a test with no subtests.
98 """
99 return (self.passed + self.failed + self.crashed +
100 self.skipped)
101
102 def add_subtest_counts(self, counts: TestCounts) -> None:
103 """
104 Adds the counts of another TestCounts object to the current
105 TestCounts object. Used to add the counts of a subtest to the
106 parent test.
107
108 Parameters:
109 counts - a different TestCounts object whose counts
110 will be added to the counts of the TestCounts object
111 """
112 self.passed += counts.passed
113 self.failed += counts.failed
114 self.crashed += counts.crashed
115 self.skipped += counts.skipped
116 self.errors += counts.errors
117
118 def get_status(self) -> TestStatus:
119 """Returns the aggregated status of a Test using test
120 counts.
121 """
122 if self.total() == 0:
123 return TestStatus.NO_TESTS
124 if self.crashed:
125 # Crashes should take priority.
126 return TestStatus.TEST_CRASHED
127 if self.failed:
128 return TestStatus.FAILURE
129 if self.passed:
130 # No failures or crashes, looks good!
131 return TestStatus.SUCCESS
132 # We have only skipped tests.
133 return TestStatus.SKIPPED
134
135 def add_status(self, status: TestStatus) -> None:
136 """Increments the count for `status`."""
137 if status == TestStatus.SUCCESS:
138 self.passed += 1
139 elif status == TestStatus.FAILURE:
140 self.failed += 1
141 elif status == TestStatus.SKIPPED:
142 self.skipped += 1
143 elif status != TestStatus.NO_TESTS:
144 self.crashed += 1
145
146class LineStream:
147 """
148 A class to represent the lines of kernel output.
149 Provides a lazy peek()/pop() interface over an iterator of
150 (line#, text).
151 """
152 _lines: Iterator[Tuple[int, str]]
153 _next: Tuple[int, str]
154 _need_next: bool
155 _done: bool
156
157 def __init__(self, lines: Iterator[Tuple[int, str]]):
158 """Creates a new LineStream that wraps the given iterator."""
159 self._lines = lines
160 self._done = False
161 self._need_next = True
162 self._next = (0, '')
163
164 def _get_next(self) -> None:
165 """Advances the LineSteam to the next line, if necessary."""
166 if not self._need_next:
167 return
168 try:
169 self._next = next(self._lines)
170 except StopIteration:
171 self._done = True
172 finally:
173 self._need_next = False
174
175 def peek(self) -> str:
176 """Returns the current line, without advancing the LineStream.
177 """
178 self._get_next()
179 return self._next[1]
180
181 def pop(self) -> str:
182 """Returns the current line and advances the LineStream to
183 the next line.
184 """
185 s = self.peek()
186 if self._done:
187 raise ValueError(f'LineStream: going past EOF, last line was {s}')
188 self._need_next = True
189 return s
190
191 def __bool__(self) -> bool:
192 """Returns True if stream has more lines."""
193 self._get_next()
194 return not self._done
195
196 # Only used by kunit_tool_test.py.
197 def __iter__(self) -> Iterator[str]:
198 """Empties all lines stored in LineStream object into
199 Iterator object and returns the Iterator object.
200 """
201 while bool(self):
202 yield self.pop()
203
204 def line_number(self) -> int:
205 """Returns the line number of the current line."""
206 self._get_next()
207 return self._next[0]
208
209# Parsing helper methods:
210
211KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$')
212TAP_START = re.compile(r'\s*TAP version ([0-9]+)$')
213KTAP_END = re.compile(r'\s*(List of all partitions:|'
214 'Kernel panic - not syncing: VFS:|reboot: System halted)')
215EXECUTOR_ERROR = re.compile(r'\s*kunit executor: (.*)$')
216
217def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
218 """Extracts KTAP lines from the kernel output."""
219 def isolate_ktap_output(kernel_output: Iterable[str]) \
220 -> Iterator[Tuple[int, str]]:
221 line_num = 0
222 started = False
223 for line in kernel_output:
224 line_num += 1
225 line = line.rstrip() # remove trailing \n
226 if not started and KTAP_START.search(line):
227 # start extracting KTAP lines and set prefix
228 # to number of characters before version line
229 prefix_len = len(
230 line.split('KTAP version')[0])
231 started = True
232 yield line_num, line[prefix_len:]
233 elif not started and TAP_START.search(line):
234 # start extracting KTAP lines and set prefix
235 # to number of characters before version line
236 prefix_len = len(line.split('TAP version')[0])
237 started = True
238 yield line_num, line[prefix_len:]
239 elif started and KTAP_END.search(line):
240 # stop extracting KTAP lines
241 break
242 elif started:
243 # remove the prefix, if any.
244 line = line[prefix_len:]
245 yield line_num, line
246 elif EXECUTOR_ERROR.search(line):
247 yield line_num, line
248 return LineStream(lines=isolate_ktap_output(kernel_output))
249
250KTAP_VERSIONS = [1]
251TAP_VERSIONS = [13, 14]
252
253def check_version(version_num: int, accepted_versions: List[int],
254 version_type: str, test: Test) -> None:
255 """
256 Adds error to test object if version number is too high or too
257 low.
258
259 Parameters:
260 version_num - The inputted version number from the parsed KTAP or TAP
261 header line
262 accepted_version - List of accepted KTAP or TAP versions
263 version_type - 'KTAP' or 'TAP' depending on the type of
264 version line.
265 test - Test object for current test being parsed
266 """
267 if version_num < min(accepted_versions):
268 test.add_error(f'{version_type} version lower than expected!')
269 elif version_num > max(accepted_versions):
270 test.add_error(f'{version_type} version higer than expected!')
271
272def parse_ktap_header(lines: LineStream, test: Test) -> bool:
273 """
274 Parses KTAP/TAP header line and checks version number.
275 Returns False if fails to parse KTAP/TAP header line.
276
277 Accepted formats:
278 - 'KTAP version [version number]'
279 - 'TAP version [version number]'
280
281 Parameters:
282 lines - LineStream of KTAP output to parse
283 test - Test object for current test being parsed
284
285 Return:
286 True if successfully parsed KTAP/TAP header line
287 """
288 ktap_match = KTAP_START.match(lines.peek())
289 tap_match = TAP_START.match(lines.peek())
290 if ktap_match:
291 version_num = int(ktap_match.group(1))
292 check_version(version_num, KTAP_VERSIONS, 'KTAP', test)
293 elif tap_match:
294 version_num = int(tap_match.group(1))
295 check_version(version_num, TAP_VERSIONS, 'TAP', test)
296 else:
297 return False
298 lines.pop()
299 return True
300
301TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$')
302
303def parse_test_header(lines: LineStream, test: Test) -> bool:
304 """
305 Parses test header and stores test name in test object.
306 Returns False if fails to parse test header line.
307
308 Accepted format:
309 - '# Subtest: [test name]'
310
311 Parameters:
312 lines - LineStream of KTAP output to parse
313 test - Test object for current test being parsed
314
315 Return:
316 True if successfully parsed test header line
317 """
318 match = TEST_HEADER.match(lines.peek())
319 if not match:
320 return False
321 test.name = match.group(1)
322 lines.pop()
323 return True
324
325TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)')
326
327def parse_test_plan(lines: LineStream, test: Test) -> bool:
328 """
329 Parses test plan line and stores the expected number of subtests in
330 test object. Reports an error if expected count is 0.
331 Returns False and sets expected_count to None if there is no valid test
332 plan.
333
334 Accepted format:
335 - '1..[number of subtests]'
336
337 Parameters:
338 lines - LineStream of KTAP output to parse
339 test - Test object for current test being parsed
340
341 Return:
342 True if successfully parsed test plan line
343 """
344 match = TEST_PLAN.match(lines.peek())
345 if not match:
346 test.expected_count = None
347 return False
348 expected_count = int(match.group(1))
349 test.expected_count = expected_count
350 lines.pop()
351 return True
352
353TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')
354
355TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
356
357def peek_test_name_match(lines: LineStream, test: Test) -> bool:
358 """
359 Matches current line with the format of a test result line and checks
360 if the name matches the name of the current test.
361 Returns False if fails to match format or name.
362
363 Accepted format:
364 - '[ok|not ok] [test number] [-] [test name] [optional skip
365 directive]'
366
367 Parameters:
368 lines - LineStream of KTAP output to parse
369 test - Test object for current test being parsed
370
371 Return:
372 True if matched a test result line and the name matching the
373 expected test name
374 """
375 line = lines.peek()
376 match = TEST_RESULT.match(line)
377 if not match:
378 return False
379 name = match.group(4)
380 return name == test.name
381
382def parse_test_result(lines: LineStream, test: Test,
383 expected_num: int) -> bool:
384 """
385 Parses test result line and stores the status and name in the test
386 object. Reports an error if the test number does not match expected
387 test number.
388 Returns False if fails to parse test result line.
389
390 Note that the SKIP directive is the only direction that causes a
391 change in status.
392
393 Accepted format:
394 - '[ok|not ok] [test number] [-] [test name] [optional skip
395 directive]'
396
397 Parameters:
398 lines - LineStream of KTAP output to parse
399 test - Test object for current test being parsed
400 expected_num - expected test number for current test
401
402 Return:
403 True if successfully parsed a test result line.
404 """
405 line = lines.peek()
406 match = TEST_RESULT.match(line)
407 skip_match = TEST_RESULT_SKIP.match(line)
408
409 # Check if line matches test result line format
410 if not match:
411 return False
412 lines.pop()
413
414 # Set name of test object
415 if skip_match:
416 test.name = skip_match.group(4)
417 else:
418 test.name = match.group(4)
419
420 # Check test num
421 num = int(match.group(2))
422 if num != expected_num:
423 test.add_error(f'Expected test number {expected_num} but found {num}')
424
425 # Set status of test object
426 status = match.group(1)
427 if skip_match:
428 test.status = TestStatus.SKIPPED
429 elif status == 'ok':
430 test.status = TestStatus.SUCCESS
431 else:
432 test.status = TestStatus.FAILURE
433 return True
434
435def parse_diagnostic(lines: LineStream) -> List[str]:
436 """
437 Parse lines that do not match the format of a test result line or
438 test header line and returns them in list.
439
440 Line formats that are not parsed:
441 - '# Subtest: [test name]'
442 - '[ok|not ok] [test number] [-] [test name] [optional skip
443 directive]'
444 - 'KTAP version [version number]'
445
446 Parameters:
447 lines - LineStream of KTAP output to parse
448
449 Return:
450 Log of diagnostic lines
451 """
452 log = [] # type: List[str]
453 non_diagnostic_lines = [TEST_RESULT, TEST_HEADER, KTAP_START, TAP_START, TEST_PLAN]
454 while lines and not any(re.match(lines.peek())
455 for re in non_diagnostic_lines):
456 log.append(lines.pop())
457 return log
458
459
460# Printing helper methods:
461
462DIVIDER = '=' * 60
463
464def format_test_divider(message: str, len_message: int) -> str:
465 """
466 Returns string with message centered in fixed width divider.
467
468 Example:
469 '===================== message example ====================='
470
471 Parameters:
472 message - message to be centered in divider line
473 len_message - length of the message to be printed such that
474 any characters of the color codes are not counted
475
476 Return:
477 String containing message centered in fixed width divider
478 """
479 default_count = 3 # default number of dashes
480 len_1 = default_count
481 len_2 = default_count
482 difference = len(DIVIDER) - len_message - 2 # 2 spaces added
483 if difference > 0:
484 # calculate number of dashes for each side of the divider
485 len_1 = int(difference / 2)
486 len_2 = difference - len_1
487 return ('=' * len_1) + f' {message} ' + ('=' * len_2)
488
489def print_test_header(test: Test) -> None:
490 """
491 Prints test header with test name and optionally the expected number
492 of subtests.
493
494 Example:
495 '=================== example (2 subtests) ==================='
496
497 Parameters:
498 test - Test object representing current test being printed
499 """
500 message = test.name
501 if message != "":
502 # Add a leading space before the subtest counts only if a test name
503 # is provided using a "# Subtest" header line.
504 message += " "
505 if test.expected_count:
506 if test.expected_count == 1:
507 message += '(1 subtest)'
508 else:
509 message += f'({test.expected_count} subtests)'
510 stdout.print_with_timestamp(format_test_divider(message, len(message)))
511
512def print_log(log: Iterable[str]) -> None:
513 """Prints all strings in saved log for test in yellow."""
514 formatted = textwrap.dedent('\n'.join(log))
515 for line in formatted.splitlines():
516 stdout.print_with_timestamp(stdout.yellow(line))
517
518def format_test_result(test: Test) -> str:
519 """
520 Returns string with formatted test result with colored status and test
521 name.
522
523 Example:
524 '[PASSED] example'
525
526 Parameters:
527 test - Test object representing current test being printed
528
529 Return:
530 String containing formatted test result
531 """
532 if test.status == TestStatus.SUCCESS:
533 return stdout.green('[PASSED] ') + test.name
534 if test.status == TestStatus.SKIPPED:
535 return stdout.yellow('[SKIPPED] ') + test.name
536 if test.status == TestStatus.NO_TESTS:
537 return stdout.yellow('[NO TESTS RUN] ') + test.name
538 if test.status == TestStatus.TEST_CRASHED:
539 print_log(test.log)
540 return stdout.red('[CRASHED] ') + test.name
541 print_log(test.log)
542 return stdout.red('[FAILED] ') + test.name
543
544def print_test_result(test: Test) -> None:
545 """
546 Prints result line with status of test.
547
548 Example:
549 '[PASSED] example'
550
551 Parameters:
552 test - Test object representing current test being printed
553 """
554 stdout.print_with_timestamp(format_test_result(test))
555
556def print_test_footer(test: Test) -> None:
557 """
558 Prints test footer with status of test.
559
560 Example:
561 '===================== [PASSED] example ====================='
562
563 Parameters:
564 test - Test object representing current test being printed
565 """
566 message = format_test_result(test)
567 stdout.print_with_timestamp(format_test_divider(message,
568 len(message) - stdout.color_len()))
569
570
571
572def _summarize_failed_tests(test: Test) -> str:
573 """Tries to summarize all the failing subtests in `test`."""
574
575 def failed_names(test: Test, parent_name: str) -> List[str]:
576 # Note: we use 'main' internally for the top-level test.
577 if not parent_name or parent_name == 'main':
578 full_name = test.name
579 else:
580 full_name = parent_name + '.' + test.name
581
582 if not test.subtests: # this is a leaf node
583 return [full_name]
584
585 # If all the children failed, just say this subtest failed.
586 # Don't summarize it down "the top-level test failed", though.
587 failed_subtests = [sub for sub in test.subtests if not sub.ok_status()]
588 if parent_name and len(failed_subtests) == len(test.subtests):
589 return [full_name]
590
591 all_failures = [] # type: List[str]
592 for t in failed_subtests:
593 all_failures.extend(failed_names(t, full_name))
594 return all_failures
595
596 failures = failed_names(test, '')
597 # If there are too many failures, printing them out will just be noisy.
598 if len(failures) > 10: # this is an arbitrary limit
599 return ''
600
601 return 'Failures: ' + ', '.join(failures)
602
603
604def print_summary_line(test: Test) -> None:
605 """
606 Prints summary line of test object. Color of line is dependent on
607 status of test. Color is green if test passes, yellow if test is
608 skipped, and red if the test fails or crashes. Summary line contains
609 counts of the statuses of the tests subtests or the test itself if it
610 has no subtests.
611
612 Example:
613 "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
614 Errors: 0"
615
616 test - Test object representing current test being printed
617 """
618 if test.status == TestStatus.SUCCESS:
619 color = stdout.green
620 elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
621 color = stdout.yellow
622 else:
623 color = stdout.red
624 stdout.print_with_timestamp(color(f'Testing complete. {test.counts}'))
625
626 # Summarize failures that might have gone off-screen since we had a lot
627 # of tests (arbitrarily defined as >=100 for now).
628 if test.ok_status() or test.counts.total() < 100:
629 return
630 summarized = _summarize_failed_tests(test)
631 if not summarized:
632 return
633 stdout.print_with_timestamp(color(summarized))
634
635# Other methods:
636
637def bubble_up_test_results(test: Test) -> None:
638 """
639 If the test has subtests, add the test counts of the subtests to the
640 test and check if any of the tests crashed and if so set the test
641 status to crashed. Otherwise if the test has no subtests add the
642 status of the test to the test counts.
643
644 Parameters:
645 test - Test object for current test being parsed
646 """
647 subtests = test.subtests
648 counts = test.counts
649 status = test.status
650 for t in subtests:
651 counts.add_subtest_counts(t.counts)
652 if counts.total() == 0:
653 counts.add_status(status)
654 elif test.counts.get_status() == TestStatus.TEST_CRASHED:
655 test.status = TestStatus.TEST_CRASHED
656
657def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool) -> Test:
658 """
659 Finds next test to parse in LineStream, creates new Test object,
660 parses any subtests of the test, populates Test object with all
661 information (status, name) about the test and the Test objects for
662 any subtests, and then returns the Test object. The method accepts
663 three formats of tests:
664
665 Accepted test formats:
666
667 - Main KTAP/TAP header
668
669 Example:
670
671 KTAP version 1
672 1..4
673 [subtests]
674
675 - Subtest header (must include either the KTAP version line or
676 "# Subtest" header line)
677
678 Example (preferred format with both KTAP version line and
679 "# Subtest" line):
680
681 KTAP version 1
682 # Subtest: name
683 1..3
684 [subtests]
685 ok 1 name
686
687 Example (only "# Subtest" line):
688
689 # Subtest: name
690 1..3
691 [subtests]
692 ok 1 name
693
694 Example (only KTAP version line, compliant with KTAP v1 spec):
695
696 KTAP version 1
697 1..3
698 [subtests]
699 ok 1 name
700
701 - Test result line
702
703 Example:
704
705 ok 1 - test
706
707 Parameters:
708 lines - LineStream of KTAP output to parse
709 expected_num - expected test number for test to be parsed
710 log - list of strings containing any preceding diagnostic lines
711 corresponding to the current test
712 is_subtest - boolean indicating whether test is a subtest
713
714 Return:
715 Test object populated with characteristics and any subtests
716 """
717 test = Test()
718 test.log.extend(log)
719
720 # Parse any errors prior to parsing tests
721 err_log = parse_diagnostic(lines)
722 test.log.extend(err_log)
723
724 if not is_subtest:
725 # If parsing the main/top-level test, parse KTAP version line and
726 # test plan
727 test.name = "main"
728 ktap_line = parse_ktap_header(lines, test)
729 test.log.extend(parse_diagnostic(lines))
730 parse_test_plan(lines, test)
731 parent_test = True
732 else:
733 # If not the main test, attempt to parse a test header containing
734 # the KTAP version line and/or subtest header line
735 ktap_line = parse_ktap_header(lines, test)
736 subtest_line = parse_test_header(lines, test)
737 parent_test = (ktap_line or subtest_line)
738 if parent_test:
739 # If KTAP version line and/or subtest header is found, attempt
740 # to parse test plan and print test header
741 test.log.extend(parse_diagnostic(lines))
742 parse_test_plan(lines, test)
743 print_test_header(test)
744 expected_count = test.expected_count
745 subtests = []
746 test_num = 1
747 while parent_test and (expected_count is None or test_num <= expected_count):
748 # Loop to parse any subtests.
749 # Break after parsing expected number of tests or
750 # if expected number of tests is unknown break when test
751 # result line with matching name to subtest header is found
752 # or no more lines in stream.
753 sub_log = parse_diagnostic(lines)
754 sub_test = Test()
755 if not lines or (peek_test_name_match(lines, test) and
756 is_subtest):
757 if expected_count and test_num <= expected_count:
758 # If parser reaches end of test before
759 # parsing expected number of subtests, print
760 # crashed subtest and record error
761 test.add_error('missing expected subtest!')
762 sub_test.log.extend(sub_log)
763 test.counts.add_status(
764 TestStatus.TEST_CRASHED)
765 print_test_result(sub_test)
766 else:
767 test.log.extend(sub_log)
768 break
769 else:
770 sub_test = parse_test(lines, test_num, sub_log, True)
771 subtests.append(sub_test)
772 test_num += 1
773 test.subtests = subtests
774 if is_subtest:
775 # If not main test, look for test result line
776 test.log.extend(parse_diagnostic(lines))
777 if test.name != "" and not peek_test_name_match(lines, test):
778 test.add_error('missing subtest result line!')
779 else:
780 parse_test_result(lines, test, expected_num)
781
782 # Check for there being no subtests within parent test
783 if parent_test and len(subtests) == 0:
784 # Don't override a bad status if this test had one reported.
785 # Assumption: no subtests means CRASHED is from Test.__init__()
786 if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
787 print_log(test.log)
788 test.status = TestStatus.NO_TESTS
789 test.add_error('0 tests run!')
790
791 # Add statuses to TestCounts attribute in Test object
792 bubble_up_test_results(test)
793 if parent_test and is_subtest:
794 # If test has subtests and is not the main test object, print
795 # footer.
796 print_test_footer(test)
797 elif is_subtest:
798 print_test_result(test)
799 return test
800
801def parse_run_tests(kernel_output: Iterable[str]) -> Test:
802 """
803 Using kernel output, extract KTAP lines, parse the lines for test
804 results and print condensed test results and summary line.
805
806 Parameters:
807 kernel_output - Iterable object contains lines of kernel output
808
809 Return:
810 Test - the main test object with all subtests.
811 """
812 stdout.print_with_timestamp(DIVIDER)
813 lines = extract_tap_lines(kernel_output)
814 test = Test()
815 if not lines:
816 test.name = '<missing>'
817 test.add_error('Could not find any KTAP output. Did any KUnit tests run?')
818 test.status = TestStatus.FAILURE_TO_PARSE_TESTS
819 else:
820 test = parse_test(lines, 0, [], False)
821 if test.status != TestStatus.NO_TESTS:
822 test.status = test.counts.get_status()
823 stdout.print_with_timestamp(DIVIDER)
824 print_summary_line(test)
825 return test