#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0

"""
tdc.py - Linux tc (Traffic Control) unit test driver

Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
"""

import re
import os
import sys
import argparse
import importlib
import json
import subprocess
import time
import traceback
import random
from multiprocessing import Pool
from collections import OrderedDict
from string import Template

from tdc_config import *
from tdc_helper import *

import TdcPlugin
from TdcResults import *

class PluginDependencyException(Exception):
    def __init__(self, missing_pg):
        self.missing_pg = missing_pg

class PluginMgrTestFail(Exception):
    def __init__(self, stage, output, message):
        self.stage = stage
        self.output = output
        self.message = message

class PluginMgr:
    def __init__(self, argparser):
        super().__init__()
        self.plugins = set()
        self.plugin_instances = []
        self.failed_plugins = {}
        self.argparser = argparser

        plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
        for dirpath, dirnames, filenames in os.walk(plugindir):
            for fn in filenames:
                if (fn.endswith('.py') and
                    not fn == '__init__.py' and
                    not fn.startswith('#') and
                    not fn.startswith('.#')):
                    mn = fn[0:-3]
                    foo = importlib.import_module('plugins.' + mn)
                    self.plugins.add(mn)
                    # plugin_instances is a list of (name, instance) tuples
                    self.plugin_instances.append((mn, foo.SubPlugin()))

    def load_plugin(self, pgdir, pgname):
        pgname = pgname[0:-3]
        self.plugins.add(pgname)

        foo = importlib.import_module('{}.{}'.format(pgdir, pgname))

        # nsPlugin must always be the first one
        if pgname == "nsPlugin":
            self.plugin_instances.insert(0, (pgname, foo.SubPlugin()))
            self.plugin_instances[0][1].check_args(self.args, None)
        else:
            self.plugin_instances.append((pgname, foo.SubPlugin()))
            self.plugin_instances[-1][1].check_args(self.args, None)

    def get_required_plugins(self, testlist):
        '''
        Get all required plugins from the list of test cases and return
        all unique items.
        '''
        reqs = set()
        for t in testlist:
            try:
                if 'requires' in t['plugins']:
                    if isinstance(t['plugins']['requires'], list):
                        reqs.update(set(t['plugins']['requires']))
                    else:
                        reqs.add(t['plugins']['requires'])
                    t['plugins'] = t['plugins']['requires']
                else:
                    t['plugins'] = []
            except KeyError:
                t['plugins'] = []
                continue

        return reqs
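
    # Example (hypothetical test data): a test case may declare its plugin
    # dependencies either as a single name or as a list:
    #     "plugins": { "requires": "nsPlugin" }
    #     "plugins": { "requires": ["nsPlugin", "scapyPlugin"] }
    # After this pass t['plugins'] holds the 'requires' value (or []), which
    # the per-case hooks below use to decide whether a plugin participates.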

    def load_required_plugins(self, reqs, parser, args, remaining):
        '''
        Load any plugin required by the test cases that is not already
        enabled.
        '''
        pgd = ['plugin-lib', 'plugin-lib-custom']
        pnf = []

        for r in reqs:
            if r not in self.plugins:
                fname = '{}.py'.format(r)
                source_path = []
                for d in pgd:
                    pgpath = '{}/{}'.format(d, fname)
                    if os.path.isfile(pgpath):
                        source_path.append(pgpath)
                if len(source_path) == 0:
                    print('ERROR: unable to find required plugin {}'.format(r))
                    pnf.append(fname)
                    continue
                elif len(source_path) > 1:
                    print('WARNING: multiple copies of plugin {} found, using version found'.format(r))
                    print('at {}'.format(source_path[0]))
                pgdir = source_path[0]
                pgdir = pgdir.split('/')[0]
                self.load_plugin(pgdir, fname)
        if len(pnf) > 0:
            raise PluginDependencyException(pnf)

        parser = self.call_add_args(parser)
        (args, remaining) = parser.parse_known_args(args=remaining, namespace=args)
        return args

    def call_pre_suite(self, testcount, testidlist):
        for (_, pgn_inst) in self.plugin_instances:
            pgn_inst.pre_suite(testcount, testidlist)

    def call_post_suite(self, index):
        for (_, pgn_inst) in reversed(self.plugin_instances):
            pgn_inst.post_suite(index)

    def call_pre_case(self, caseinfo, *, test_skip=False):
        for (pgn, pgn_inst) in self.plugin_instances:
            if pgn not in caseinfo['plugins']:
                continue
            try:
                pgn_inst.pre_case(caseinfo, test_skip)
            except Exception as ee:
                print('exception {} in call to pre_case for {} plugin'.
                      format(ee, pgn_inst.__class__))
                print('testid is {}'.format(caseinfo['id']))
                raise

    def call_post_case(self, caseinfo):
        for (pgn, pgn_inst) in reversed(self.plugin_instances):
            if pgn not in caseinfo['plugins']:
                continue
            pgn_inst.post_case()

    def call_pre_execute(self, caseinfo):
        for (pgn, pgn_inst) in self.plugin_instances:
            if pgn not in caseinfo['plugins']:
                continue
            pgn_inst.pre_execute()

    def call_post_execute(self, caseinfo):
        for (pgn, pgn_inst) in reversed(self.plugin_instances):
            if pgn not in caseinfo['plugins']:
                continue
            pgn_inst.post_execute()

    def call_add_args(self, parser):
        for (pgn, pgn_inst) in self.plugin_instances:
            parser = pgn_inst.add_args(parser)
        return parser

    def call_check_args(self, args, remaining):
        for (pgn, pgn_inst) in self.plugin_instances:
            pgn_inst.check_args(args, remaining)

    def call_adjust_command(self, caseinfo, stage, command):
        for (pgn, pgn_inst) in self.plugin_instances:
            if pgn not in caseinfo['plugins']:
                continue
            command = pgn_inst.adjust_command(stage, command)
        return command

    def set_args(self, args):
        self.args = args

    def _make_argparser(self, args):
        self.argparser = argparse.ArgumentParser(
            description='Linux TC unit tests')

def replace_keywords(cmd):
    """
    For a given executable command, substitute any known
    variables contained within NAMES with the correct values
    """
    tcmd = Template(cmd)
    subcmd = tcmd.safe_substitute(NAMES)
    return subcmd
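
# Example (a sketch; actual values come from tdc_config.py's NAMES table): if
# NAMES maps 'TC' to '/sbin/tc' and 'DEV1' to 'v0p1', then
#     replace_keywords('$TC qdisc add dev $DEV1 ingress')
# returns '/sbin/tc qdisc add dev v0p1 ingress'. safe_substitute() leaves
# unknown $variables in place instead of raising KeyError.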

def exec_cmd(caseinfo, args, pm, stage, command):
    """
    Perform any required modifications on an executable command, then run
    it in a subprocess and return the results.
    """
    if len(command.strip()) == 0:
        return None, None
    if '$' in command:
        command = replace_keywords(command)

    command = pm.call_adjust_command(caseinfo, stage, command)
    if args.verbose > 0:
        print('command "{}"'.format(command))

    proc = subprocess.Popen(command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=ENVIR)

    try:
        (rawout, serr) = proc.communicate(timeout=NAMES['TIMEOUT'])
        if proc.returncode != 0 and len(serr) > 0:
            foutput = serr.decode("utf-8", errors="ignore")
        else:
            foutput = rawout.decode("utf-8", errors="ignore")
    except subprocess.TimeoutExpired:
        foutput = "Command \"{}\" timed out\n".format(command)
        proc.returncode = 255

    proc.stdout.close()
    proc.stderr.close()
    return proc, foutput
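
# Example (sketch): callers unpack the pair and must handle the empty-command
# case, where exec_cmd() returns (None, None):
#     (proc, out) = exec_cmd(tidx, args, pm, 'execute', cmd)
#     if proc and proc.returncode != 0:
#         print(out)   # decoded stderr on failure, stdout otherwise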

def prepare_env(caseinfo, args, pm, stage, prefix, cmdlist, output=None):
    """
    Execute the setup/teardown commands for a test case.
    Optionally terminate test execution if the command fails.
    """
    if args.verbose > 0:
        print('{}'.format(prefix))
    for cmdinfo in cmdlist:
        if isinstance(cmdinfo, list):
            exit_codes = cmdinfo[1:]
            cmd = cmdinfo[0]
        else:
            exit_codes = [0]
            cmd = cmdinfo

        if not cmd:
            continue

        (proc, foutput) = exec_cmd(caseinfo, args, pm, stage, cmd)

        if proc and (proc.returncode not in exit_codes):
            print('', file=sys.stderr)
            print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
                  file=sys.stderr)
            print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
                  file=sys.stderr)
            print("returncode {}; expected {}".format(proc.returncode,
                                                      exit_codes),
                  file=sys.stderr)
            print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
            print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
            print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
            raise PluginMgrTestFail(
                stage, output,
                '"{}" did not complete successfully'.format(prefix))
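
# Example (hypothetical test data): entries in a "setup" or "teardown" list
# are either plain strings, which must exit 0, or [command, exit_code, ...]
# forms listing every acceptable exit code:
#     "setup": [
#         ["$TC actions flush action gact", 0, 1, 255],
#         "$TC qdisc add dev $DEV1 ingress"
#     ]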

def verify_by_json(procout, res, tidx, args, pm):
    try:
        outputJSON = json.loads(procout)
    except json.JSONDecodeError:
        res.set_result(ResultState.fail)
        res.set_failmsg('Cannot decode verify command\'s output. Is it JSON?')
        return res

    matchJSON = json.loads(json.dumps(tidx['matchJSON']))

    if type(outputJSON) != type(matchJSON):
        failmsg = 'Original output and matchJSON value are not the same type: output: {} != matchJSON: {}'
        failmsg = failmsg.format(type(outputJSON).__name__, type(matchJSON).__name__)
        res.set_result(ResultState.fail)
        res.set_failmsg(failmsg)
        return res

    if len(matchJSON) > len(outputJSON):
        failmsg = "Your matchJSON value is an array, and it contains more elements than the command under test's output:\ncommand output (length: {}):\n{}\nmatchJSON value (length: {}):\n{}"
        failmsg = failmsg.format(len(outputJSON), outputJSON, len(matchJSON), matchJSON)
        res.set_result(ResultState.fail)
        res.set_failmsg(failmsg)
        return res

    res = find_in_json(res, outputJSON, matchJSON, 0)

    return res
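
# Example (hypothetical test data): matchJSON describes a subset of the
# verify command's JSON output that must be present, e.g.
#     "verifyCmd": "$TC -j qdisc show dev $DEV1",
#     "matchJSON": [{ "kind": "ingress" }]
# Extra keys in the output are ignored; a missing key or mismatched value
# fails the test via the find_in_json() helpers below.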

def find_in_json(res, outputJSONVal, matchJSONVal, matchJSONKey=None):
    if res.get_result() == ResultState.fail:
        return res

    if type(matchJSONVal) == list:
        res = find_in_json_list(res, outputJSONVal, matchJSONVal, matchJSONKey)
    elif type(matchJSONVal) == dict:
        res = find_in_json_dict(res, outputJSONVal, matchJSONVal)
    else:
        res = find_in_json_other(res, outputJSONVal, matchJSONVal, matchJSONKey)

    if res.get_result() != ResultState.fail:
        res.set_result(ResultState.success)

    return res

def find_in_json_list(res, outputJSONVal, matchJSONVal, matchJSONKey=None):
    if (type(matchJSONVal) != type(outputJSONVal)):
        failmsg = 'Original output and matchJSON value are not the same type: output: {} != matchJSON: {}'
        failmsg = failmsg.format(type(outputJSONVal).__name__, type(matchJSONVal).__name__)
        res.set_result(ResultState.fail)
        res.set_failmsg(failmsg)
        return res

    if len(matchJSONVal) > len(outputJSONVal):
        failmsg = "Your matchJSON value is an array, and it contains more elements than the command under test's output:\ncommand output (length: {}):\n{}\nmatchJSON value (length: {}):\n{}"
        failmsg = failmsg.format(len(outputJSONVal), outputJSONVal, len(matchJSONVal), matchJSONVal)
        res.set_result(ResultState.fail)
        res.set_failmsg(failmsg)
        return res

    for matchJSONIdx, matchJSONVal in enumerate(matchJSONVal):
        res = find_in_json(res, outputJSONVal[matchJSONIdx], matchJSONVal,
                           matchJSONKey)
    return res

def find_in_json_dict(res, outputJSONVal, matchJSONVal):
    for matchJSONKey, matchJSONVal in matchJSONVal.items():
        if type(outputJSONVal) == dict:
            if matchJSONKey not in outputJSONVal:
                failmsg = 'Key not found in json output: {}: {}\nMatching against output: {}'
                failmsg = failmsg.format(matchJSONKey, matchJSONVal, outputJSONVal)
                res.set_result(ResultState.fail)
                res.set_failmsg(failmsg)
                return res
        else:
            failmsg = 'Original output and matchJSON value are not the same type: output: {} != matchJSON: {}'
            failmsg = failmsg.format(type(outputJSONVal).__name__, type(matchJSONVal).__name__)
            res.set_result(ResultState.fail)
            res.set_failmsg(failmsg)
            return res

        if type(outputJSONVal) == dict and (type(outputJSONVal[matchJSONKey]) == dict or
                type(outputJSONVal[matchJSONKey]) == list):
            if len(matchJSONVal) > 0:
                res = find_in_json(res, outputJSONVal[matchJSONKey], matchJSONVal, matchJSONKey)
            # handling corner case where matchJSONVal == [] or matchJSONVal == {}
            else:
                res = find_in_json_other(res, outputJSONVal, matchJSONVal, matchJSONKey)
        else:
            res = find_in_json(res, outputJSONVal, matchJSONVal, matchJSONKey)
    return res

def find_in_json_other(res, outputJSONVal, matchJSONVal, matchJSONKey=None):
    if matchJSONKey in outputJSONVal:
        if matchJSONVal != outputJSONVal[matchJSONKey]:
            failmsg = 'Value doesn\'t match: {}: {} != {}\nMatching against output: {}'
            failmsg = failmsg.format(matchJSONKey, matchJSONVal, outputJSONVal[matchJSONKey], outputJSONVal)
            res.set_result(ResultState.fail)
            res.set_failmsg(failmsg)
            return res

    return res

def run_one_test(pm, args, index, tidx):
    global NAMES
    ns = NAMES['NS']
    dev0 = NAMES['DEV0']
    dev1 = NAMES['DEV1']
    dummy = NAMES['DUMMY']
    res = TestResult(tidx['id'], tidx['name'])
    if args.verbose > 0:
        print("\t====================\n=====> ", end="")
    print("Test " + tidx["id"] + ": " + tidx["name"])

    if 'skip' in tidx:
        if tidx['skip'] == 'yes':
            res = TestResult(tidx['id'], tidx['name'])
            res.set_result(ResultState.skip)
            res.set_errormsg('Test case designated as skipped.')
            pm.call_pre_case(tidx, test_skip=True)
            pm.call_post_execute(tidx)
            return res

    if 'dependsOn' in tidx:
        if (args.verbose > 0):
            print('probe command for test skip')
        (p, procout) = exec_cmd(tidx, args, pm, 'execute', tidx['dependsOn'])
        if p:
            if (p.returncode != 0):
                res = TestResult(tidx['id'], tidx['name'])
                res.set_result(ResultState.skip)
                res.set_errormsg('probe command: test skipped.')
                pm.call_pre_case(tidx, test_skip=True)
                pm.call_post_execute(tidx)
                return res

    # populate NAMES with TESTID for this test
    NAMES['TESTID'] = tidx['id']
    NAMES['NS'] = '{}-{}'.format(NAMES['NS'], tidx['random'])
    NAMES['DEV0'] = '{}id{}'.format(NAMES['DEV0'], tidx['id'])
    NAMES['DEV1'] = '{}id{}'.format(NAMES['DEV1'], tidx['id'])
    NAMES['DUMMY'] = '{}id{}'.format(NAMES['DUMMY'], tidx['id'])

    pm.call_pre_case(tidx)
    prepare_env(tidx, args, pm, 'setup', "-----> prepare stage", tidx["setup"])

    if (args.verbose > 0):
        print('-----> execute stage')
    pm.call_pre_execute(tidx)
    (p, procout) = exec_cmd(tidx, args, pm, 'execute', tidx["cmdUnderTest"])
    if p:
        exit_code = p.returncode
    else:
        exit_code = None

    pm.call_post_execute(tidx)

    if (exit_code is None or exit_code != int(tidx["expExitCode"])):
        print("exit: {!r}".format(exit_code))
        print("exit: {}".format(int(tidx["expExitCode"])))
        res.set_result(ResultState.fail)
        res.set_failmsg('Command exited with {}, expected {}\n{}'.format(exit_code, tidx["expExitCode"], procout))
        print(procout)
    else:
        if args.verbose > 0:
            print('-----> verify stage')
        (p, procout) = exec_cmd(tidx, args, pm, 'verify', tidx["verifyCmd"])
        if procout:
            if 'matchJSON' in tidx:
                verify_by_json(procout, res, tidx, args, pm)
            elif 'matchPattern' in tidx:
                match_pattern = re.compile(
                    str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
                match_index = re.findall(match_pattern, procout)
                if len(match_index) != int(tidx["matchCount"]):
                    res.set_result(ResultState.fail)
                    res.set_failmsg('Could not match regex pattern. Verify command output:\n{}'.format(procout))
                else:
                    res.set_result(ResultState.success)
            else:
                res.set_result(ResultState.fail)
                res.set_failmsg('Must specify a match option: matchJSON or matchPattern\n{}'.format(procout))
        elif int(tidx["matchCount"]) != 0:
            res.set_result(ResultState.fail)
            res.set_failmsg('No output generated by verify command.')
        else:
            res.set_result(ResultState.success)

    prepare_env(tidx, args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout)
    pm.call_post_case(tidx)

    index += 1

    # remove TESTID from NAMES
    del NAMES['TESTID']

    # Restore names
    NAMES['NS'] = ns
    NAMES['DEV0'] = dev0
    NAMES['DEV1'] = dev1
    NAMES['DUMMY'] = dummy

    return res

def prepare_run(pm, args, testlist):
    tcount = len(testlist)
    emergency_exit = False
    emergency_exit_message = ''

    try:
        pm.call_pre_suite(tcount, testlist)
    except Exception:
        ex_type, ex, ex_tb = sys.exc_info()
        print('Exception {} {} (caught in pre_suite).'.
              format(ex_type, ex))
        traceback.print_tb(ex_tb)
        emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
        emergency_exit = True

    if emergency_exit:
        pm.call_post_suite(1)
        return emergency_exit_message

def purge_run(pm, index):
    pm.call_post_suite(index)

def test_runner(pm, args, filtered_tests):
    """
    Driver function for the unit tests.

    Prints information about the tests being run, executes the setup and
    teardown commands and the command under test itself. Also determines
    success/failure based on the information in the test case and generates
    TAP output accordingly.
    """
    testlist = filtered_tests
    tcount = len(testlist)
    index = 1
    badtest = None
    stage = None

    tsr = TestSuiteReport()

    for tidx in testlist:
        if "flower" in tidx["category"] and args.device is None:
            errmsg = "Tests using the DEV2 variable must define the name of a "
            errmsg += "physical NIC with the -d option when running tdc.\n"
            errmsg += "Test has been skipped."
            if args.verbose > 1:
                print(errmsg)
            res = TestResult(tidx['id'], tidx['name'])
            res.set_result(ResultState.skip)
            res.set_errormsg(errmsg)
            tsr.add_resultdata(res)
            index += 1
            continue
        try:
            badtest = tidx  # in case it goes bad
            res = run_one_test(pm, args, index, tidx)
            tsr.add_resultdata(res)
        except PluginMgrTestFail as pmtf:
            ex_type, ex, ex_tb = sys.exc_info()
            stage = pmtf.stage
            message = pmtf.message
            res = TestResult(tidx['id'], tidx['name'])
            res.set_result(ResultState.fail)
            res.set_errormsg(pmtf.message)
            res.set_failmsg(pmtf.output)
            tsr.add_resultdata(res)
            index += 1
            print(message)
            print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
                  format(ex_type, ex, index, tidx['id'], tidx['name'], stage))
            print('---------------')
            print('traceback')
            traceback.print_tb(ex_tb)
            print('---------------')
            if stage == 'teardown':
                print('accumulated output for this test:')
                if pmtf.output:
                    print(pmtf.output)
            print('---------------')
            break
        index += 1

    # if we failed in setup or teardown,
    # fill in the remaining tests with ok-skipped
    count = index

    if tcount + 1 != count:
        for tidx in testlist[count - 1:]:
            res = TestResult(tidx['id'], tidx['name'])
            res.set_result(ResultState.skip)
            msg = 'skipped - previous {} failed {} {}'.format(stage,
                index, badtest.get('id', '--Unknown--'))
            res.set_errormsg(msg)
            tsr.add_resultdata(res)
            count += 1

    if args.pause:
        print('Want to pause\nPress enter to continue ...')
        if input():
            print('got something on stdin')

    return (index, tsr)

def mp_bins(alltests):
    serial = []
    parallel = []

    for test in alltests:
        if 'nsPlugin' not in test['plugins']:
            serial.append(test)
        else:
            # We can only create one netdevsim device at a time
            if 'netdevsim/new_device' in str(test['setup']):
                serial.append(test)
            else:
                parallel.append(test)

    return (serial, parallel)
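
# Example (sketch): tests that do not request nsPlugin run in the host
# namespace and so are binned as serial; namespaced tests run in parallel,
# except those whose setup writes to netdevsim/new_device, since only one
# netdevsim device can be created at a time.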

def __mp_runner(tests):
    (_, tsr) = test_runner(mp_pm, mp_args, tests)
    return tsr._testsuite

def test_runner_mp(pm, args, alltests):
    prepare_run(pm, args, alltests)

    (serial, parallel) = mp_bins(alltests)

    batches = [parallel[n : n + 32] for n in range(0, len(parallel), 32)]
    batches.insert(0, serial)

    print("Executing {} tests in parallel and {} in serial".format(len(parallel), len(serial)))
    print("Using {} batches and {} workers".format(len(batches), args.mp))

    # We can't pickle these objects so workaround them
    global mp_pm
    mp_pm = pm

    global mp_args
    mp_args = args

    with Pool(args.mp) as p:
        pres = p.map(__mp_runner, batches)

    tsr = TestSuiteReport()
    for trs in pres:
        for res in trs:
            tsr.add_resultdata(res)

    # Passing an index is not useful in MP
    purge_run(pm, None)

    return tsr

def test_runner_serial(pm, args, alltests):
    prepare_run(pm, args, alltests)

    if args.verbose:
        print("Executing {} tests in serial".format(len(alltests)))

    (index, tsr) = test_runner(pm, args, alltests)

    purge_run(pm, index)

    return tsr

def has_blank_ids(idlist):
    """
    Search the list for empty ID fields and return true/false accordingly.
    """
    return not all(k for k in idlist)

def load_from_file(filename):
    """
    Open the JSON file containing the test cases and return them
    as list of ordered dictionary objects.
    """
    try:
        with open(filename) as test_data:
            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
    except json.JSONDecodeError as jde:
        print('IGNORING test case file {}\n\tBECAUSE:  {}'.format(filename, jde))
        testlist = list()
    else:
        idlist = get_id_list(testlist)
        if (has_blank_ids(idlist)):
            for k in testlist:
                k['filename'] = filename
    return testlist
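
# Example (hypothetical test data): each file holds a JSON array of test
# case objects shaped roughly like this:
#     [{
#         "id": "a1b2",
#         "name": "Add ingress qdisc",
#         "category": ["qdisc", "ingress"],
#         "plugins": { "requires": "nsPlugin" },
#         "setup": [],
#         "cmdUnderTest": "$TC qdisc add dev $DEV1 ingress",
#         "expExitCode": "0",
#         "verifyCmd": "$TC qdisc show dev $DEV1",
#         "matchPattern": "qdisc ingress ffff:",
#         "matchCount": "1",
#         "teardown": ["$TC qdisc del dev $DEV1 ingress"]
#     }]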

def identity(string):
    return string

def args_parse():
    """
    Create the argument parser.
    """
    parser = argparse.ArgumentParser(description='Linux TC unit tests')
    parser.register('type', None, identity)
    return parser

def set_args(parser):
    """
    Set the command line arguments for tdc.
    """
    parser.add_argument(
        '--outfile', type=str,
        help='Path to the file in which results should be saved. ' +
        'Default target is the current directory.')
    parser.add_argument(
        '-p', '--path', type=str,
        help='The full path to the tc executable to use')
    sg = parser.add_argument_group(
        'selection', 'select which test cases: ' +
        'files plus directories; filtered by categories plus testids')
    ag = parser.add_argument_group(
        'action', 'select action to perform on selected test cases')

    sg.add_argument(
        '-D', '--directory', nargs='+', metavar='DIR',
        help='Collect tests from the specified directory(ies) ' +
        '(default [tc-tests])')
    sg.add_argument(
        '-f', '--file', nargs='+', metavar='FILE',
        help='Run tests from the specified file(s)')
    sg.add_argument(
        '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
        help='Run tests only from the specified category/ies, ' +
        'or if no category/ies is/are specified, list known categories.')
    sg.add_argument(
        '-e', '--execute', nargs='+', metavar='ID',
        help='Execute the specified test cases with specified IDs')
    ag.add_argument(
        '-l', '--list', action='store_true',
        help='List all test cases, or those only within the specified category')
    ag.add_argument(
        '-s', '--show', action='store_true', dest='showID',
        help='Display the selected test cases')
    ag.add_argument(
        '-i', '--id', action='store_true', dest='gen_id',
        help='Generate ID numbers for new test cases')
    parser.add_argument(
        '-v', '--verbose', action='count', default=0,
        help='Show the commands that are being run')
    parser.add_argument(
        '--format', default='tap', const='tap', nargs='?',
        choices=['none', 'xunit', 'tap'],
        help='Specify the format for test results. (Default: TAP)')
    parser.add_argument('-d', '--device',
                        help='Execute test cases that use a physical device, ' +
                        'where DEVICE is its name. (If not defined, tests ' +
                        'that require a physical device will be skipped)')
    parser.add_argument(
        '-P', '--pause', action='store_true',
        help='Pause execution just before post-suite stage')
    parser.add_argument(
        '-J', '--multiprocess', type=int, default=1, dest='mp',
        help='Run tests in parallel whenever possible')
    return parser
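
# Example invocations (sketch; the test ID is hypothetical):
#     ./tdc.py -c qdisc                      # run every test in one category
#     ./tdc.py -e a1b2 -v                    # run a single test ID, verbosely
#     ./tdc.py -J 4 --format xunit --outfile results.xml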

def check_default_settings(args, remaining, pm):
    """
    Process any arguments overriding the default settings,
    and ensure the settings are correct.
    """
    # Allow for overriding specific settings
    global NAMES

    if args.path is not None:
        NAMES['TC'] = args.path
    if args.device is not None:
        NAMES['DEV2'] = args.device
    if 'TIMEOUT' not in NAMES:
        NAMES['TIMEOUT'] = None
    if not os.path.isfile(NAMES['TC']):
        print("The specified tc path " + NAMES['TC'] + " does not exist.")
        exit(1)

    pm.call_check_args(args, remaining)

def get_id_list(alltests):
    """
    Generate a list of all IDs in the test cases.
    """
    return [x["id"] for x in alltests]

def check_case_id(alltests):
    """
    Check for duplicate test case IDs.
    """
    idl = get_id_list(alltests)
    return [x for x in idl if idl.count(x) > 1]

def does_id_exist(alltests, newid):
    """
    Check if a given ID already exists in the list of test cases.
    """
    idl = get_id_list(alltests)
    return (any(newid == x for x in idl))

def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.
    """
    for c in alltests:
        if (c["id"] == ""):
            while True:
                newid = str('{:04x}'.format(random.randrange(16**4)))
                if (does_id_exist(alltests, newid)):
                    continue
                else:
                    c['id'] = newid
                    break

    ufilename = []
    for c in alltests:
        if ('filename' in c):
            ufilename.append(c['filename'])
    ufilename = get_unique_item(ufilename)
    for f in ufilename:
        testlist = []
        for t in alltests:
            if 'filename' in t:
                if t['filename'] == f:
                    del t['filename']
                    testlist.append(t)
        with open(f, "w") as outfile:
            json.dump(testlist, outfile, indent=4)
            outfile.write("\n")

def filter_tests_by_id(args, testlist):
    '''
    Remove tests from testlist that are not in the named id list.
    If id list is empty, return empty list.
    '''
    newlist = list()
    if testlist and args.execute:
        target_ids = args.execute

        if isinstance(target_ids, list) and (len(target_ids) > 0):
            newlist = list(filter(lambda x: x['id'] in target_ids, testlist))
    return newlist
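
# Example (sketch): with args.execute = ['a1b2', 'c3d4'], only test cases
# whose 'id' is one of those strings survive; without -e this returns an
# empty list and get_test_cases() below falls back to category/file
# selection.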

def filter_tests_by_category(args, testlist):
    '''
    Remove tests from testlist that are not in a named category.
    '''
    answer = list()
    if args.category and testlist:
        test_ids = list()
        for catg in set(args.category):
            if catg == '+c':
                continue
            print('considering category {}'.format(catg))
            for tc in testlist:
                if catg in tc['category'] and tc['id'] not in test_ids:
                    answer.append(tc)
                    test_ids.append(tc['id'])

    return answer

def set_random(alltests):
    for tidx in alltests:
        tidx['random'] = random.getrandbits(32)
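
# Example (sketch, hypothetical config value): run_one_test() folds this
# value into the namespace name, so NAMES['NS'] = 'tcut' with
# random = 12345 yields a per-test namespace 'tcut-12345', keeping
# parallel workers from colliding on shared resources.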

def get_test_cases(args):
    """
    If a test case file is specified, retrieve tests from that file.
    Otherwise, glob for all json files in subdirectories and load from
    each one.
    Also, if requested, filter by category, and add tests matching
    certain ids.
    """
    import fnmatch

    flist = []
    testdirs = ['tc-tests']

    if args.file:
        # at least one file was specified - remove the default directory
        testdirs = []

        for ff in args.file:
            if not os.path.isfile(ff):
                print("IGNORING file " + ff + "\n\tBECAUSE it does not exist.")
            else:
                flist.append(os.path.abspath(ff))

    if args.directory:
        testdirs = args.directory

    for testdir in testdirs:
        for root, dirnames, filenames in os.walk(testdir):
            for filename in fnmatch.filter(filenames, '*.json'):
                candidate = os.path.abspath(os.path.join(root, filename))
                if candidate not in testdirs:
                    flist.append(candidate)

    alltestcases = list()
    for casefile in flist:
        alltestcases = alltestcases + (load_from_file(casefile))

    allcatlist = get_test_categories(alltestcases)
    allidlist = get_id_list(alltestcases)

    testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
    idtestcases = filter_tests_by_id(args, alltestcases)
    cattestcases = filter_tests_by_category(args, alltestcases)

    cat_ids = [x['id'] for x in cattestcases]
    if args.execute:
        if args.category:
            alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
        else:
            alltestcases = idtestcases
    else:
        if cat_ids:
            alltestcases = cattestcases
        else:
            # just accept the existing value of alltestcases,
            # which has been filtered by file/directory
            pass

    return allcatlist, allidlist, testcases_by_cats, alltestcases

def set_operation_mode(pm, parser, args, remaining):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.
    """
    ucat, idlist, testcases, alltests = get_test_cases(args)

    if args.gen_id:
        if (has_blank_ids(idlist)):
            generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        exit(0)

    duplicate_ids = check_case_id(alltests)
    if (len(duplicate_ids) > 0):
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        exit(1)

    if args.showID:
        for atest in alltests:
            print_test_case(atest)
        exit(0)

    if isinstance(args.category, list) and (len(args.category) == 0):
        print("Available categories:")
        print_sll(ucat)
        exit(0)

    if args.list:
        list_test_cases(alltests)
        exit(0)

    set_random(alltests)

    exit_code = 0 # KSFT_PASS
    if len(alltests):
        req_plugins = pm.get_required_plugins(alltests)
        try:
            args = pm.load_required_plugins(req_plugins, parser, args, remaining)
        except PluginDependencyException as pde:
            print('The following plugins were not found:')
            print('{}'.format(pde.missing_pg))

        if args.mp > 1:
            catresults = test_runner_mp(pm, args, alltests)
        else:
            catresults = test_runner_serial(pm, args, alltests)

        if catresults.count_failures() != 0:
            exit_code = 1 # KSFT_FAIL
        if args.format == 'none':
            print('Test results output suppression requested\n')
        else:
            print('\nAll test results: \n')
            if args.format == 'xunit':
                suffix = 'xml'
                res = catresults.format_xunit()
            elif args.format == 'tap':
                suffix = 'tap'
                res = catresults.format_tap()
            print(res)
            print('\n\n')
            if not args.outfile:
                fname = 'test-results.{}'.format(suffix)
            else:
                fname = args.outfile
            with open(fname, 'w') as fh:
                fh.write(res)
            if os.getenv('SUDO_UID') is not None:
                os.chown(fname, uid=int(os.getenv('SUDO_UID')),
                         gid=int(os.getenv('SUDO_GID')))
    else:
        print('No tests found\n')
        exit_code = 4 # KSFT_SKIP
    exit(exit_code)

def main():
    """
    Start of execution; set up argument parser and get the arguments,
    and start operations.
    """
    import resource

    if sys.version_info < (3, 8):
        sys.exit("tdc requires at least python 3.8")

    resource.setrlimit(resource.RLIMIT_NOFILE, (1048576, 1048576))

    parser = args_parse()
    parser = set_args(parser)
    pm = PluginMgr(parser)
    parser = pm.call_add_args(parser)
    (args, remaining) = parser.parse_known_args()
    args.NAMES = NAMES
    args.mp = min(args.mp, 4)
    pm.set_args(args)
    check_default_settings(args, remaining, pm)
    if args.verbose > 2:
        print('args is {}'.format(args))

    try:
        set_operation_mode(pm, parser, args, remaining)
    except KeyboardInterrupt:
        # Cleanup on Ctrl-C
        pm.call_post_suite(None)

if __name__ == "__main__":
    main()