Linux Audio

Check our new training course

Loading...
v5.4
  1#!/usr/bin/env python3
  2# SPDX-License-Identifier: GPL-2.0
  3
  4"""
  5tdc.py - Linux tc (Traffic Control) unit test driver
  6
  7Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
  8"""
  9
 10import re
 11import os
 12import sys
 13import argparse
 14import importlib
 15import json
 16import subprocess
 17import time
 18import traceback
 19from collections import OrderedDict
 20from string import Template
 21
 22from tdc_config import *
 23from tdc_helper import *
 24
 25import TdcPlugin
 26from TdcResults import *
 27
 28class PluginDependencyException(Exception):
 29    def __init__(self, missing_pg):
 30        self.missing_pg = missing_pg
 31
 32class PluginMgrTestFail(Exception):
 33    def __init__(self, stage, output, message):
 34        self.stage = stage
 35        self.output = output
 36        self.message = message
 37
 38class PluginMgr:
 39    def __init__(self, argparser):
 40        super().__init__()
 41        self.plugins = {}
 42        self.plugin_instances = []
 43        self.failed_plugins = {}
 44        self.argparser = argparser
 45
 46        # TODO, put plugins in order
 47        plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
 48        for dirpath, dirnames, filenames in os.walk(plugindir):
 49            for fn in filenames:
 50                if (fn.endswith('.py') and
 51                    not fn == '__init__.py' and
 52                    not fn.startswith('#') and
 53                    not fn.startswith('.#')):
 54                    mn = fn[0:-3]
 55                    foo = importlib.import_module('plugins.' + mn)
 56                    self.plugins[mn] = foo
 57                    self.plugin_instances.append(foo.SubPlugin())
 58
 59    def load_plugin(self, pgdir, pgname):
 60        pgname = pgname[0:-3]
 61        foo = importlib.import_module('{}.{}'.format(pgdir, pgname))
 62        self.plugins[pgname] = foo
 63        self.plugin_instances.append(foo.SubPlugin())
 64        self.plugin_instances[-1].check_args(self.args, None)
 65
 66    def get_required_plugins(self, testlist):
 67        '''
 68        Get all required plugins from the list of test cases and return
 69        all unique items.
 70        '''
 71        reqs = []
 72        for t in testlist:
 73            try:
 74                if 'requires' in t['plugins']:
 75                    if isinstance(t['plugins']['requires'], list):
 76                        reqs.extend(t['plugins']['requires'])
 77                    else:
 78                        reqs.append(t['plugins']['requires'])
 79            except KeyError:
 80                continue
 81        reqs = get_unique_item(reqs)
 82        return reqs
 83
 84    def load_required_plugins(self, reqs, parser, args, remaining):
 85        '''
 86        Get all required plugins from the list of test cases and load any plugin
 87        that is not already enabled.
 88        '''
 89        pgd = ['plugin-lib', 'plugin-lib-custom']
 90        pnf = []
 91
 92        for r in reqs:
 93            if r not in self.plugins:
 94                fname = '{}.py'.format(r)
 95                source_path = []
 96                for d in pgd:
 97                    pgpath = '{}/{}'.format(d, fname)
 98                    if os.path.isfile(pgpath):
 99                        source_path.append(pgpath)
100                if len(source_path) == 0:
101                    print('ERROR: unable to find required plugin {}'.format(r))
102                    pnf.append(fname)
103                    continue
104                elif len(source_path) > 1:
105                    print('WARNING: multiple copies of plugin {} found, using version found')
106                    print('at {}'.format(source_path[0]))
107                pgdir = source_path[0]
108                pgdir = pgdir.split('/')[0]
109                self.load_plugin(pgdir, fname)
110        if len(pnf) > 0:
111            raise PluginDependencyException(pnf)
112
113        parser = self.call_add_args(parser)
114        (args, remaining) = parser.parse_known_args(args=remaining, namespace=args)
115        return args
116
117    def call_pre_suite(self, testcount, testidlist):
118        for pgn_inst in self.plugin_instances:
119            pgn_inst.pre_suite(testcount, testidlist)
120
121    def call_post_suite(self, index):
122        for pgn_inst in reversed(self.plugin_instances):
123            pgn_inst.post_suite(index)
124
125    def call_pre_case(self, caseinfo, *, test_skip=False):
126        for pgn_inst in self.plugin_instances:
127            try:
128                pgn_inst.pre_case(caseinfo, test_skip)
129            except Exception as ee:
130                print('exception {} in call to pre_case for {} plugin'.
131                      format(ee, pgn_inst.__class__))
132                print('test_ordinal is {}'.format(test_ordinal))
133                print('testid is {}'.format(caseinfo['id']))
134                raise
135
136    def call_post_case(self):
137        for pgn_inst in reversed(self.plugin_instances):
138            pgn_inst.post_case()
139
140    def call_pre_execute(self):
141        for pgn_inst in self.plugin_instances:
142            pgn_inst.pre_execute()
143
144    def call_post_execute(self):
145        for pgn_inst in reversed(self.plugin_instances):
146            pgn_inst.post_execute()
147
148    def call_add_args(self, parser):
149        for pgn_inst in self.plugin_instances:
150            parser = pgn_inst.add_args(parser)
151        return parser
152
153    def call_check_args(self, args, remaining):
154        for pgn_inst in self.plugin_instances:
155            pgn_inst.check_args(args, remaining)
156
157    def call_adjust_command(self, stage, command):
158        for pgn_inst in self.plugin_instances:
159            command = pgn_inst.adjust_command(stage, command)
160        return command
161
162    def set_args(self, args):
163        self.args = args
164
165    @staticmethod
166    def _make_argparser(args):
167        self.argparser = argparse.ArgumentParser(
168            description='Linux TC unit tests')
169
 
def replace_keywords(cmd):
    """
    Substitute any $VARIABLE references in cmd that appear in the NAMES
    table, leaving unknown placeholders untouched.
    """
    return Template(cmd).safe_substitute(NAMES)
178
179
def exec_cmd(args, pm, stage, command):
    """
    Run one shell command for the given stage and capture its output.

    Returns (proc, decoded-output), or (None, None) when command is blank.
    On failure with stderr content, stderr is returned as the output; on
    timeout returncode is forced to 255.
    """
    if not command.strip():
        return None, None
    if '$' in command:
        command = replace_keywords(command)

    command = pm.call_adjust_command(stage, command)
    if args.verbose > 0:
        print('command "{}"'.format(command))

    proc = subprocess.Popen(command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=ENVIR)

    try:
        rawout, serr = proc.communicate(timeout=NAMES['TIMEOUT'])
        # Prefer stderr when the command failed and produced any.
        if proc.returncode != 0 and len(serr) > 0:
            foutput = serr.decode("utf-8", errors="ignore")
        else:
            foutput = rawout.decode("utf-8", errors="ignore")
    except subprocess.TimeoutExpired:
        foutput = "Command \"{}\" timed out\n".format(command)
        proc.returncode = 255

    proc.stdout.close()
    proc.stderr.close()
    return proc, foutput
212
213
def prepare_env(args, pm, stage, prefix, cmdlist, output=None):
    """
    Execute the setup/teardown commands for a test case.
    Optionally terminate test execution if the command fails.
    """
    if args.verbose > 0:
        print('{}'.format(prefix))

    for cmdinfo in cmdlist:
        # Each entry is either "cmd" or ["cmd", ok_code, ok_code, ...].
        if isinstance(cmdinfo, list):
            cmd, exit_codes = cmdinfo[0], cmdinfo[1:]
        else:
            cmd, exit_codes = cmdinfo, [0]

        if not cmd:
            continue

        proc, foutput = exec_cmd(args, pm, stage, cmd)

        if proc is None or proc.returncode in exit_codes:
            continue

        # Unexpected exit code: report everything and abort the run.
        print('', file=sys.stderr)
        print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
              file=sys.stderr)
        print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
              file=sys.stderr)
        print("returncode {}; expected {}".format(proc.returncode,
                                                  exit_codes))
        print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
        print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
        print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
        raise PluginMgrTestFail(
            stage, output,
            '"{}" did not complete successfully'.format(prefix))
248
def run_one_test(pm, args, index, tidx):
    """
    Run a single test case: plugin hooks, setup commands, the command
    under test, output verification, and teardown.

    Returns a TestResult describing success/failure/skip.
    """
    global NAMES
    result = True   # kept for parity with callers; not used locally
    tresult = ""
    tap = ""
    tres = TestResult(tidx['id'], tidx['name'])
    if args.verbose > 0:
        print("\t====================\n=====> ", end="")
    print("Test " + tidx["id"] + ": " + tidx["name"])

    # Honor an explicit 'skip: yes' marker in the test case.
    if tidx.get('skip') == 'yes':
        tres = TestResult(tidx['id'], tidx['name'])
        tres.set_result(ResultState.skip)
        tres.set_errormsg('Test case designated as skipped.')
        pm.call_pre_case(tidx, test_skip=True)
        pm.call_post_execute()
        return tres

    # populate NAMES with TESTID for this test
    NAMES['TESTID'] = tidx['id']

    pm.call_pre_case(tidx)
    prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])

    if args.verbose > 0:
        print('-----> execute stage')
    pm.call_pre_execute()
    proc, cmd_out = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"])
    exit_code = proc.returncode if proc else None

    pm.call_post_execute()

    if exit_code is None or exit_code != int(tidx["expExitCode"]):
        print("exit: {!r}".format(exit_code))
        print("exit: {}".format(int(tidx["expExitCode"])))
        tres.set_result(ResultState.fail)
        tres.set_failmsg('Command exited with {}, expected {}\n{}'.format(exit_code, tidx["expExitCode"], cmd_out))
        print(cmd_out)
    else:
        if args.verbose > 0:
            print('-----> verify stage')
        pattern = re.compile(
            str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
        proc, cmd_out = exec_cmd(args, pm, 'verify', tidx["verifyCmd"])
        if cmd_out:
            # The verify output must match the pattern exactly matchCount times.
            if len(re.findall(pattern, cmd_out)) != int(tidx["matchCount"]):
                tres.set_result(ResultState.fail)
                tres.set_failmsg('Could not match regex pattern. Verify command output:\n{}'.format(cmd_out))
            else:
                tres.set_result(ResultState.success)
        elif int(tidx["matchCount"]) != 0:
            tres.set_result(ResultState.fail)
            tres.set_failmsg('No output generated by verify command.')
        else:
            tres.set_result(ResultState.success)

    prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], cmd_out)
    pm.call_post_case()

    index += 1

    # remove TESTID from NAMES
    del(NAMES['TESTID'])
    return tres
319
def test_runner(pm, args, filtered_tests):
    """
    Driver function for the unit tests.

    Prints information about the tests being run, executes the setup and
    teardown commands and the command under test itself. Also determines
    success/failure based on the information in the test case and generates
    TAP output accordingly.
    """
    testlist = filtered_tests
    total_tests = len(testlist)
    index = 1
    tap = ''
    badtest = None
    stage = None
    abort_run = False
    abort_message = ''

    tsr = TestSuiteReport()

    # Give every plugin a chance to prepare; any exception here aborts
    # the whole suite.
    try:
        pm.call_pre_suite(total_tests, [tidx['id'] for tidx in testlist])
    except Exception:
        ex_type, ex, ex_tb = sys.exc_info()
        print('Exception {} {} (caught in pre_suite).'.
              format(ex_type, ex))
        traceback.print_tb(ex_tb)
        abort_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
        abort_run = True
        stage = 'pre-SUITE'

    if abort_run:
        pm.call_post_suite(index)
        return abort_message

    if args.verbose > 1:
        print('give test rig 2 seconds to stabilize')
    time.sleep(2)

    for tidx in testlist:
        # Tests needing a physical NIC are skipped unless -d was given.
        if "flower" in tidx["category"] and args.device is None:
            errmsg = "Tests using the DEV2 variable must define the name of a "
            errmsg += "physical NIC with the -d option when running tdc.\n"
            errmsg += "Test has been skipped."
            if args.verbose > 1:
                print(errmsg)
            res = TestResult(tidx['id'], tidx['name'])
            res.set_result(ResultState.skip)
            res.set_errormsg(errmsg)
            tsr.add_resultdata(res)
            continue
        try:
            badtest = tidx  # in case it goes bad
            res = run_one_test(pm, args, index, tidx)
            tsr.add_resultdata(res)
        except PluginMgrTestFail as pmtf:
            ex_type, ex, ex_tb = sys.exc_info()
            stage = pmtf.stage
            message = pmtf.message
            output = pmtf.output
            res = TestResult(tidx['id'], tidx['name'])
            res.set_result(ResultState.skip)
            res.set_errormsg(pmtf.message)
            res.set_failmsg(pmtf.output)
            tsr.add_resultdata(res)
            index += 1
            print(message)
            print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
                  format(ex_type, ex, index, tidx['id'], tidx['name'], stage))
            print('---------------')
            print('traceback')
            traceback.print_tb(ex_tb)
            print('---------------')
            if stage == 'teardown':
                print('accumulated output for this test:')
                if pmtf.output:
                    print(pmtf.output)
            print('---------------')
            break
        index += 1

    # if we failed in setup or teardown,
    # fill in the remaining tests with ok-skipped
    count = index
    if total_tests + 1 != count:
        for tidx in testlist[count - 1:]:
            res = TestResult(tidx['id'], tidx['name'])
            res.set_result(ResultState.skip)
            msg = 'skipped - previous {} failed {} {}'.format(stage,
                index, badtest.get('id', '--Unknown--'))
            res.set_errormsg(msg)
            tsr.add_resultdata(res)
            count += 1

    if args.pause:
        print('Want to pause\nPress enter to continue ...')
        if input(sys.stdin):
            print('got something on stdin')

    pm.call_post_suite(index)

    return tsr
421
def has_blank_ids(idlist):
    """
    Return True when any ID field in the list is empty.
    """
    return any(not tc_id for tc_id in idlist)
427
428
def load_from_file(filename):
    """
    Open the JSON file containing the test cases and return them
    as a list of ordered dictionary objects.  Malformed files are
    reported and skipped (empty list returned).
    """
    try:
        with open(filename) as test_data:
            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
    except json.JSONDecodeError as jde:
        print('IGNORING test case file {}\n\tBECAUSE:  {}'.format(filename, jde))
        return list()

    # Tag each case with its origin file when blank IDs are present so the
    # ID generator knows where to write the cases back.
    if has_blank_ids(get_id_list(testlist)):
        for case in testlist:
            case['filename'] = filename
    return testlist
446
447
def args_parse():
    """
    Create the top-level argument parser for tdc.
    """
    return argparse.ArgumentParser(description='Linux TC unit tests')
454
455
def set_args(parser):
    """
    Register tdc's command line arguments on the given parser and
    return it.
    """
    parser.add_argument(
        '--outfile', type=str,
        help='Path to the file in which results should be saved. ' +
        'Default target is the current directory.')
    parser.add_argument(
        '-p', '--path', type=str,
        help='The full path to the tc executable to use')

    # Test-selection options live in their own group...
    selection = parser.add_argument_group(
        'selection', 'select which test cases: ' +
        'files plus directories; filtered by categories plus testids')
    # ...as do the actions to perform on the selected cases.
    action = parser.add_argument_group(
        'action', 'select action to perform on selected test cases')

    selection.add_argument(
        '-D', '--directory', nargs='+', metavar='DIR',
        help='Collect tests from the specified directory(ies) ' +
        '(default [tc-tests])')
    selection.add_argument(
        '-f', '--file', nargs='+', metavar='FILE',
        help='Run tests from the specified file(s)')
    selection.add_argument(
        '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
        help='Run tests only from the specified category/ies, ' +
        'or if no category/ies is/are specified, list known categories.')
    selection.add_argument(
        '-e', '--execute', nargs='+', metavar='ID',
        help='Execute the specified test cases with specified IDs')

    action.add_argument(
        '-l', '--list', action='store_true',
        help='List all test cases, or those only within the specified category')
    action.add_argument(
        '-s', '--show', action='store_true', dest='showID',
        help='Display the selected test cases')
    action.add_argument(
        '-i', '--id', action='store_true', dest='gen_id',
        help='Generate ID numbers for new test cases')

    parser.add_argument(
        '-v', '--verbose', action='count', default=0,
        help='Show the commands that are being run')
    parser.add_argument(
        '--format', default='tap', const='tap', nargs='?',
        choices=['none', 'xunit', 'tap'],
        help='Specify the format for test results. (Default: TAP)')
    parser.add_argument(
        '-d', '--device',
        help='Execute test cases that use a physical device, ' +
        'where DEVICE is its name. (If not defined, tests ' +
        'that require a physical device will be skipped)')
    parser.add_argument(
        '-P', '--pause', action='store_true',
        help='Pause execution just before post-suite stage')
    return parser
511
512
def check_default_settings(args, remaining, pm):
    """
    Process any arguments overriding the default settings,
    and ensure the settings are correct.
    """
    # Allow for overriding specific settings
    global NAMES

    if args.path is not None:
        NAMES['TC'] = args.path
    if args.device is not None:
        NAMES['DEV2'] = args.device
    # No configured timeout means "wait forever".
    NAMES.setdefault('TIMEOUT', None)
    if not os.path.isfile(NAMES['TC']):
        print("The specified tc path " + NAMES['TC'] + " does not exist.")
        exit(1)

    pm.call_check_args(args, remaining)
532
533
def get_id_list(alltests):
    """
    Generate a list of all IDs in the test cases, in order.
    """
    ids = []
    for tc in alltests:
        ids.append(tc["id"])
    return ids
539
540
def check_case_id(alltests):
    """
    Check for duplicate test case IDs.

    Returns every occurrence of any duplicated ID (so an ID appearing
    three times shows up three times in the result), exactly like the
    previous implementation -- but in a single O(n) pass instead of
    calling list.count() per element (accidental O(n^2)).
    """
    idl = get_id_list(alltests)
    counts = {}
    for tc_id in idl:
        counts[tc_id] = counts.get(tc_id, 0) + 1
    return [x for x in idl if counts[x] > 1]
547
548
def does_id_exist(alltests, newid):
    """
    Return True if newid matches the ID of any existing test case.
    """
    for tc in alltests:
        if tc["id"] == newid:
            return True
    return False
555
556
def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.

    Returns the (updated) list of test cases.  BUG FIX: the previous
    version returned None, so the caller's
    'alltests = generate_case_ids(alltests)' assignment clobbered its list.
    """
    import random
    for c in alltests:
        if c["id"] == "":
            # Keep drawing 4-digit hex IDs until an unused one turns up.
            while True:
                newid = '{:04x}'.format(random.randrange(16 ** 4))
                if not does_id_exist(alltests, newid):
                    c['id'] = newid
                    break

    # Collect the distinct source files, then rewrite each one with its
    # own (now fully ID'd) test cases.
    ufilename = []
    for c in alltests:
        if 'filename' in c:
            ufilename.append(c['filename'])
    ufilename = get_unique_item(ufilename)
    for f in ufilename:
        testlist = []
        for t in alltests:
            if t.get('filename') == f:
                del t['filename']
                testlist.append(t)
        # Context manager ensures the file is closed even on error
        # (the old code leaked the handle if json.dump raised).
        with open(f, "w") as outfile:
            json.dump(testlist, outfile, indent=4)
            outfile.write("\n")
    return alltests
589
def filter_tests_by_id(args, testlist):
    '''
    Remove tests from testlist that are not in the named id list.
    If id list is empty, return empty list.
    '''
    if not testlist or not args.execute:
        return list()

    wanted = args.execute
    if not (isinstance(wanted, list) and len(wanted) > 0):
        return list()

    return [tc for tc in testlist if tc['id'] in wanted]
602
def filter_tests_by_category(args, testlist):
    '''
    Remove tests from testlist that are not in a named category.
    '''
    answer = list()
    if not (args.category and testlist):
        return answer

    seen_ids = list()
    for catg in set(args.category):
        if catg == '+c':
            # '+c' is the placeholder default, not a real category.
            continue
        print('considering category {}'.format(catg))
        for tc in testlist:
            if catg in tc['category'] and tc['id'] not in seen_ids:
                answer.append(tc)
                seen_ids.append(tc['id'])

    return answer
620
621
def get_test_cases(args):
    """
    If a test case file is specified, retrieve tests from that file.
    Otherwise, glob for all json files in subdirectories and load from
    each one.
    Also, if requested, filter by category, and add tests matching
    certain ids.
    """
    import fnmatch

    flist = []
    testdirs = ['tc-tests']

    if args.file:
        # at least one file was specified - remove the default directory
        testdirs = []
        for ff in args.file:
            if os.path.isfile(ff):
                flist.append(os.path.abspath(ff))
            else:
                print("IGNORING file " + ff + "\n\tBECAUSE does not exist.")

    if args.directory:
        testdirs = args.directory

    # Walk every selected directory and pick up all the .json case files.
    for testdir in testdirs:
        for root, dirnames, filenames in os.walk(testdir):
            for filename in fnmatch.filter(filenames, '*.json'):
                candidate = os.path.abspath(os.path.join(root, filename))
                if candidate not in testdirs:
                    flist.append(candidate)

    alltestcases = []
    for casefile in flist:
        alltestcases += load_from_file(casefile)

    allcatlist = get_test_categories(alltestcases)
    allidlist = get_id_list(alltestcases)

    testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
    idtestcases = filter_tests_by_id(args, alltestcases)
    cattestcases = filter_tests_by_category(args, alltestcases)

    cat_ids = [x['id'] for x in cattestcases]
    if args.execute:
        if args.category:
            # Union of category matches and explicitly requested IDs.
            alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
        else:
            alltestcases = idtestcases
    elif cat_ids:
        alltestcases = cattestcases
    # otherwise keep alltestcases as filtered by file/directory only

    return allcatlist, allidlist, testcases_by_cats, alltestcases
681
682
def set_operation_mode(pm, parser, args, remaining):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.
    """
    ucat, idlist, testcases, alltests = get_test_cases(args)

    if args.gen_id:
        if has_blank_ids(idlist):
            alltests = generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        exit(0)

    duplicate_ids = check_case_id(alltests)
    if len(duplicate_ids) > 0:
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        exit(1)

    if args.showID:
        for atest in alltests:
            print_test_case(atest)
        exit(0)

    # '-c' with no categories given: just list what is available.
    if isinstance(args.category, list) and (len(args.category) == 0):
        print("Available categories:")
        print_sll(ucat)
        exit(0)

    # BUG FIX: this was a redundant nested 'if args.list: if args.list:'.
    if args.list:
        list_test_cases(alltests)
        exit(0)

    if not len(alltests):
        print('No tests found\n')
        return

    req_plugins = pm.get_required_plugins(alltests)
    try:
        args = pm.load_required_plugins(req_plugins, parser, args, remaining)
    except PluginDependencyException as pde:
        print('The following plugins were not found:')
        print('{}'.format(pde.missing_pg))

    catresults = test_runner(pm, args, alltests)
    if args.format == 'none':
        print('Test results output suppression requested\n')
        return

    print('\nAll test results: \n')
    if args.format == 'xunit':
        suffix = 'xml'
        res = catresults.format_xunit()
    elif args.format == 'tap':
        suffix = 'tap'
        res = catresults.format_tap()
    print(res)
    print('\n\n')
    fname = args.outfile if args.outfile else 'test-results.{}'.format(suffix)
    # BUG FIX: the old code called fh.close() inside the 'with' block
    # (redundant) and chowned while the file was still open.
    with open(fname, 'w') as fh:
        fh.write(res)
    # When run under sudo, hand the results file back to the invoking user.
    # BUG FIX: previously only SUDO_UID was checked, so a set SUDO_UID with
    # an unset SUDO_GID crashed on int(None).
    sudo_uid = os.getenv('SUDO_UID')
    sudo_gid = os.getenv('SUDO_GID')
    if sudo_uid is not None and sudo_gid is not None:
        os.chown(fname, uid=int(sudo_uid), gid=int(sudo_gid))
 
 
 
 
752
def main():
    """
    Start of execution; set up argument parser and get the arguments,
    and start operations.
    """
    parser = set_args(args_parse())
    pm = PluginMgr(parser)
    parser = pm.call_add_args(parser)
    args, remaining = parser.parse_known_args()
    args.NAMES = NAMES
    pm.set_args(args)
    check_default_settings(args, remaining, pm)
    if args.verbose > 2:
        print('args is {}'.format(args))

    set_operation_mode(pm, parser, args, remaining)

    exit(0)


if __name__ == "__main__":
    main()
v4.17
  1#!/usr/bin/env python3
  2# SPDX-License-Identifier: GPL-2.0
  3
  4"""
  5tdc.py - Linux tc (Traffic Control) unit test driver
  6
  7Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
  8"""
  9
 10import re
 11import os
 12import sys
 13import argparse
 14import importlib
 15import json
 16import subprocess
 17import time
 18import traceback
 19from collections import OrderedDict
 20from string import Template
 21
 22from tdc_config import *
 23from tdc_helper import *
 24
 25import TdcPlugin
 
 26
 
 
 
 27
 28class PluginMgrTestFail(Exception):
 29    def __init__(self, stage, output, message):
 30        self.stage = stage
 31        self.output = output
 32        self.message = message
 33
 34class PluginMgr:
 35    def __init__(self, argparser):
 36        super().__init__()
 37        self.plugins = {}
 38        self.plugin_instances = []
 39        self.args = []
 40        self.argparser = argparser
 41
 42        # TODO, put plugins in order
 43        plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
 44        for dirpath, dirnames, filenames in os.walk(plugindir):
 45            for fn in filenames:
 46                if (fn.endswith('.py') and
 47                    not fn == '__init__.py' and
 48                    not fn.startswith('#') and
 49                    not fn.startswith('.#')):
 50                    mn = fn[0:-3]
 51                    foo = importlib.import_module('plugins.' + mn)
 52                    self.plugins[mn] = foo
 53                    self.plugin_instances.append(foo.SubPlugin())
 54
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 55    def call_pre_suite(self, testcount, testidlist):
 56        for pgn_inst in self.plugin_instances:
 57            pgn_inst.pre_suite(testcount, testidlist)
 58
 59    def call_post_suite(self, index):
 60        for pgn_inst in reversed(self.plugin_instances):
 61            pgn_inst.post_suite(index)
 62
 63    def call_pre_case(self, test_ordinal, testid):
 64        for pgn_inst in self.plugin_instances:
 65            try:
 66                pgn_inst.pre_case(test_ordinal, testid)
 67            except Exception as ee:
 68                print('exception {} in call to pre_case for {} plugin'.
 69                      format(ee, pgn_inst.__class__))
 70                print('test_ordinal is {}'.format(test_ordinal))
 71                print('testid is {}'.format(testid))
 72                raise
 73
 74    def call_post_case(self):
 75        for pgn_inst in reversed(self.plugin_instances):
 76            pgn_inst.post_case()
 77
 78    def call_pre_execute(self):
 79        for pgn_inst in self.plugin_instances:
 80            pgn_inst.pre_execute()
 81
 82    def call_post_execute(self):
 83        for pgn_inst in reversed(self.plugin_instances):
 84            pgn_inst.post_execute()
 85
 86    def call_add_args(self, parser):
 87        for pgn_inst in self.plugin_instances:
 88            parser = pgn_inst.add_args(parser)
 89        return parser
 90
 91    def call_check_args(self, args, remaining):
 92        for pgn_inst in self.plugin_instances:
 93            pgn_inst.check_args(args, remaining)
 94
 95    def call_adjust_command(self, stage, command):
 96        for pgn_inst in self.plugin_instances:
 97            command = pgn_inst.adjust_command(stage, command)
 98        return command
 99
 
 
 
100    @staticmethod
101    def _make_argparser(args):
102        self.argparser = argparse.ArgumentParser(
103            description='Linux TC unit tests')
104
105
def replace_keywords(cmd):
    """
    Substitute any $VARIABLE references in cmd that appear in the NAMES
    table, leaving unknown placeholders untouched.
    """
    return Template(cmd).safe_substitute(NAMES)
114
115
def exec_cmd(args, pm, stage, command):
    """
    Perform any required modifications on an executable command, then run
    it in a subprocess and return the results.
    """
    # Nothing to run for an empty or whitespace-only command.
    if not command.strip():
        return None, None
    # A '$' hints at a template variable that NAMES must expand.
    if '$' in command:
        command = replace_keywords(command)

    # Plugins may rewrite the command (e.g. wrap it in a namespace exec).
    command = pm.call_adjust_command(stage, command)
    if args.verbose > 0:
        print('command "{}"'.format(command))

    proc = subprocess.Popen(command, shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            env=ENVIR)
    (rawout, serr) = proc.communicate()

    # Prefer stderr when the command failed and actually wrote something
    # there; otherwise report stdout.
    if proc.returncode != 0 and len(serr) > 0:
        foutput = serr.decode("utf-8")
    else:
        foutput = rawout.decode("utf-8")

    proc.stdout.close()
    proc.stderr.close()
    return proc, foutput
144
145
def prepare_env(args, pm, stage, prefix, cmdlist, output = None):
    """
    Execute the setup/teardown commands for a test case.
    Optionally terminate test execution if the command fails.
    """
    if args.verbose > 0:
        print('{}'.format(prefix))
    for cmdinfo in cmdlist:
        # A list entry is [cmd, allowed_exit_code, ...]; a bare string
        # accepts only exit code 0.
        if isinstance(cmdinfo, list):
            cmd, exit_codes = cmdinfo[0], cmdinfo[1:]
        else:
            cmd, exit_codes = cmdinfo, [0]

        if not cmd:
            continue

        (proc, foutput) = exec_cmd(args, pm, stage, cmd)

        # Empty commands return proc=None; any allowed exit code is fine.
        if proc is None or proc.returncode in exit_codes:
            continue

        print('', file=sys.stderr)
        print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
              file=sys.stderr)
        print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
              file=sys.stderr)
        print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
        # NOTE(review): these interpolate the (already closed) pipe objects,
        # so only their reprs appear -- presumably foutput was intended;
        # behavior kept as-is pending confirmation.
        print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
        print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
        raise PluginMgrTestFail(
            stage, output,
            '"{}" did not complete successfully'.format(prefix))
178
def run_one_test(pm, args, index, tidx):
    """
    Run a single test case through all of its stages: setup, the command
    under test, verification, and teardown, with plugin hooks around each.

    pm    -- the PluginMgr driving the plugin hooks
    args  -- parsed command-line arguments
    index -- ordinal of this test within the run (used in TAP output)
    tidx  -- the test case dictionary (id, name, setup, cmdUnderTest, ...)

    Returns the TAP output text for this one test.  May raise
    PluginMgrTestFail (via prepare_env) if setup or teardown fails.
    """
    global NAMES
    result = True
    tresult = ""
    tap = ""
    if args.verbose > 0:
        print("\t====================\n=====> ", end="")
    print("Test " + tidx["id"] + ": " + tidx["name"])

    # populate NAMES with TESTID for this test
    NAMES['TESTID'] = tidx['id']

    pm.call_pre_case(index, tidx['id'])
    prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])

    if (args.verbose > 0):
        print('-----> execute stage')
    pm.call_pre_execute()
    (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"])
    exit_code = p.returncode
    pm.call_post_execute()

    if (exit_code != int(tidx["expExitCode"])):
        # Wrong exit status: fail immediately, skipping the verify stage.
        result = False
        print("exit:", exit_code, int(tidx["expExitCode"]))
        print(procout)
    else:
        if args.verbose > 0:
            print('-----> verify stage')
        # Count occurrences of matchPattern in the verify command's output
        # and compare against the expected matchCount.
        match_pattern = re.compile(
            str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
        (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"])
        if procout:
            match_index = re.findall(match_pattern, procout)
            if len(match_index) != int(tidx["matchCount"]):
                result = False
        elif int(tidx["matchCount"]) != 0:
            # No output at all, yet matches were expected.
            result = False

    # Emit the TAP result line, e.g. "not ok 3 - e9a3 # add filter".
    if not result:
        tresult += 'not '
    tresult += 'ok {} - {} # {}\n'.format(str(index), tidx['id'], tidx['name'])
    tap += tresult

    if result == False:
        # On failure, append whatever output was captured to the TAP text.
        if procout:
            tap += procout
        else:
            tap += 'No output!\n'

    prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout)
    pm.call_post_case()

    # NOTE(review): index is a local parameter; this increment has no
    # effect on the caller -- presumably leftover from a refactor.
    index += 1

    # remove TESTID from NAMES
    del(NAMES['TESTID'])
    return tap
237
def test_runner(pm, args, filtered_tests):
    """
    Driver function for the unit tests.

    Prints information about the tests being run, executes the setup and
    teardown commands and the command under test itself. Also determines
    success/failure based on the information in the test case and generates
    TAP output accordingly.
    """
    testlist = filtered_tests
    tcount = len(testlist)
    index = 1
    tap = ''
    # badtest remembers the last test attempted, so skipped-test TAP lines
    # can name the test that caused the abort.
    badtest = None
    stage = None
    emergency_exit = False
    emergency_exit_message = ''

    if args.notap:
        if args.verbose:
            tap = 'notap requested:  omitting test plan\n'
    else:
        # TAP plan line, e.g. "1..42".
        tap = str(index) + ".." + str(tcount) + "\n"
    try:
        pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist])
    except Exception as ee:
        ex_type, ex, ex_tb = sys.exc_info()
        print('Exception {} {} (caught in pre_suite).'.
              format(ex_type, ex))
        # when the extra print statements are uncommented,
        # the traceback does not appear between them
        # (it appears way earlier in the tdc.py output)
        # so don't bother ...
        # print('--------------------(')
        # print('traceback')
        traceback.print_tb(ex_tb)
        # print('--------------------)')
        emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
        emergency_exit = True
        stage = 'pre-SUITE'

    # If pre_suite failed, still give plugins their post_suite callback
    # before bailing out with the emergency message instead of TAP output.
    if emergency_exit:
        pm.call_post_suite(index)
        return emergency_exit_message
    if args.verbose > 1:
        print('give test rig 2 seconds to stabilize')
    time.sleep(2)
    for tidx in testlist:
        # flower tests need a hardware device (DEV2), supplied via -d.
        if "flower" in tidx["category"] and args.device == None:
            if args.verbose > 1:
                print('Not executing test {} {} because DEV2 not defined'.
                      format(tidx['id'], tidx['name']))
            continue
        try:
            badtest = tidx  # in case it goes bad
            tap += run_one_test(pm, args, index, tidx)
        except PluginMgrTestFail as pmtf:
            # A setup/teardown failure aborts the whole run; report the
            # context and fall through to the skipped-test accounting.
            ex_type, ex, ex_tb = sys.exc_info()
            stage = pmtf.stage
            message = pmtf.message
            output = pmtf.output
            print(message)
            print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
                  format(ex_type, ex, index, tidx['id'], tidx['name'], stage))
            print('---------------')
            print('traceback')
            traceback.print_tb(ex_tb)
            print('---------------')
            if stage == 'teardown':
                print('accumulated output for this test:')
                if pmtf.output:
                    print(pmtf.output)
            print('---------------')
            break
        index += 1

    # if we failed in setup or teardown,
    # fill in the remaining tests with ok-skipped
    count = index
    if not args.notap:
        tap += 'about to flush the tap output if tests need to be skipped\n'
        if tcount + 1 != index:
            for tidx in testlist[index - 1:]:
                msg = 'skipped - previous {} failed'.format(stage)
                tap += 'ok {} - {} # {} {} {}\n'.format(
                    count, tidx['id'], msg, index, badtest.get('id', '--Unknown--'))
                count += 1

        tap += 'done flushing skipped test tap output\n'

    # Optional pause so the tester can inspect system state before the
    # post-suite plugin hooks run.
    if args.pause:
        print('Want to pause\nPress enter to continue ...')
        if input(sys.stdin):
            print('got something on stdin')

    pm.call_post_suite(index)

    return tap
336
def has_blank_ids(idlist):
    """
    Search the list for empty ID fields and return true/false accordingly.
    """
    return any(not tid for tid in idlist)
342
343
def load_from_file(filename):
    """
    Open the JSON file containing the test cases and return them
    as list of ordered dictionary objects.
    """
    try:
        with open(filename) as test_data:
            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
    except json.JSONDecodeError as jde:
        # A malformed file is skipped, not fatal.
        print('IGNORING test case file {}\n\tBECAUSE:  {}'.format(filename, jde))
        return list()

    # When any ID is blank, tag every test with its source file so the
    # regenerated IDs can be written back to the right place.
    if has_blank_ids(get_id_list(testlist)):
        for tc in testlist:
            tc['filename'] = filename
    return testlist
361
362
def args_parse():
    """
    Create the argument parser.
    """
    return argparse.ArgumentParser(description='Linux TC unit tests')
369
370
def set_args(parser):
    """
    Set the command line arguments for tdc.
    """
    parser.add_argument(
        '-p', '--path', type=str,
        help='The full path to the tc executable to use')
    select_group = parser.add_argument_group(
        'selection',
        'select which test cases: files plus directories; '
        'filtered by categories plus testids')
    action_group = parser.add_argument_group(
        'action', 'select action to perform on selected test cases')

    select_group.add_argument(
        '-D', '--directory', nargs='+', metavar='DIR',
        help='Collect tests from the specified directory(ies) '
             '(default [tc-tests])')
    select_group.add_argument(
        '-f', '--file', nargs='+', metavar='FILE',
        help='Run tests from the specified file(s)')
    select_group.add_argument(
        '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
        help='Run tests only from the specified category/ies, '
             'or if no category/ies is/are specified, list known categories.')
    select_group.add_argument(
        '-e', '--execute', nargs='+', metavar='ID',
        help='Execute the specified test cases with specified IDs')
    action_group.add_argument(
        '-l', '--list', action='store_true',
        help='List all test cases, or those only within the specified category')
    action_group.add_argument(
        '-s', '--show', action='store_true', dest='showID',
        help='Display the selected test cases')
    action_group.add_argument(
        '-i', '--id', action='store_true', dest='gen_id',
        help='Generate ID numbers for new test cases')
    parser.add_argument(
        '-v', '--verbose', action='count', default=0,
        help='Show the commands that are being run')
    parser.add_argument(
        '-N', '--notap', action='store_true',
        help='Suppress tap results for command under test')
    parser.add_argument(
        '-d', '--device',
        help='Execute the test case in flower category')
    parser.add_argument(
        '-P', '--pause', action='store_true',
        help='Pause execution just before post-suite stage')
    return parser
419
420
def check_default_settings(args, remaining, pm):
    """
    Process any arguments overriding the default settings,
    and ensure the settings are correct.
    """
    # Allow for overriding specific settings
    global NAMES

    if args.path is not None:
        NAMES['TC'] = args.path
    if args.device is not None:
        NAMES['DEV2'] = args.device
    # Refuse to run without a usable tc binary.
    if not os.path.isfile(NAMES['TC']):
        print("The specified tc path " + NAMES['TC'] + " does not exist.")
        exit(1)

    pm.call_check_args(args, remaining)
438
439
def get_id_list(alltests):
    """
    Generate a list of all IDs in the test cases.
    """
    return [tc["id"] for tc in alltests]
445
446
def check_case_id(alltests):
    """
    Check for duplicate test case IDs.

    Returns every ID occurrence that appears more than once (same output
    as before, but counts are computed once with Counter instead of
    calling list.count() per element, which was accidentally O(n^2)).
    """
    from collections import Counter
    idl = get_id_list(alltests)
    counts = Counter(idl)
    return [x for x in idl if counts[x] > 1]
453
454
def does_id_exist(alltests, newid):
    """
    Check if a given ID already exists in the list of test cases.
    """
    return newid in get_id_list(alltests)
461
462
def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.

    Returns the (mutated) list of test cases.  Fixed: the function used
    to fall off the end and return None, even though set_operation_mode
    assigns its result back to alltests.
    """
    import random

    for c in alltests:
        if c["id"] == "":
            # Keep drawing 4-digit hex IDs until an unused one turns up.
            while True:
                newid = '{:04x}'.format(random.randrange(16 ** 4))
                if not does_id_exist(alltests, newid):
                    c['id'] = newid
                    break

    # Collect the distinct source files the tagged tests came from.
    ufilename = []
    for c in alltests:
        if 'filename' in c:
            ufilename.append(c['filename'])
    ufilename = get_unique_item(ufilename)

    # Rewrite each source file with its (now fully ID'd) tests, dropping
    # the temporary 'filename' tag in the process.
    for f in ufilename:
        testlist = []
        for t in alltests:
            if t.get('filename') == f:
                del t['filename']
                testlist.append(t)
        # Context manager guarantees the file is closed even on error.
        with open(f, "w") as outfile:
            json.dump(testlist, outfile, indent=4)
            outfile.write("\n")

    return alltests
495
def filter_tests_by_id(args, testlist):
    '''
    Remove tests from testlist that are not in the named id list.
    If id list is empty, return empty list.
    '''
    if not (testlist and args.execute):
        return list()

    target_ids = args.execute
    if not (isinstance(target_ids, list) and len(target_ids) > 0):
        return list()

    return [tc for tc in testlist if tc['id'] in target_ids]
508
def filter_tests_by_category(args, testlist):
    '''
    Remove tests from testlist that are not in a named category.
    '''
    answer = list()
    if not (args.category and testlist):
        return answer

    seen_ids = list()
    for catg in set(args.category):
        # '+c' is the default placeholder, not a real category name.
        if catg == '+c':
            continue
        print('considering category {}'.format(catg))
        for tc in testlist:
            # De-duplicate by test id across categories.
            if catg in tc['category'] and tc['id'] not in seen_ids:
                answer.append(tc)
                seen_ids.append(tc['id'])

    return answer
526
 
def get_test_cases(args):
    """
    If a test case file is specified, retrieve tests from that file.
    Otherwise, glob for all json files in subdirectories and load from
    each one.
    Also, if requested, filter by category, and add tests matching
    certain ids.

    Returns a 4-tuple: (all categories, all test IDs, tests grouped
    by category, the selected/filtered list of test cases).
    """
    import fnmatch

    flist = []
    testdirs = ['tc-tests']

    if args.file:
        # at least one file was specified - remove the default directory
        testdirs = []

        for ff in args.file:
            if not os.path.isfile(ff):
                print("IGNORING file " + ff + "\n\tBECAUSE does not exist.")
            else:
                flist.append(os.path.abspath(ff))

    # -D replaces the default search directory entirely.
    if args.directory:
        testdirs = args.directory

    # Recursively collect every *.json file under the chosen directories.
    for testdir in testdirs:
        for root, dirnames, filenames in os.walk(testdir):
            for filename in fnmatch.filter(filenames, '*.json'):
                candidate = os.path.abspath(os.path.join(root, filename))
                if candidate not in testdirs:
                    flist.append(candidate)

    alltestcases = list()
    for casefile in flist:
        alltestcases = alltestcases + (load_from_file(casefile))

    allcatlist = get_test_categories(alltestcases)
    allidlist = get_id_list(alltestcases)

    testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
    idtestcases = filter_tests_by_id(args, alltestcases)
    cattestcases = filter_tests_by_category(args, alltestcases)

    # Selection precedence: with -e, take the category matches plus any
    # extra id matches; with -e but no -c, ids alone; otherwise category
    # matches if any, else everything found under the files/directories.
    cat_ids = [x['id'] for x in cattestcases]
    if args.execute:
        if args.category:
            alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
        else:
            alltestcases = idtestcases
    else:
        if cat_ids:
            alltestcases = cattestcases
        else:
            # just accept the existing value of alltestcases,
            # which has been filtered by file/directory
            pass

    return allcatlist, allidlist, testcases_by_cats, alltestcases
586
587
def set_operation_mode(pm, args):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.

    Exits directly after list/show/id-generation actions; otherwise runs
    the selected tests and prints their TAP results.
    """
    ucat, idlist, testcases, alltests = get_test_cases(args)

    if args.gen_id:
        if (has_blank_ids(idlist)):
            alltests = generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        exit(0)

    duplicate_ids = check_case_id(alltests)
    if (len(duplicate_ids) > 0):
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        exit(1)

    if args.showID:
        for atest in alltests:
            print_test_case(atest)
        exit(0)

    # '-c' with no argument leaves an empty list: show known categories.
    if isinstance(args.category, list) and (len(args.category) == 0):
        print("Available categories:")
        print_sll(ucat)
        exit(0)

    # Fixed: the original nested a second, redundant 'if args.list:'
    # inside this one; a single check is equivalent.
    if args.list:
        list_test_cases(alltests)
        exit(0)

    if len(alltests):
        catresults = test_runner(pm, args, alltests)
    else:
        catresults = 'No tests found\n'
    if args.notap:
        print('Tap output suppression requested\n')
    else:
        print('All test results: \n\n{}'.format(catresults))
633
def main():
    """
    Start of execution; set up argument parser and get the arguments,
    and start operations.
    """
    parser = args_parse()
    parser = set_args(parser)
    pm = PluginMgr(parser)
    # Loaded plugins may register their own command-line arguments.
    parser = pm.call_add_args(parser)
    (args, remaining) = parser.parse_known_args()
    # Attach the substitution table to args -- presumably so plugins can
    # read it via check_args; confirm against the plugin implementations.
    args.NAMES = NAMES
    check_default_settings(args, remaining, pm)
    if args.verbose > 2:
        print('args is {}'.format(args))

    set_operation_mode(pm, args)

    exit(0)
652
653
# Run tdc only when executed as a script, not when imported.
if __name__ == "__main__":
    main()