#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0

"""
tdc.py - Linux tc (Traffic Control) unit test driver

Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
"""

import re
import os
import sys
import argparse
import importlib
import json
import subprocess
import time
import traceback
import random
from multiprocessing import Pool
from collections import OrderedDict
from string import Template

from tdc_config import *
from tdc_helper import *

import TdcPlugin
from TdcResults import *

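# Note: the wildcard imports above are what bring NAMES (the $TC/$DEV0/$NS
# keyword-substitution table) and ENVIR (the subprocess environment) into
# scope; both are expected to be defined in tdc_config.py.
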
class PluginDependencyException(Exception):
    def __init__(self, missing_pg):
        self.missing_pg = missing_pg

class PluginMgrTestFail(Exception):
    def __init__(self, stage, output, message):
        self.stage = stage
        self.output = output
        self.message = message

class PluginMgr:
    def __init__(self, argparser):
        super().__init__()
        self.plugins = set()
        self.plugin_instances = []
        self.failed_plugins = {}
        self.argparser = argparser

        plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
        for dirpath, dirnames, filenames in os.walk(plugindir):
            for fn in filenames:
                if (fn.endswith('.py') and
                    not fn == '__init__.py' and
                    not fn.startswith('#') and
                    not fn.startswith('.#')):
                    mn = fn[0:-3]
                    foo = importlib.import_module('plugins.' + mn)
                    self.plugins.add(mn)
                    # plugin_instances holds (name, instance) tuples
                    self.plugin_instances.append((mn, foo.SubPlugin()))

    def load_plugin(self, pgdir, pgname):
        pgname = pgname[0:-3]
        self.plugins.add(pgname)

        foo = importlib.import_module('{}.{}'.format(pgdir, pgname))

        # nsPlugin must always be the first one
        if pgname == "nsPlugin":
            self.plugin_instances.insert(0, (pgname, foo.SubPlugin()))
            self.plugin_instances[0][1].check_args(self.args, None)
        else:
            self.plugin_instances.append((pgname, foo.SubPlugin()))
            self.plugin_instances[-1][1].check_args(self.args, None)

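    # Keeping nsPlugin at index 0 matters because the pre_* hooks below walk
    # plugin_instances in order while the post_* hooks walk it reversed, so
    # the namespace plugin's setup and teardown bracket every other plugin.
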
    def get_required_plugins(self, testlist):
        '''
        Get all required plugins from the list of test cases and return
        all unique items.
        '''
        reqs = set()
        for t in testlist:
            try:
                if 'requires' in t['plugins']:
                    if isinstance(t['plugins']['requires'], list):
                        reqs.update(set(t['plugins']['requires']))
                    else:
                        reqs.add(t['plugins']['requires'])
                    t['plugins'] = t['plugins']['requires']
                else:
                    t['plugins'] = []
            except KeyError:
                t['plugins'] = []
                continue

        return reqs

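    # Side effect worth noting: besides returning the set of required
    # plugins, the loop above rewrites each test's 'plugins' entry into a
    # flat list, which the call_pre_case()/call_post_case() filters rely on.
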
    def load_required_plugins(self, reqs, parser, args, remaining):
        '''
        Get all required plugins from the list of test cases and load any plugin
        that is not already enabled.
        '''
        pgd = ['plugin-lib', 'plugin-lib-custom']
        pnf = []

        for r in reqs:
            if r not in self.plugins:
                fname = '{}.py'.format(r)
                source_path = []
                for d in pgd:
                    pgpath = '{}/{}'.format(d, fname)
                    if os.path.isfile(pgpath):
                        source_path.append(pgpath)
                if len(source_path) == 0:
                    print('ERROR: unable to find required plugin {}'.format(r))
                    pnf.append(fname)
                    continue
                elif len(source_path) > 1:
                    print('WARNING: multiple copies of plugin {} found, using version found'.format(r))
                    print('at {}'.format(source_path[0]))
                pgdir = source_path[0]
                pgdir = pgdir.split('/')[0]
                self.load_plugin(pgdir, fname)
        if len(pnf) > 0:
            raise PluginDependencyException(pnf)

        parser = self.call_add_args(parser)
        (args, remaining) = parser.parse_known_args(args=remaining, namespace=args)
        return args

    def call_pre_suite(self, testcount, testidlist):
        for (_, pgn_inst) in self.plugin_instances:
            pgn_inst.pre_suite(testcount, testidlist)

    def call_post_suite(self, index):
        for (_, pgn_inst) in reversed(self.plugin_instances):
            pgn_inst.post_suite(index)

    def call_pre_case(self, caseinfo, *, test_skip=False):
        for (pgn, pgn_inst) in self.plugin_instances:
            if pgn not in caseinfo['plugins']:
                continue
            try:
                pgn_inst.pre_case(caseinfo, test_skip)
            except Exception as ee:
                print('exception {} in call to pre_case for {} plugin'.
                      format(ee, pgn_inst.__class__))
                print('testid is {}'.format(caseinfo['id']))
                raise

    def call_post_case(self, caseinfo):
        for (pgn, pgn_inst) in reversed(self.plugin_instances):
            if pgn not in caseinfo['plugins']:
                continue
            pgn_inst.post_case()

    def call_pre_execute(self, caseinfo):
        for (pgn, pgn_inst) in self.plugin_instances:
            if pgn not in caseinfo['plugins']:
                continue
            pgn_inst.pre_execute()

    def call_post_execute(self, caseinfo):
        for (pgn, pgn_inst) in reversed(self.plugin_instances):
            if pgn not in caseinfo['plugins']:
                continue
            pgn_inst.post_execute()

    def call_add_args(self, parser):
        for (pgn, pgn_inst) in self.plugin_instances:
            parser = pgn_inst.add_args(parser)
        return parser

    def call_check_args(self, args, remaining):
        for (pgn, pgn_inst) in self.plugin_instances:
            pgn_inst.check_args(args, remaining)

    def call_adjust_command(self, caseinfo, stage, command):
        for (pgn, pgn_inst) in self.plugin_instances:
            if pgn not in caseinfo['plugins']:
                continue
            command = pgn_inst.adjust_command(stage, command)
        return command

    def set_args(self, args):
        self.args = args

    @staticmethod
    def _make_argparser(args):
        return argparse.ArgumentParser(description='Linux TC unit tests')


def replace_keywords(cmd):
    """
    For a given executable command, substitute any known
    variables contained within NAMES with the correct values
    """
    tcmd = Template(cmd)
    subcmd = tcmd.safe_substitute(NAMES)
    return subcmd

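# Illustration (hypothetical NAMES values): with NAMES containing
# {'TC': '/sbin/tc', 'DEV1': 'v0p1'}, replace_keywords('$TC qdisc show dev $DEV1')
# returns '/sbin/tc qdisc show dev v0p1'.  safe_substitute() leaves any unknown
# $WORD untouched instead of raising KeyError the way substitute() would.
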

def exec_cmd(caseinfo, args, pm, stage, command):
    """
    Perform any required modifications on an executable command, then run
    it in a subprocess and return the results.
    """
    if len(command.strip()) == 0:
        return None, None
    if '$' in command:
        command = replace_keywords(command)

    command = pm.call_adjust_command(caseinfo, stage, command)
    if args.verbose > 0:
        print('command "{}"'.format(command))

    proc = subprocess.Popen(command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=ENVIR)

    try:
        (rawout, serr) = proc.communicate(timeout=NAMES['TIMEOUT'])
        if proc.returncode != 0 and len(serr) > 0:
            foutput = serr.decode("utf-8", errors="ignore")
        else:
            foutput = rawout.decode("utf-8", errors="ignore")
    except subprocess.TimeoutExpired:
        foutput = "Command \"{}\" timed out\n".format(command)
        proc.returncode = 255

    proc.stdout.close()
    proc.stderr.close()
    return proc, foutput

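# NAMES['TIMEOUT'] may be None (see check_default_settings()), in which case
# communicate() blocks until the command finishes; only a numeric timeout can
# trigger the TimeoutExpired branch above.
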

def prepare_env(caseinfo, args, pm, stage, prefix, cmdlist, output = None):
    """
    Execute the setup/teardown commands for a test case.
    Optionally terminate test execution if the command fails.
    """
    if args.verbose > 0:
        print('{}'.format(prefix))
    for cmdinfo in cmdlist:
        if isinstance(cmdinfo, list):
            exit_codes = cmdinfo[1:]
            cmd = cmdinfo[0]
        else:
            exit_codes = [0]
            cmd = cmdinfo

        if not cmd:
            continue

        (proc, foutput) = exec_cmd(caseinfo, args, pm, stage, cmd)

        if proc and (proc.returncode not in exit_codes):
            print('', file=sys.stderr)
            print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
                  file=sys.stderr)
            print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
                  file=sys.stderr)
            print("returncode {}; expected {}".format(proc.returncode,
                                                      exit_codes))
            print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
            print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
            print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
            raise PluginMgrTestFail(
                stage, output,
                '"{}" did not complete successfully'.format(prefix))

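# A cmdlist entry may be a plain command string or a list whose first element
# is the command and whose remaining elements are acceptable exit codes, e.g.
# the common tc-tests idiom:
#
#     "setup": [
#         ["$TC actions flush action gact", 0, 1, 255]
#     ]
#
# which treats exit codes 0, 1 and 255 as success for that setup command.
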
def verify_by_json(procout, res, tidx, args, pm):
    try:
        outputJSON = json.loads(procout)
    except json.JSONDecodeError:
        res.set_result(ResultState.fail)
        res.set_failmsg('Cannot decode verify command\'s output. Is it JSON?')
        return res

    matchJSON = json.loads(json.dumps(tidx['matchJSON']))

    if type(outputJSON) != type(matchJSON):
        failmsg = 'Original output and matchJSON value are not the same type: output: {} != matchJSON: {} '
        failmsg = failmsg.format(type(outputJSON).__name__, type(matchJSON).__name__)
        res.set_result(ResultState.fail)
        res.set_failmsg(failmsg)
        return res

    if len(matchJSON) > len(outputJSON):
        failmsg = "Your matchJSON value is an array, and it contains more elements than the command under test\'s output:\ncommand output (length: {}):\n{}\nmatchJSON value (length: {}):\n{}"
        failmsg = failmsg.format(len(outputJSON), outputJSON, len(matchJSON), matchJSON)
        res.set_result(ResultState.fail)
        res.set_failmsg(failmsg)
        return res
    res = find_in_json(res, outputJSON, matchJSON, 0)

    return res

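# A minimal sketch of the matching rules (hypothetical values): with
#
#     procout   = '[{"kind": "gact", "index": 1, "ref": 1}]'
#     matchJSON = [{"kind": "gact", "index": 1}]
#
# verification succeeds: every key/value in matchJSON must appear at the
# corresponding position of the output, while extra keys in the output
# (here "ref") are ignored by find_in_json_dict() below.
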
def find_in_json(res, outputJSONVal, matchJSONVal, matchJSONKey=None):
    if res.get_result() == ResultState.fail:
        return res

    if type(matchJSONVal) == list:
        res = find_in_json_list(res, outputJSONVal, matchJSONVal, matchJSONKey)

    elif type(matchJSONVal) == dict:
        res = find_in_json_dict(res, outputJSONVal, matchJSONVal)
    else:
        res = find_in_json_other(res, outputJSONVal, matchJSONVal, matchJSONKey)

    if res.get_result() != ResultState.fail:
        res.set_result(ResultState.success)
        return res

    return res

def find_in_json_list(res, outputJSONVal, matchJSONVal, matchJSONKey=None):
    if (type(matchJSONVal) != type(outputJSONVal)):
        failmsg = 'Original output and matchJSON value are not the same type: output: {} != matchJSON: {}'
        failmsg = failmsg.format(outputJSONVal, matchJSONVal)
        res.set_result(ResultState.fail)
        res.set_failmsg(failmsg)
        return res

    if len(matchJSONVal) > len(outputJSONVal):
        failmsg = "Your matchJSON value is an array, and it contains more elements than the command under test\'s output:\ncommand output (length: {}):\n{}\nmatchJSON value (length: {}):\n{}"
        failmsg = failmsg.format(len(outputJSONVal), outputJSONVal, len(matchJSONVal), matchJSONVal)
        res.set_result(ResultState.fail)
        res.set_failmsg(failmsg)
        return res

    for matchJSONIdx, matchJSONVal in enumerate(matchJSONVal):
        res = find_in_json(res, outputJSONVal[matchJSONIdx], matchJSONVal,
                           matchJSONKey)
    return res

def find_in_json_dict(res, outputJSONVal, matchJSONVal):
    for matchJSONKey, matchJSONVal in matchJSONVal.items():
        if type(outputJSONVal) == dict:
            if matchJSONKey not in outputJSONVal:
                failmsg = 'Key not found in json output: {}: {}\nMatching against output: {}'
                failmsg = failmsg.format(matchJSONKey, matchJSONVal, outputJSONVal)
                res.set_result(ResultState.fail)
                res.set_failmsg(failmsg)
                return res

        else:
            failmsg = 'Original output and matchJSON value are not the same type: output: {} != matchJSON: {}'
            failmsg = failmsg.format(type(outputJSONVal).__name__, type(matchJSONVal).__name__)
            res.set_result(ResultState.fail)
            res.set_failmsg(failmsg)
            return res

        if type(outputJSONVal) == dict and (type(outputJSONVal[matchJSONKey]) == dict or
                type(outputJSONVal[matchJSONKey]) == list):
            if len(matchJSONVal) > 0:
                res = find_in_json(res, outputJSONVal[matchJSONKey], matchJSONVal, matchJSONKey)
            # handling corner case where matchJSONVal == [] or matchJSONVal == {}
            else:
                res = find_in_json_other(res, outputJSONVal, matchJSONVal, matchJSONKey)
        else:
            res = find_in_json(res, outputJSONVal, matchJSONVal, matchJSONKey)
    return res

def find_in_json_other(res, outputJSONVal, matchJSONVal, matchJSONKey=None):
    if matchJSONKey in outputJSONVal:
        if matchJSONVal != outputJSONVal[matchJSONKey]:
            failmsg = 'Value doesn\'t match: {}: {} != {}\nMatching against output: {}'
            failmsg = failmsg.format(matchJSONKey, matchJSONVal, outputJSONVal[matchJSONKey], outputJSONVal)
            res.set_result(ResultState.fail)
            res.set_failmsg(failmsg)
            return res

    return res

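# Note that a matchJSONKey absent from outputJSONVal falls through and leaves
# res untouched here; missing keys are reported as failures one level up, in
# find_in_json_dict().
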
def run_one_test(pm, args, index, tidx):
    global NAMES
    ns = NAMES['NS']
    dev0 = NAMES['DEV0']
    dev1 = NAMES['DEV1']
    dummy = NAMES['DUMMY']
    result = True
    tresult = ""
    tap = ""
    res = TestResult(tidx['id'], tidx['name'])
    if args.verbose > 0:
        print("\t====================\n=====> ", end="")
    print("Test " + tidx["id"] + ": " + tidx["name"])

    if 'skip' in tidx:
        if tidx['skip'] == 'yes':
            res = TestResult(tidx['id'], tidx['name'])
            res.set_result(ResultState.skip)
            res.set_errormsg('Test case designated as skipped.')
            pm.call_pre_case(tidx, test_skip=True)
            pm.call_post_execute(tidx)
            return res

    if 'dependsOn' in tidx:
        if (args.verbose > 0):
            print('probe command for test skip')
        (p, procout) = exec_cmd(tidx, args, pm, 'execute', tidx['dependsOn'])
        if p:
            if (p.returncode != 0):
                res = TestResult(tidx['id'], tidx['name'])
                res.set_result(ResultState.skip)
                res.set_errormsg('probe command: test skipped.')
                pm.call_pre_case(tidx, test_skip=True)
                pm.call_post_execute(tidx)
                return res

    # populate NAMES with TESTID for this test
    NAMES['TESTID'] = tidx['id']
    NAMES['NS'] = '{}-{}'.format(NAMES['NS'], tidx['random'])
    NAMES['DEV0'] = '{}id{}'.format(NAMES['DEV0'], tidx['id'])
    NAMES['DEV1'] = '{}id{}'.format(NAMES['DEV1'], tidx['id'])
    NAMES['DUMMY'] = '{}id{}'.format(NAMES['DUMMY'], tidx['id'])

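    # Suffixing NS with the per-test random tag (see set_random()) and the
    # device names with the test id gives each test unique resource names,
    # which is what lets the -J parallel mode run cases concurrently without
    # namespace or interface collisions.
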
    pm.call_pre_case(tidx)
    prepare_env(tidx, args, pm, 'setup', "-----> prepare stage", tidx["setup"])

    if (args.verbose > 0):
        print('-----> execute stage')
    pm.call_pre_execute(tidx)
    (p, procout) = exec_cmd(tidx, args, pm, 'execute', tidx["cmdUnderTest"])
    if p:
        exit_code = p.returncode
    else:
        exit_code = None

    pm.call_post_execute(tidx)

    if (exit_code is None or exit_code != int(tidx["expExitCode"])):
        print("exit: {!r}".format(exit_code))
        print("exit: {}".format(int(tidx["expExitCode"])))
        #print("exit: {!r} {}".format(exit_code, int(tidx["expExitCode"])))
        res.set_result(ResultState.fail)
        res.set_failmsg('Command exited with {}, expected {}\n{}'.format(exit_code, tidx["expExitCode"], procout))
        print(procout)
    else:
        if args.verbose > 0:
            print('-----> verify stage')
        (p, procout) = exec_cmd(tidx, args, pm, 'verify', tidx["verifyCmd"])

        if procout:
            if 'matchJSON' in tidx:
                verify_by_json(procout, res, tidx, args, pm)
            elif 'matchPattern' in tidx:
                match_pattern = re.compile(
                    str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
                match_index = re.findall(match_pattern, procout)
                if len(match_index) != int(tidx["matchCount"]):
                    res.set_result(ResultState.fail)
                    res.set_failmsg('Could not match regex pattern. Verify command output:\n{}'.format(procout))
                else:
                    res.set_result(ResultState.success)
            else:
                res.set_result(ResultState.fail)
                res.set_failmsg('Must specify a match option: matchJSON or matchPattern\n{}'.format(procout))
        elif int(tidx["matchCount"]) != 0:
            res.set_result(ResultState.fail)
            res.set_failmsg('No output generated by verify command.')
        else:
            res.set_result(ResultState.success)

    prepare_env(tidx, args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout)
    pm.call_post_case(tidx)

    index += 1

    # remove TESTID from NAMES
    del(NAMES['TESTID'])

    # Restore names
    NAMES['NS'] = ns
    NAMES['DEV0'] = dev0
    NAMES['DEV1'] = dev1
    NAMES['DUMMY'] = dummy

    return res

def prepare_run(pm, args, testlist):
    tcount = len(testlist)
    emergency_exit = False
    emergency_exit_message = ''

    try:
        pm.call_pre_suite(tcount, testlist)
    except Exception as ee:
        ex_type, ex, ex_tb = sys.exc_info()
        print('Exception {} {} (caught in pre_suite).'.
              format(ex_type, ex))
        traceback.print_tb(ex_tb)
        emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
        emergency_exit = True

    if emergency_exit:
        pm.call_post_suite(1)
        return emergency_exit_message

def purge_run(pm, index):
    pm.call_post_suite(index)

def test_runner(pm, args, filtered_tests):
    """
    Driver function for the unit tests.

    Prints information about the tests being run, executes the setup and
    teardown commands and the command under test itself. Also determines
    success/failure based on the information in the test case and generates
    TAP output accordingly.
    """
    testlist = filtered_tests
    tcount = len(testlist)
    index = 1
    tap = ''
    badtest = None
    stage = None

    tsr = TestSuiteReport()

    for tidx in testlist:
        if "flower" in tidx["category"] and args.device == None:
            errmsg = "Tests using the DEV2 variable must define the name of a "
            errmsg += "physical NIC with the -d option when running tdc.\n"
            errmsg += "Test has been skipped."
            if args.verbose > 1:
                print(errmsg)
            res = TestResult(tidx['id'], tidx['name'])
            res.set_result(ResultState.skip)
            res.set_errormsg(errmsg)
            tsr.add_resultdata(res)
            index += 1
            continue
        try:
            badtest = tidx  # in case it goes bad
            res = run_one_test(pm, args, index, tidx)
            tsr.add_resultdata(res)
        except PluginMgrTestFail as pmtf:
            ex_type, ex, ex_tb = sys.exc_info()
            stage = pmtf.stage
            message = pmtf.message
            output = pmtf.output
            res = TestResult(tidx['id'], tidx['name'])
            res.set_result(ResultState.fail)
            res.set_errormsg(pmtf.message)
            res.set_failmsg(pmtf.output)
            tsr.add_resultdata(res)
            index += 1
            print(message)
            print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
                  format(ex_type, ex, index, tidx['id'], tidx['name'], stage))
            print('---------------')
            print('traceback')
            traceback.print_tb(ex_tb)
            print('---------------')
            if stage == 'teardown':
                print('accumulated output for this test:')
                if pmtf.output:
                    print(pmtf.output)
            print('---------------')
            break
        index += 1

    # if we failed in setup or teardown,
    # fill in the remaining tests with ok-skipped
    count = index

    if tcount + 1 != count:
        for tidx in testlist[count - 1:]:
            res = TestResult(tidx['id'], tidx['name'])
            res.set_result(ResultState.skip)
            msg = 'skipped - previous {} failed {} {}'.format(stage,
                index, badtest.get('id', '--Unknown--'))
            res.set_errormsg(msg)
            tsr.add_resultdata(res)
            count += 1

    if args.pause:
        print('Want to pause\nPress enter to continue ...')
        if input():
            print('got something on stdin')

    return (index, tsr)

def mp_bins(alltests):
    serial = []
    parallel = []

    for test in alltests:
        if 'nsPlugin' not in test['plugins']:
            serial.append(test)
        else:
            # We can only create one netdevsim device at a time
            if 'netdevsim/new_device' in str(test['setup']):
                serial.append(test)
            else:
                parallel.append(test)

    return (serial, parallel)

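# mp_bins() feeds test_runner_mp() below: the serial bin becomes the first
# batch, so those tests run one at a time inside a single worker, while the
# remaining batches (namespaced, non-netdevsim tests) may run alongside them.
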
def __mp_runner(tests):
    (_, tsr) = test_runner(mp_pm, mp_args, tests)
    return tsr._testsuite

def test_runner_mp(pm, args, alltests):
    prepare_run(pm, args, alltests)

    (serial, parallel) = mp_bins(alltests)

    batches = [parallel[n : n + 32] for n in range(0, len(parallel), 32)]
    batches.insert(0, serial)

    print("Executing {} tests in parallel and {} in serial".format(len(parallel), len(serial)))
    print("Using {} batches and {} workers".format(len(batches), args.mp))

    # We can't pickle these objects so workaround them
    global mp_pm
    mp_pm = pm

    global mp_args
    mp_args = args

    with Pool(args.mp) as p:
        pres = p.map(__mp_runner, batches)

    tsr = TestSuiteReport()
    for trs in pres:
        for res in trs:
            tsr.add_resultdata(res)

    # Passing an index is not useful in MP
    purge_run(pm, None)

    return tsr

def test_runner_serial(pm, args, alltests):
    prepare_run(pm, args, alltests)

    if args.verbose:
        print("Executing {} tests in serial".format(len(alltests)))

    (index, tsr) = test_runner(pm, args, alltests)

    purge_run(pm, index)

    return tsr

def has_blank_ids(idlist):
    """
    Search the list for empty ID fields and return true/false accordingly.
    """
    return not(all(k for k in idlist))


def load_from_file(filename):
    """
    Open the JSON file containing the test cases and return them
    as list of ordered dictionary objects.
    """
    try:
        with open(filename) as test_data:
            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
    except json.JSONDecodeError as jde:
        print('IGNORING test case file {}\n\tBECAUSE:  {}'.format(filename, jde))
        testlist = list()
    else:
        idlist = get_id_list(testlist)
        if (has_blank_ids(idlist)):
            for k in testlist:
                k['filename'] = filename
    return testlist

def identity(string):
    return string

def args_parse():
    """
    Create the argument parser.
    """
    parser = argparse.ArgumentParser(description='Linux TC unit tests')
    parser.register('type', None, identity)
    return parser

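# The identity() registration above replaces argparse's default 'type'
# converter (internally a lambda) with a named, module-level function;
# presumably this keeps the parser and parsed args picklable for the
# multiprocessing Pool used by test_runner_mp().
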

def set_args(parser):
    """
    Set the command line arguments for tdc.
    """
    parser.add_argument(
        '--outfile', type=str,
        help='Path to the file in which results should be saved. ' +
        'Default target is the current directory.')
    parser.add_argument(
        '-p', '--path', type=str,
        help='The full path to the tc executable to use')
    sg = parser.add_argument_group(
        'selection', 'select which test cases: ' +
        'files plus directories; filtered by categories plus testids')
    ag = parser.add_argument_group(
        'action', 'select action to perform on selected test cases')

    sg.add_argument(
        '-D', '--directory', nargs='+', metavar='DIR',
        help='Collect tests from the specified directory(ies) ' +
        '(default [tc-tests])')
    sg.add_argument(
        '-f', '--file', nargs='+', metavar='FILE',
        help='Run tests from the specified file(s)')
    sg.add_argument(
        '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
        help='Run tests only from the specified category/ies, ' +
        'or if no category/ies is/are specified, list known categories.')
    sg.add_argument(
        '-e', '--execute', nargs='+', metavar='ID',
        help='Execute the specified test cases with specified IDs')
    ag.add_argument(
        '-l', '--list', action='store_true',
        help='List all test cases, or those only within the specified category')
    ag.add_argument(
        '-s', '--show', action='store_true', dest='showID',
        help='Display the selected test cases')
    ag.add_argument(
        '-i', '--id', action='store_true', dest='gen_id',
        help='Generate ID numbers for new test cases')
    parser.add_argument(
        '-v', '--verbose', action='count', default=0,
        help='Show the commands that are being run')
    parser.add_argument(
        '--format', default='tap', const='tap', nargs='?',
        choices=['none', 'xunit', 'tap'],
        help='Specify the format for test results. (Default: TAP)')
    parser.add_argument('-d', '--device',
                        help='Execute test cases that use a physical device, ' +
                        'where DEVICE is its name. (If not defined, tests ' +
                        'that require a physical device will be skipped)')
    parser.add_argument(
        '-P', '--pause', action='store_true',
        help='Pause execution just before post-suite stage')
    parser.add_argument(
        '-J', '--multiprocess', type=int, default=1, dest='mp',
        help='Run tests in parallel whenever possible')
    return parser

def check_default_settings(args, remaining, pm):
    """
    Process any arguments overriding the default settings,
    and ensure the settings are correct.
    """
    # Allow for overriding specific settings
    global NAMES

    if args.path != None:
        NAMES['TC'] = args.path
    if args.device != None:
        NAMES['DEV2'] = args.device
    if 'TIMEOUT' not in NAMES:
        NAMES['TIMEOUT'] = None
    if not os.path.isfile(NAMES['TC']):
        print("The specified tc path " + NAMES['TC'] + " does not exist.")
        exit(1)

    pm.call_check_args(args, remaining)


def get_id_list(alltests):
    """
    Generate a list of all IDs in the test cases.
    """
    return [x["id"] for x in alltests]

def check_case_id(alltests):
    """
    Check for duplicate test case IDs.
    """
    idl = get_id_list(alltests)
    return [x for x in idl if idl.count(x) > 1]


def does_id_exist(alltests, newid):
    """
    Check if a given ID already exists in the list of test cases.
    """
    idl = get_id_list(alltests)
    return (any(newid == x for x in idl))


def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.
    """
    for c in alltests:
        if (c["id"] == ""):
            while True:
                newid = str('{:04x}'.format(random.randrange(16**4)))
                if (does_id_exist(alltests, newid)):
                    continue
                else:
                    c['id'] = newid
                    break

    ufilename = []
    for c in alltests:
        if ('filename' in c):
            ufilename.append(c['filename'])
    ufilename = get_unique_item(ufilename)
    for f in ufilename:
        testlist = []
        for t in alltests:
            if 'filename' in t:
                if t['filename'] == f:
                    del t['filename']
                    testlist.append(t)
        outfile = open(f, "w")
        json.dump(testlist, outfile, indent=4)
        outfile.write("\n")
        outfile.close()
    # Return the (mutated) list so callers that assign the result get the
    # updated test cases rather than None.
    return alltests

def filter_tests_by_id(args, testlist):
    '''
    Remove tests from testlist that are not in the named id list.
    If id list is empty, return empty list.
    '''
    newlist = list()
    if testlist and args.execute:
        target_ids = args.execute

        if isinstance(target_ids, list) and (len(target_ids) > 0):
            newlist = list(filter(lambda x: x['id'] in target_ids, testlist))
    return newlist

def filter_tests_by_category(args, testlist):
    '''
    Remove tests from testlist that are not in a named category.
    '''
    answer = list()
    if args.category and testlist:
        test_ids = list()
        for catg in set(args.category):
            if catg == '+c':
                continue
            print('considering category {}'.format(catg))
            for tc in testlist:
                if catg in tc['category'] and tc['id'] not in test_ids:
                    answer.append(tc)
                    test_ids.append(tc['id'])

    return answer

def set_random(alltests):
    for tidx in alltests:
        tidx['random'] = random.getrandbits(32)

def get_test_cases(args):
    """
    If a test case file is specified, retrieve tests from that file.
    Otherwise, glob for all json files in subdirectories and load from
    each one.
    Also, if requested, filter by category, and add tests matching
    certain ids.
    """
    import fnmatch

    flist = []
    testdirs = ['tc-tests']

    if args.file:
        # at least one file was specified - remove the default directory
        testdirs = []

        for ff in args.file:
            if not os.path.isfile(ff):
                print("IGNORING file " + ff + "\n\tBECAUSE it does not exist.")
            else:
                flist.append(os.path.abspath(ff))

    if args.directory:
        testdirs = args.directory

    for testdir in testdirs:
        for root, dirnames, filenames in os.walk(testdir):
            for filename in fnmatch.filter(filenames, '*.json'):
                candidate = os.path.abspath(os.path.join(root, filename))
                if candidate not in testdirs:
                    flist.append(candidate)

    alltestcases = list()
    for casefile in flist:
        alltestcases = alltestcases + (load_from_file(casefile))

    allcatlist = get_test_categories(alltestcases)
    allidlist = get_id_list(alltestcases)

    testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
    idtestcases = filter_tests_by_id(args, alltestcases)
    cattestcases = filter_tests_by_category(args, alltestcases)

    cat_ids = [x['id'] for x in cattestcases]
    if args.execute:
        if args.category:
            alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
        else:
            alltestcases = idtestcases
    else:
        if cat_ids:
            alltestcases = cattestcases
        else:
            # just accept the existing value of alltestcases,
            # which has been filtered by file/directory
            pass

    return allcatlist, allidlist, testcases_by_cats, alltestcases

def set_operation_mode(pm, parser, args, remaining):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.
    """
    ucat, idlist, testcases, alltests = get_test_cases(args)

    if args.gen_id:
        if (has_blank_ids(idlist)):
            alltests = generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        exit(0)

    duplicate_ids = check_case_id(alltests)
    if (len(duplicate_ids) > 0):
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        exit(1)

    if args.showID:
        for atest in alltests:
            print_test_case(atest)
        exit(0)

    if isinstance(args.category, list) and (len(args.category) == 0):
        print("Available categories:")
        print_sll(ucat)
        exit(0)

    if args.list:
        list_test_cases(alltests)
        exit(0)

    set_random(alltests)

    exit_code = 0 # KSFT_PASS
    if len(alltests):
        req_plugins = pm.get_required_plugins(alltests)
        try:
            args = pm.load_required_plugins(req_plugins, parser, args, remaining)
        except PluginDependencyException as pde:
            print('The following plugins were not found:')
            print('{}'.format(pde.missing_pg))

        if args.mp > 1:
            catresults = test_runner_mp(pm, args, alltests)
        else:
            catresults = test_runner_serial(pm, args, alltests)

        if catresults.count_failures() != 0:
            exit_code = 1 # KSFT_FAIL
        if args.format == 'none':
            print('Test results output suppression requested\n')
        else:
            print('\nAll test results: \n')
            if args.format == 'xunit':
                suffix = 'xml'
                res = catresults.format_xunit()
            elif args.format == 'tap':
                suffix = 'tap'
                res = catresults.format_tap()
            print(res)
            print('\n\n')
            if not args.outfile:
                fname = 'test-results.{}'.format(suffix)
            else:
                fname = args.outfile
            with open(fname, 'w') as fh:
                fh.write(res)
                if os.getenv('SUDO_UID') is not None:
                    os.chown(fname, uid=int(os.getenv('SUDO_UID')),
                        gid=int(os.getenv('SUDO_GID')))
    else:
        print('No tests found\n')
        exit_code = 4 # KSFT_SKIP
    exit(exit_code)

def main():
    """
    Start of execution; set up argument parser and get the arguments,
    and start operations.
    """
    import resource

    # Tuple comparison also handles future major versions correctly.
    if sys.version_info < (3, 8):
        sys.exit("tdc requires at least python 3.8")

    resource.setrlimit(resource.RLIMIT_NOFILE, (1048576, 1048576))
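    # The generous RLIMIT_NOFILE presumably accommodates parallel runs, where
    # each in-flight subprocess holds stdout/stderr pipes open.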

    parser = args_parse()
    parser = set_args(parser)
    pm = PluginMgr(parser)
    parser = pm.call_add_args(parser)
    (args, remaining) = parser.parse_known_args()
    args.NAMES = NAMES
    args.mp = min(args.mp, 4)
    pm.set_args(args)
    check_default_settings(args, remaining, pm)
    if args.verbose > 2:
        print('args is {}'.format(args))

    try:
        set_operation_mode(pm, parser, args, remaining)
    except KeyboardInterrupt:
        # Cleanup on Ctrl-C
        pm.call_post_suite(None)

if __name__ == "__main__":
    main()