v6.8
  1# SPDX-License-Identifier: GPL-2.0
  2import re
  3import csv
  4import json
  5import argparse
  6from pathlib import Path
  7import subprocess
  8
  9class Validator:
 10    def __init__(self, rulefname, reportfname='', t=5, debug=False, datafname='', fullrulefname='', workload='true', metrics=''):
 11        self.rulefname = rulefname
 12        self.reportfname = reportfname
 13        self.rules = None
 14        self.collectlist:str = metrics
 15        self.metrics = self.__set_metrics(metrics)
 16        self.skiplist = set()
 17        self.tolerance = t
 18
 19        self.workloads = [x for x in workload.split(",") if x]
 20        self.wlidx = 0 # idx of current workloads
 21        self.allresults = dict() # metric results of all workload
 22        self.allignoremetrics = dict() # metrics with no results or negative results
 23        self.allfailtests = dict()
 24        self.alltotalcnt = dict()
 25        self.allpassedcnt = dict()
 26        self.allerrlist = dict()
 27
 28        self.results = dict() # metric results of current workload
 29        # vars for test pass/failure statistics
 30        self.ignoremetrics= set() # metrics with no results or negative results, neg result counts as a failed test
 31        self.failtests = dict()
 32        self.totalcnt = 0
 33        self.passedcnt = 0
 34        # vars for errors
 35        self.errlist = list()
 36
 37        # vars for Rule Generator
 38        self.pctgmetrics = set() # Percentage rule
 39
 40        # vars for debug
 41        self.datafname = datafname
 42        self.debug = debug
 43        self.fullrulefname = fullrulefname
 44
 45    def __set_metrics(self, metrics=''):
 46        if metrics != '':
 47            return set(metrics.split(","))
 48        else:
 49            return set()
 50
 51    def read_json(self, filename: str) -> dict:
 52        try:
 53            with open(Path(filename).resolve(), "r") as f:
 54                data = json.loads(f.read())
 55        except OSError as e:
 56            print(f"Error when reading file {e}")
 57            sys.exit()
 58
 59        return data
 60
 61    def json_dump(self, data, output_file):
 62        parent = Path(output_file).parent
 63        if not parent.exists():
 64            parent.mkdir(parents=True)
 65
 66        with open(output_file, "w+") as output_file:
 67            json.dump(data,
 68                      output_file,
 69                      ensure_ascii=True,
 70                      indent=4)
 71
 72    def get_results(self, idx:int = 0):
 73        return self.results[idx]
 74
 75    def get_bounds(self, lb, ub, error, alias={}, ridx:int = 0) -> list:
 76        """
 77        Get bounds and tolerance from lb, ub, and error.
 78        If missing lb, use 0.0; missing ub, use float('inf'); missing error, use self.tolerance.
 79
 80        @param lb: str/float, lower bound
 81        @param ub: str/float, upper bound
 82        @param error: float/str, error tolerance
 83        @returns: lower bound, return inf if the lower bound is a metric value and is not collected
 84                  upper bound, return -1 if the upper bound is a metric value and is not collected
 85                  tolerance, denormalized based on the upper bound value
 86        """
 87        # init ubv and lbv to invalid values
 88        def get_bound_value (bound, initval, ridx):
 89            val = initval
 90            if isinstance(bound, int) or isinstance(bound, float):
 91                val = bound
 92            elif isinstance(bound, str):
 93                if bound == '':
 94                    val = float("inf")
 95                elif bound in alias:
 96                    vall = self.get_value(alias[bound], ridx)
 97                    if vall:
 98                        val = vall[0]
 99                elif bound.replace('.', '1').isdigit():
100                    val = float(bound)
101                else:
102                    print("Wrong bound: {0}".format(bound))
103            else:
104                print("Wrong bound: {0}".format(bound))
105            return val
106
107        ubv = get_bound_value(ub, -1, ridx)
108        lbv = get_bound_value(lb, float('inf'), ridx)
109        t = get_bound_value(error, self.tolerance, ridx)
110
111        # denormalize error threshold
112        denormerr = t * ubv / 100 if ubv != 100 and ubv > 0 else t
113
114        return lbv, ubv, denormerr
115
116    def get_value(self, name:str, ridx:int = 0) -> list:
117        """
118        Get value of the metric from self.results.
119        If the result of this metric is not provided, the metric name will be added into self.ignoremetrics.
120        All future test(s) on this metric will fail.
121
122        @param name: name of the metric
123        @returns: list with value found in self.results; list is empty when value is not found.
124        """
125        results = []
126        data = self.results[ridx] if ridx in self.results else self.results[0]
127        if name not in self.ignoremetrics:
128            if name in data:
129                results.append(data[name])
130            elif name.replace('.', '1').isdigit():
131                results.append(float(name))
132            else:
133                self.ignoremetrics.add(name)
134        return results
135
136    def check_bound(self, val, lb, ub, err):
137        return True if val <= ub + err and val >= lb - err else False
138
139    # Positive Value Sanity check
140    def pos_val_test(self):
141        """
142        Check if metric values are non-negative.
143        One metric is counted as one test.
144        Failure: when the metric value is negative or not provided.
145        Metrics with negative values will be added into self.failtests['PositiveValueTest'] and self.ignoremetrics.
146        """
147        negmetric = dict()
148        pcnt = 0
149        tcnt = 0
150        rerun = list()
151        for name, val in self.get_results().items():
152            if val < 0:
153                negmetric[name] = val
154                rerun.append(name)
155            else:
156                pcnt += 1
157            tcnt += 1
158        if len(rerun) > 0 and len(rerun) < 20:
159            second_results = dict()
160            self.second_test(rerun, second_results)
161            for name, val in second_results.items():
162                if name not in negmetric: continue
163                if val >= 0:
164                    del negmetric[name]
165                    pcnt += 1
166
167        self.failtests['PositiveValueTest']['Total Tests'] = tcnt
168        self.failtests['PositiveValueTest']['Passed Tests'] = pcnt
169        if len(negmetric.keys()):
170            self.ignoremetrics.update(negmetric.keys())
171            negmessage = ["{0}(={1:.4f})".format(name, val) for name, val in negmetric.items()]
172            self.failtests['PositiveValueTest']['Failed Tests'].append({'NegativeValue': negmessage})
173
174        return
175
176    def evaluate_formula(self, formula:str, alias:dict, ridx:int = 0):
177        """
178        Evaluate the value of formula.
179
180        @param formula: the formula to be evaluated
181        @param alias: the dict has alias to metric name mapping
182        @returns: value of the formula if successful; -1 if one or more metric values are not provided
183        """
184        stack = []
185        b = 0
186        errs = []
187        sign = "+"
188        f = str()
189
190        #TODO: support parenthesis?
191        for i in range(len(formula)):
192            if i+1 == len(formula) or formula[i] in ('+', '-', '*', '/'):
193                s = alias[formula[b:i]] if i+1 < len(formula) else alias[formula[b:]]
194                v = self.get_value(s, ridx)
195                if not v:
196                    errs.append(s)
197                else:
198                    f = f + "{0}(={1:.4f})".format(s, v[0])
199                    if sign == "*":
200                        stack[-1] = stack[-1] * v[0]
201                    elif sign == "/":
202                        stack[-1] = stack[-1] / v[0]
203                    elif sign == '-':
204                        stack.append(-v[0])
205                    else:
206                        stack.append(v[0])
207                if i + 1 < len(formula):
208                    sign = formula[i]
209                    f += sign
210                    b = i + 1
211
212        if len(errs) > 0:
213            return -1, "Metric value missing: "+','.join(errs)
214
215        val = sum(stack)
216        return val, f
217
218    # Relationships Tests
219    def relationship_test(self, rule: dict):
220        """
221        Validate if the metrics follow the required relationship in the rule.
222        e.g. lower_bound <= eval(formula) <= upper_bound
223        One rule is counted as one test.
224        Failure: when one or more metric results are not provided, or when the formula evaluates outside of the upper/lower bounds.
225
226        @param rule: dict with metric name(+alias), formula, and required upper and lower bounds.
227        """
228        alias = dict()
229        for m in rule['Metrics']:
230            alias[m['Alias']] = m['Name']
231        lbv, ubv, t = self.get_bounds(rule['RangeLower'], rule['RangeUpper'], rule['ErrorThreshold'], alias, ridx=rule['RuleIndex'])
232        val, f = self.evaluate_formula(rule['Formula'], alias, ridx=rule['RuleIndex'])
233        if val == -1:
234            self.failtests['RelationshipTest']['Failed Tests'].append({'RuleIndex': rule['RuleIndex'], 'Description':f})
235        elif not self.check_bound(val, lbv, ubv, t):
236            lb = rule['RangeLower']
237            ub = rule['RangeUpper']
238            if isinstance(lb, str):
239                if lb in alias:
240                    lb = alias[lb]
241            if isinstance(ub, str):
242                if ub in alias:
243                    ub = alias[ub]
244            self.failtests['RelationshipTest']['Failed Tests'].append({'RuleIndex': rule['RuleIndex'], 'Formula':f,
245                                                                       'RangeLower': lb, 'LowerBoundValue': self.get_value(lb),
246                                                                       'RangeUpper': ub, 'UpperBoundValue':self.get_value(ub),
247                                                                       'ErrorThreshold': t, 'CollectedValue': val})
248        else:
249            self.passedcnt += 1
250            self.failtests['RelationshipTest']['Passed Tests'] += 1
251        self.totalcnt += 1
252        self.failtests['RelationshipTest']['Total Tests'] += 1
253
254        return
255
256
257    # Single Metric Test
258    def single_test(self, rule:dict):
259        """
260        Validate if the metrics are in the required value range.
261        eg. lower_bound <= metrics_value <= upper_bound
262        One metric is counted as one test in this type of test.
263        One rule may include one or more metrics.
264        Failure: when the metric value is not provided or the value is outside the bounds.
265        This test updates self.totalcnt and records failed tests in self.failtests['SingleMetricTest'].
266
267        @param rule: dict with metrics to validate and the value range requirement
268        """
269        lbv, ubv, t = self.get_bounds(rule['RangeLower'], rule['RangeUpper'], rule['ErrorThreshold'])
270        metrics = rule['Metrics']
271        passcnt = 0
272        totalcnt = 0
273        faillist = list()
274        failures = dict()
275        rerun = list()
276        for m in metrics:
277            totalcnt += 1
278            result = self.get_value(m['Name'])
279            if len(result) > 0 and self.check_bound(result[0], lbv, ubv, t) or m['Name'] in self.skiplist:
280                passcnt += 1
281            else:
282                failures[m['Name']] = result
283                rerun.append(m['Name'])
284
285        if len(rerun) > 0 and len(rerun) < 20:
286            second_results = dict()
287            self.second_test(rerun, second_results)
288            for name, val in second_results.items():
289                if name not in failures: continue
290                if self.check_bound(val, lbv, ubv, t):
291                    passcnt += 1
292                    del failures[name]
293                else:
294                    failures[name] = val
295                    self.results[0][name] = val
296
297        self.totalcnt += totalcnt
298        self.passedcnt += passcnt
299        self.failtests['SingleMetricTest']['Total Tests'] += totalcnt
300        self.failtests['SingleMetricTest']['Passed Tests'] += passcnt
301        if len(failures.keys()) != 0:
302            faillist = [{'MetricName':name, 'CollectedValue':val} for name, val in failures.items()]
303            self.failtests['SingleMetricTest']['Failed Tests'].append({'RuleIndex':rule['RuleIndex'],
304                                                                       'RangeLower': rule['RangeLower'],
305                                                                       'RangeUpper': rule['RangeUpper'],
306                                                                       'ErrorThreshold':rule['ErrorThreshold'],
307                                                                       'Failure':faillist})
308
309        return
310
311    def create_report(self):
312        """
313        Create final report and write into a JSON file.
314        """
315        alldata = list()
316        for i in range(0, len(self.workloads)):
317            reportstas = {"Total Rule Count": self.alltotalcnt[i], "Passed Rule Count": self.allpassedcnt[i]}
318            data = {"Metric Validation Statistics": reportstas, "Tests in Category": self.allfailtests[i],
319                    "Errors":self.allerrlist[i]}
320            alldata.append({"Workload": self.workloads[i], "Report": data})
321
322        json_str = json.dumps(alldata, indent=4)
323        print("Test validation finished. Final report: ")
324        print(json_str)
325
326        if self.debug:
327            allres = [{"Workload": self.workloads[i], "Results": self.allresults[i]} for i in range(0, len(self.workloads))]
328            self.json_dump(allres, self.datafname)
329
330    def check_rule(self, testtype, metric_list):
331        """
332        Check if the rule uses metric(s) that do not exist on the current platform.
333
334        @param metric_list: list of metrics from the rule.
335        @return: False when at least one metric used by the rule is not found in the Metric file (the rule will be skipped).
336                 True when all metrics used in the rule are found in the Metric file.
337        """
338        if testtype == "RelationshipTest":
339            for m in metric_list:
340                if m['Name'] not in self.metrics:
341                    return False
342        return True
343
344    # Start of Collector and Converter
345    def convert(self, data: list, metricvalues:dict):
346        """
347        Convert collected metric data from the -j output to dict of {metric_name:value}.
348        """
349        for json_string in data:
350            try:
351                result = json.loads(json_string)
352                if "metric-unit" in result and result["metric-unit"] != "(null)" and result["metric-unit"] != "":
353                    name = result["metric-unit"].split("  ")[1] if len(result["metric-unit"].split("  ")) > 1 \
354                        else result["metric-unit"]
355                    metricvalues[name.lower()] = float(result["metric-value"])
356            except ValueError as error:
357                continue
358        return
359
360    def _run_perf(self, metric, workload: str):
361        tool = 'perf'
362        command = [tool, 'stat', '-j', '-M', f"{metric}", "-a"]
363        wl = workload.split()
364        command.extend(wl)
365        print(" ".join(command))
366        cmd = subprocess.run(command, stderr=subprocess.PIPE, encoding='utf-8')
367        data = [x+'}' for x in cmd.stderr.split('}\n') if x]
368        return data
369
370
371    def collect_perf(self, workload: str):
372        """
373        Collect metric data with "perf stat -M" on given workload with -a and -j.
374        """
375        self.results = dict()
376        print(f"Starting perf collection")
377        print(f"Long workload: {workload}")
378        collectlist = dict()
379        if self.collectlist != "":
380            collectlist[0] = {x for x in self.collectlist.split(",")}
381        else:
382            collectlist[0] = set(list(self.metrics))
383        # Create metric set for relationship rules
384        for rule in self.rules:
385            if rule["TestType"] == "RelationshipTest":
386                metrics = [m["Name"] for m in rule["Metrics"]]
387                if not any(m not in collectlist[0] for m in metrics):
388                    collectlist[rule["RuleIndex"]] = [",".join(list(set(metrics)))]
389
390        for idx, metrics in collectlist.items():
391            if idx == 0: wl = "true"
392            else: wl = workload
393            for metric in metrics:
394                data = self._run_perf(metric, wl)
395                if idx not in self.results: self.results[idx] = dict()
396                self.convert(data, self.results[idx])
397        return
398
399    def second_test(self, collectlist, second_results):
400        workload = self.workloads[self.wlidx]
401        for metric in collectlist:
402            data = self._run_perf(metric, workload)
403            self.convert(data, second_results)
404
405    # End of Collector and Converter
406
407    # Start of Rule Generator
408    def parse_perf_metrics(self):
409        """
410        Read and parse perf metric file:
411        1) find metrics with '1%' or '100%' as ScaleUnit for Percent check
412        2) create metric name list
413        """
414        command = ['perf', 'list', '-j', '--details', 'metrics']
415        cmd = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8')
416        try:
417            data = json.loads(cmd.stdout)
418            for m in data:
419                if 'MetricName' not in m:
420                    print("Warning: no metric name")
421                    continue
422                name = m['MetricName'].lower()
423                self.metrics.add(name)
424                if 'ScaleUnit' in m and (m['ScaleUnit'] == '1%' or m['ScaleUnit'] == '100%'):
425                    self.pctgmetrics.add(name.lower())
426        except ValueError as error:
427            print(f"Error when parsing metric data")
428            sys.exit()
429
430        return
431
432    def remove_unsupported_rules(self, rules):
433        new_rules = []
434        for rule in rules:
435            add_rule = True
436            for m in rule["Metrics"]:
437                if m["Name"] in self.skiplist or m["Name"] not in self.metrics:
438                    add_rule = False
439                    break
440            if add_rule:
441                new_rules.append(rule)
442        return new_rules
443
444    def create_rules(self):
445        """
446        Create full rules, which include:
447        1) All the rules from the "relationship rules" file
448        2) SingleMetric rule for all the 'percent' metrics
449
450        Reindex all the rules to avoid repeated RuleIndex
451        """
452        data = self.read_json(self.rulefname)
453        rules = data['RelationshipRules']
454        self.skiplist = set([name.lower() for name in data['SkipList']])
455        self.rules = self.remove_unsupported_rules(rules)
456        pctgrule = {'RuleIndex':0,
457                    'TestType':'SingleMetricTest',
458                    'RangeLower':'0',
459                    'RangeUpper': '100',
460                    'ErrorThreshold': self.tolerance,
461                    'Description':'Metrics in percent unit have value within [0, 100]',
462                    'Metrics': [{'Name': m.lower()} for m in self.pctgmetrics]}
463        self.rules.append(pctgrule)
464
465        # Re-index all rules to avoid repeated RuleIndex
466        idx = 1
467        for r in self.rules:
468            r['RuleIndex'] = idx
469            idx += 1
470
471        if self.debug:
472            #TODO: need to test and generate file name correctly
473            data = {'RelationshipRules':self.rules, 'SupportedMetrics': [{"MetricName": name} for name in self.metrics]}
474            self.json_dump(data, self.fullrulefname)
475
476        return
477    # End of Rule Generator
478
479    def _storewldata(self, key):
480        '''
481        Store all the data of one workload into the corresponding data structure for all workloads.
482        @param key: key to the dictionaries (index of self.workloads).
483        '''
484        self.allresults[key] = self.results
485        self.allignoremetrics[key] = self.ignoremetrics
486        self.allfailtests[key] = self.failtests
487        self.alltotalcnt[key] = self.totalcnt
488        self.allpassedcnt[key] = self.passedcnt
489        self.allerrlist[key] = self.errlist
490
491    #Initialize data structures before data validation of each workload
492    def _init_data(self):
493
494        testtypes = ['PositiveValueTest', 'RelationshipTest', 'SingleMetricTest']
495        self.results = dict()
496        self.ignoremetrics= set()
497        self.errlist = list()
498        self.failtests = {k:{'Total Tests':0, 'Passed Tests':0, 'Failed Tests':[]} for k in testtypes}
499        self.totalcnt = 0
500        self.passedcnt = 0
501
502    def test(self):
503        '''
504        The real entry point of the test framework.
505        This function loads the validation rule JSON file and Standard Metric file to create rules for
506        testing and namemap dictionaries.
507        It also reads in result JSON file for testing.
508
509        In the test process, it passes through each rule and launches the correct test function based on the
510        'TestType' field of the rule.
511
512        The final report is written into a JSON file.
513        '''
514        if not self.collectlist:
515            self.parse_perf_metrics()
516        self.create_rules()
517        for i in range(0, len(self.workloads)):
518            self.wlidx = i
519            self._init_data()
520            self.collect_perf(self.workloads[i])
521            # Run positive value test
522            self.pos_val_test()
523            for r in self.rules:
524                # skip rules that use metrics that do not exist on this platform
525                testtype = r['TestType']
526                if not self.check_rule(testtype, r['Metrics']):
527                    continue
528                if testtype == 'RelationshipTest':
529                    self.relationship_test(r)
530                elif testtype == 'SingleMetricTest':
531                    self.single_test(r)
532                else:
533                    print("Unsupported Test Type: ", testtype)
534                    self.errlist.append("Unsupported Test Type from rule: " + str(r['RuleIndex']))
535            self._storewldata(i)
536            print("Workload: ", self.workloads[i])
537            print("Total metrics collected: ", self.failtests['PositiveValueTest']['Total Tests'])
538            print("Non-negative metric count: ", self.failtests['PositiveValueTest']['Passed Tests'])
539            print("Total Test Count: ", self.totalcnt)
540            print("Passed Test Count: ", self.passedcnt)
541
542        self.create_report()
543        return sum(self.alltotalcnt.values()) != sum(self.allpassedcnt.values())
544# End of Class Validator
545
546
547def main() -> int:
548    parser = argparse.ArgumentParser(description="Launch metric value validation")
549
550    parser.add_argument("-rule", help="Base validation rule file", required=True)
551    parser.add_argument("-output_dir", help="Path for validator output file, report file", required=True)
552    parser.add_argument("-debug", help="Debug run, save intermediate data to files", action="store_true", default=False)
553    parser.add_argument("-wl", help="Workload to run while data collection", default="true")
554    parser.add_argument("-m", help="Metric list to validate", default="")
555    args = parser.parse_args()
556    outpath = Path(args.output_dir)
557    reportf = Path.joinpath(outpath, 'perf_report.json')
558    fullrule = Path.joinpath(outpath, 'full_rule.json')
559    datafile = Path.joinpath(outpath, 'perf_data.json')
560
561    validator = Validator(args.rule, reportf, debug=args.debug,
562                        datafname=datafile, fullrulefname=fullrule, workload=args.wl,
563                        metrics=args.m)
564    ret = validator.test()
565
566    return ret
567
568
569if __name__ == "__main__":
570    import sys
571    sys.exit(main())
572
573
574
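
Both versions consume a JSON rule file whose top-level keys are 'RelationshipRules' and 'SkipList' (see create_rules() above). As a minimal sketch of that structure, here is the Python dict that read_json() would be expected to return; the metric names, formula, and bounds are made-up examples, not values from a real rule file:

example_rule_data = {
    "RelationshipRules": [
        {
            "RuleIndex": 1,                  # re-assigned by create_rules()
            "TestType": "RelationshipTest",  # or "SingleMetricTest"
            "Description": "a + b should stay close to 100",
            "Formula": "a+b",                # aliases combined with + - * /
            "RangeLower": 99,                # a number, '' for open-ended, or another alias
            "RangeUpper": 101,
            "ErrorThreshold": 5.0,
            "Metrics": [{"Name": "metric_a", "Alias": "a"},
                        {"Name": "metric_b", "Alias": "b"}],
        },
    ],
    "SkipList": ["metric_known_to_be_unreliable"],  # metrics excluded from validation
}
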
v6.13.7
  1# SPDX-License-Identifier: GPL-2.0
  2import re
  3import csv
  4import json
  5import argparse
  6from pathlib import Path
  7import subprocess
  8
  9
 10class TestError:
 11    def __init__(self, metric: list[str], wl: str, value: list[float], low: float, up=float('nan'), description=str()):
 12        self.metric: list = metric  # multiple metrics in relationship type tests
 13        self.workloads = [wl]  # multiple workloads possible
 14        self.collectedValue: list = value
 15        self.valueLowBound = low
 16        self.valueUpBound = up
 17        self.description = description
 18
 19    def __repr__(self) -> str:
 20        if len(self.metric) > 1:
 21            return "\nMetric Relationship Error: \tThe collected value of metric {0}\n\
 22                \tis {1} in workload(s): {2} \n\
 23                \tbut expected value range is [{3}, {4}]\n\
 24                \tRelationship rule description: \'{5}\'".format(self.metric, self.collectedValue, self.workloads,
 25                                                                 self.valueLowBound, self.valueUpBound, self.description)
 26        elif len(self.collectedValue) == 0:
 27            return "\nNo Metric Value Error: \tMetric {0} returns with no value \n\
 28                    \tworkload(s): {1}".format(self.metric, self.workloads)
 29        else:
 30            return "\nWrong Metric Value Error: \tThe collected value of metric {0}\n\
 31                    \tis {1} in workload(s): {2}\n\
 32                    \tbut expected value range is [{3}, {4}]"\
 33                        .format(self.metric, self.collectedValue, self.workloads,
 34                                self.valueLowBound, self.valueUpBound)
 35
 36
 37class Validator:
 38    def __init__(self, rulefname, reportfname='', t=5, debug=False, datafname='', fullrulefname='', workload='true', metrics=''):
 39        self.rulefname = rulefname
 40        self.reportfname = reportfname
 41        self.rules = None
 42        self.collectlist: str = metrics
 43        self.metrics = self.__set_metrics(metrics)
 44        self.skiplist = set()
 45        self.tolerance = t
 46
 47        self.workloads = [x for x in workload.split(",") if x]
 48        self.wlidx = 0  # idx of current workloads
 49        self.allresults = dict()  # metric results of all workload
 50        self.alltotalcnt = dict()
 51        self.allpassedcnt = dict()
 52
 53        self.results = dict()  # metric results of current workload
 54        # vars for test pass/failure statistics
 55        # metrics with no results or negative results, neg result counts failed tests
 56        self.ignoremetrics = set()
 57        self.totalcnt = 0
 58        self.passedcnt = 0
 59        # vars for errors
 60        self.errlist = list()
 61
 62        # vars for Rule Generator
 63        self.pctgmetrics = set()  # Percentage rule
 64
 65        # vars for debug
 66        self.datafname = datafname
 67        self.debug = debug
 68        self.fullrulefname = fullrulefname
 69
 70    def __set_metrics(self, metrics=''):
 71        if metrics != '':
 72            return set(metrics.split(","))
 73        else:
 74            return set()
 75
 76    def read_json(self, filename: str) -> dict:
 77        try:
 78            with open(Path(filename).resolve(), "r") as f:
 79                data = json.loads(f.read())
 80        except OSError as e:
 81            print(f"Error when reading file {e}")
 82            sys.exit()
 83
 84        return data
 85
 86    def json_dump(self, data, output_file):
 87        parent = Path(output_file).parent
 88        if not parent.exists():
 89            parent.mkdir(parents=True)
 90
 91        with open(output_file, "w+") as output_file:
 92            json.dump(data,
 93                      output_file,
 94                      ensure_ascii=True,
 95                      indent=4)
 96
 97    def get_results(self, idx: int = 0):
 98        return self.results.get(idx)
 99
100    def get_bounds(self, lb, ub, error, alias={}, ridx: int = 0) -> list:
101        """
102        Get bounds and tolerance from lb, ub, and error.
103        If missing lb, use 0.0; missing ub, use float('inf'); missing error, use self.tolerance.
104
105        @param lb: str/float, lower bound
106        @param ub: str/float, upper bound
107        @param error: float/str, error tolerance
108        @returns: lower bound, return inf if the lower bound is a metric value and is not collected
109                  upper bound, return -1 if the upper bound is a metric value and is not collected
110                  tolerance, denormalized based on the upper bound value
111        """
112        # init ubv and lbv to invalid values
113        def get_bound_value(bound, initval, ridx):
114            val = initval
115            if isinstance(bound, int) or isinstance(bound, float):
116                val = bound
117            elif isinstance(bound, str):
118                if bound == '':
119                    val = float("inf")
120                elif bound in alias:
121                    vall = self.get_value(alias[bound], ridx)
122                    if vall:
123                        val = vall[0]
124                elif bound.replace('.', '1').isdigit():
125                    val = float(bound)
126                else:
127                    print("Wrong bound: {0}".format(bound))
128            else:
129                print("Wrong bound: {0}".format(bound))
130            return val
131
132        ubv = get_bound_value(ub, -1, ridx)
133        lbv = get_bound_value(lb, float('inf'), ridx)
134        t = get_bound_value(error, self.tolerance, ridx)
135
136        # denormalize error threshold
137        denormerr = t * ubv / 100 if ubv != 100 and ubv > 0 else t
138
139        return lbv, ubv, denormerr
140
141    def get_value(self, name: str, ridx: int = 0) -> list:
142        """
143        Get value of the metric from self.results.
144        If the result of this metric is not provided, the metric name will be added into self.ignoremetrics.
145        All future test(s) on this metric will fail.
146
147        @param name: name of the metric
148        @returns: list with value found in self.results; list is empty when value is not found.
149        """
150        results = []
151        data = self.results[ridx] if ridx in self.results else self.results[0]
152        if name not in self.ignoremetrics:
153            if name in data:
154                results.append(data[name])
155            elif name.replace('.', '1').isdigit():
156                results.append(float(name))
157            else:
158                self.ignoremetrics.add(name)
159        return results
160
161    def check_bound(self, val, lb, ub, err):
162        return True if val <= ub + err and val >= lb - err else False
163
164    # Positive Value Sanity check
165    def pos_val_test(self):
166        """
167        Check if metric values are non-negative.
168        One metric is counted as one test.
169        Failure: when the metric value is negative or not provided.
170        Metrics with negative values will be added into self.ignoremetrics.
171        """
172        negmetric = dict()
173        pcnt = 0
174        tcnt = 0
175        rerun = list()
176        results = self.get_results()
177        if not results:
178            return
179        for name, val in results.items():
180            if val < 0:
181                negmetric[name] = val
182                rerun.append(name)
183            else:
184                pcnt += 1
185            tcnt += 1
186        # The first-round collect_perf() runs these metrics with the simple workload
187        # "true". We give metrics a second chance with a longer workload if fewer
188        # than 20 metrics failed the positive test.
189        if len(rerun) > 0 and len(rerun) < 20:
190            second_results = dict()
191            self.second_test(rerun, second_results)
192            for name, val in second_results.items():
193                if name not in negmetric:
194                    continue
195                if val >= 0:
196                    del negmetric[name]
197                    pcnt += 1
198
199        if len(negmetric.keys()):
200            self.ignoremetrics.update(negmetric.keys())
201            self.errlist.extend(
202                [TestError([m], self.workloads[self.wlidx], negmetric[m], 0) for m in negmetric.keys()])
203
204        return
205
206    def evaluate_formula(self, formula: str, alias: dict, ridx: int = 0):
207        """
208        Evaluate the value of formula.
209
210        @param formula: the formula to be evaluated
211        @param alias: the dict has alias to metric name mapping
212        @returns: value of the formula if successful; -1 if one or more metric values are not provided
213        """
214        stack = []
215        b = 0
216        errs = []
217        sign = "+"
218        f = str()
219
220        # TODO: support parenthesis?
221        for i in range(len(formula)):
222            if i+1 == len(formula) or formula[i] in ('+', '-', '*', '/'):
223                s = alias[formula[b:i]] if i + \
224                    1 < len(formula) else alias[formula[b:]]
225                v = self.get_value(s, ridx)
226                if not v:
227                    errs.append(s)
228                else:
229                    f = f + "{0}(={1:.4f})".format(s, v[0])
230                    if sign == "*":
231                        stack[-1] = stack[-1] * v[0]
232                    elif sign == "/":
233                        stack[-1] = stack[-1] / v[0]
234                    elif sign == '-':
235                        stack.append(-v[0])
236                    else:
237                        stack.append(v[0])
238                if i + 1 < len(formula):
239                    sign = formula[i]
240                    f += sign
241                    b = i + 1
242
243        if len(errs) > 0:
244            return -1, "Metric value missing: "+','.join(errs)
245
246        val = sum(stack)
247        return val, f
248
249    # Relationships Tests
250    def relationship_test(self, rule: dict):
251        """
252        Validate if the metrics follow the required relationship in the rule.
253        e.g. lower_bound <= eval(formula) <= upper_bound
254        One rule is counted as one test.
255        Failure: when one or more metric results are not provided, or when the formula evaluates outside of the upper/lower bounds.
256
257        @param rule: dict with metric name(+alias), formula, and required upper and lower bounds.
258        """
259        alias = dict()
260        for m in rule['Metrics']:
261            alias[m['Alias']] = m['Name']
262        lbv, ubv, t = self.get_bounds(
263            rule['RangeLower'], rule['RangeUpper'], rule['ErrorThreshold'], alias, ridx=rule['RuleIndex'])
264        val, f = self.evaluate_formula(
265            rule['Formula'], alias, ridx=rule['RuleIndex'])
266
267        lb = rule['RangeLower']
268        ub = rule['RangeUpper']
269        if isinstance(lb, str):
270            if lb in alias:
271                lb = alias[lb]
272        if isinstance(ub, str):
273            if ub in alias:
274                ub = alias[ub]
275
276        if val == -1:
277            self.errlist.append(TestError([m['Name'] for m in rule['Metrics']], self.workloads[self.wlidx], [],
278                                lb, ub, rule['Description']))
279        elif not self.check_bound(val, lbv, ubv, t):
280            self.errlist.append(TestError([m['Name'] for m in rule['Metrics']], self.workloads[self.wlidx], [val],
281                                lb, ub, rule['Description']))
282        else:
283            self.passedcnt += 1
284        self.totalcnt += 1
285
286        return
287
288    # Single Metric Test
289    def single_test(self, rule: dict):
290        """
291        Validate if the metrics are in the required value range.
292        eg. lower_bound <= metrics_value <= upper_bound
293        One metric is counted as one test in this type of test.
294        One rule may include one or more metrics.
295        Failure: when the metric value is not provided or the value is outside the bounds.
296        This test updates self.totalcnt.
297
298        @param rule: dict with metrics to validate and the value range requirement
299        """
300        lbv, ubv, t = self.get_bounds(
301            rule['RangeLower'], rule['RangeUpper'], rule['ErrorThreshold'])
302        metrics = rule['Metrics']
303        passcnt = 0
304        totalcnt = 0
305        failures = dict()
306        rerun = list()
307        for m in metrics:
308            totalcnt += 1
309            result = self.get_value(m['Name'])
310            if len(result) > 0 and self.check_bound(result[0], lbv, ubv, t) or m['Name'] in self.skiplist:
311                passcnt += 1
312            else:
313                failures[m['Name']] = result
314                rerun.append(m['Name'])
315
316        if len(rerun) > 0 and len(rerun) < 20:
317            second_results = dict()
318            self.second_test(rerun, second_results)
319            for name, val in second_results.items():
320                if name not in failures:
321                    continue
322                if self.check_bound(val, lbv, ubv, t):
323                    passcnt += 1
324                    del failures[name]
325                else:
326                    failures[name] = [val]
327                    self.results[0][name] = val
328
329        self.totalcnt += totalcnt
330        self.passedcnt += passcnt
331        if len(failures.keys()) != 0:
332            self.errlist.extend([TestError([name], self.workloads[self.wlidx], val,
333                                rule['RangeLower'], rule['RangeUpper']) for name, val in failures.items()])
334
335        return
336
337    def create_report(self):
338        """
339        Create final report and write into a JSON file.
340        """
341        print(self.errlist)
342
343        if self.debug:
344            allres = [{"Workload": self.workloads[i], "Results": self.allresults[i]}
345                      for i in range(0, len(self.workloads))]
346            self.json_dump(allres, self.datafname)
347
348    def check_rule(self, testtype, metric_list):
349        """
350        Check if the rule uses metric(s) that do not exist on the current platform.
351
352        @param metric_list: list of metrics from the rule.
353        @return: False when at least one metric used by the rule is not found in the Metric file (the rule will be skipped).
354                 True when all metrics used in the rule are found in the Metric file.
355        """
356        if testtype == "RelationshipTest":
357            for m in metric_list:
358                if m['Name'] not in self.metrics:
359                    return False
360        return True
361
362    # Start of Collector and Converter
363    def convert(self, data: list, metricvalues: dict):
364        """
365        Convert collected metric data from the -j output to dict of {metric_name:value}.
366        """
367        for json_string in data:
368            try:
369                result = json.loads(json_string)
370                if "metric-unit" in result and result["metric-unit"] != "(null)" and result["metric-unit"] != "":
371                    name = result["metric-unit"].split("  ")[1] if len(result["metric-unit"].split("  ")) > 1 \
372                        else result["metric-unit"]
373                    metricvalues[name.lower()] = float(result["metric-value"])
374            except ValueError as error:
375                continue
376        return
377
378    def _run_perf(self, metric, workload: str):
379        tool = 'perf'
380        command = [tool, 'stat', '-j', '-M', f"{metric}", "-a"]
381        wl = workload.split()
382        command.extend(wl)
383        print(" ".join(command))
384        cmd = subprocess.run(command, stderr=subprocess.PIPE, encoding='utf-8')
385        data = [x+'}' for x in cmd.stderr.split('}\n') if x]
386        if data[0][0] != '{':
387            data[0] = data[0][data[0].find('{'):]
388        return data
389
390    def collect_perf(self, workload: str):
391        """
392        Collect metric data with "perf stat -M" on given workload with -a and -j.
393        """
394        self.results = dict()
395        print(f"Starting perf collection")
396        print(f"Long workload: {workload}")
397        collectlist = dict()
398        if self.collectlist != "":
399            collectlist[0] = {x for x in self.collectlist.split(",")}
400        else:
401            collectlist[0] = set(list(self.metrics))
402        # Create metric set for relationship rules
403        for rule in self.rules:
404            if rule["TestType"] == "RelationshipTest":
405                metrics = [m["Name"] for m in rule["Metrics"]]
406                if not any(m not in collectlist[0] for m in metrics):
407                    collectlist[rule["RuleIndex"]] = [
408                        ",".join(list(set(metrics)))]
409
410        for idx, metrics in collectlist.items():
411            if idx == 0:
412                wl = "true"
413            else:
414                wl = workload
415            for metric in metrics:
416                data = self._run_perf(metric, wl)
417                if idx not in self.results:
418                    self.results[idx] = dict()
419                self.convert(data, self.results[idx])
420        return
421
422    def second_test(self, collectlist, second_results):
423        workload = self.workloads[self.wlidx]
424        for metric in collectlist:
425            data = self._run_perf(metric, workload)
426            self.convert(data, second_results)
427
428    # End of Collector and Converter
429
430    # Start of Rule Generator
431    def parse_perf_metrics(self):
432        """
433        Read and parse perf metric file:
434        1) find metrics with '1%' or '100%' as ScaleUnit for Percent check
435        2) create metric name list
436        """
437        command = ['perf', 'list', '-j', '--details', 'metrics']
438        cmd = subprocess.run(command, stdout=subprocess.PIPE,
439                             stderr=subprocess.PIPE, encoding='utf-8')
440        try:
441            data = json.loads(cmd.stdout)
442            for m in data:
443                if 'MetricName' not in m:
444                    print("Warning: no metric name")
445                    continue
446                name = m['MetricName'].lower()
447                self.metrics.add(name)
448                if 'ScaleUnit' in m and (m['ScaleUnit'] == '1%' or m['ScaleUnit'] == '100%'):
449                    self.pctgmetrics.add(name.lower())
450        except ValueError as error:
451            print(f"Error when parsing metric data")
452            sys.exit()
453
454        return
455
456    def remove_unsupported_rules(self, rules):
457        new_rules = []
458        for rule in rules:
459            add_rule = True
460            for m in rule["Metrics"]:
461                if m["Name"] in self.skiplist or m["Name"] not in self.metrics:
462                    add_rule = False
463                    break
464            if add_rule:
465                new_rules.append(rule)
466        return new_rules
467
468    def create_rules(self):
469        """
470        Create full rules, which include:
471        1) All the rules from the "relationship rules" file
472        2) SingleMetric rule for all the 'percent' metrics
473
474        Reindex all the rules to avoid repeated RuleIndex
475        """
476        data = self.read_json(self.rulefname)
477        rules = data['RelationshipRules']
478        self.skiplist = set([name.lower() for name in data['SkipList']])
479        self.rules = self.remove_unsupported_rules(rules)
480        pctgrule = {'RuleIndex': 0,
481                    'TestType': 'SingleMetricTest',
482                    'RangeLower': '0',
483                    'RangeUpper': '100',
484                    'ErrorThreshold': self.tolerance,
485                    'Description': 'Metrics in percent unit have value within [0, 100]',
486                    'Metrics': [{'Name': m.lower()} for m in self.pctgmetrics]}
487        self.rules.append(pctgrule)
488
489        # Re-index all rules to avoid repeated RuleIndex
490        idx = 1
491        for r in self.rules:
492            r['RuleIndex'] = idx
493            idx += 1
494
495        if self.debug:
496            # TODO: need to test and generate file name correctly
497            data = {'RelationshipRules': self.rules, 'SupportedMetrics': [
498                {"MetricName": name} for name in self.metrics]}
499            self.json_dump(data, self.fullrulefname)
500
501        return
502    # End of Rule Generator
503
504    def _storewldata(self, key):
505        '''
506        Store all the data of one workload into the corresponding data structure for all workloads.
507        @param key: key to the dictionaries (index of self.workloads).
508        '''
509        self.allresults[key] = self.results
510        self.alltotalcnt[key] = self.totalcnt
511        self.allpassedcnt[key] = self.passedcnt
512
513    # Initialize data structures before data validation of each workload
514    def _init_data(self):
515
516        testtypes = ['PositiveValueTest',
517                     'RelationshipTest', 'SingleMetricTest']
518        self.results = dict()
519        self.ignoremetrics = set()
520        self.errlist = list()
521        self.totalcnt = 0
522        self.passedcnt = 0
523
524    def test(self):
525        '''
526        The real entry point of the test framework.
527        This function loads the validation rule JSON file and Standard Metric file to create rules for
528        testing and namemap dictionaries.
529        It also reads in result JSON file for testing.
530
531        In the test process, it passes through each rule and launches the correct test function based on the
532        'TestType' field of the rule.
533
534        The final report is written into a JSON file.
535        '''
536        if not self.collectlist:
537            self.parse_perf_metrics()
538        if not self.metrics:
539            print("No metric found for testing")
540            return 0
541        self.create_rules()
542        for i in range(0, len(self.workloads)):
543            self.wlidx = i
544            self._init_data()
545            self.collect_perf(self.workloads[i])
546            # Run positive value test
547            self.pos_val_test()
548            for r in self.rules:
549                # skip rules that use metrics that do not exist on this platform
550                testtype = r['TestType']
551                if not self.check_rule(testtype, r['Metrics']):
552                    continue
553                if testtype == 'RelationshipTest':
554                    self.relationship_test(r)
555                elif testtype == 'SingleMetricTest':
556                    self.single_test(r)
557                else:
558                    print("Unsupported Test Type: ", testtype)
559            print("Workload: ", self.workloads[i])
560            print("Total Test Count: ", self.totalcnt)
561            print("Passed Test Count: ", self.passedcnt)
562            self._storewldata(i)
563        self.create_report()
564        return len(self.errlist) > 0
565# End of Class Validator
566
567
568def main() -> int:
569    parser = argparse.ArgumentParser(
570        description="Launch metric value validation")
571
572    parser.add_argument(
573        "-rule", help="Base validation rule file", required=True)
574    parser.add_argument(
575        "-output_dir", help="Path for validator output file, report file", required=True)
576    parser.add_argument("-debug", help="Debug run, save intermediate data to files",
577                        action="store_true", default=False)
578    parser.add_argument(
579        "-wl", help="Workload to run while data collection", default="true")
580    parser.add_argument("-m", help="Metric list to validate", default="")
581    args = parser.parse_args()
582    outpath = Path(args.output_dir)
583    reportf = Path.joinpath(outpath, 'perf_report.json')
584    fullrule = Path.joinpath(outpath, 'full_rule.json')
585    datafile = Path.joinpath(outpath, 'perf_data.json')
586
587    validator = Validator(args.rule, reportf, debug=args.debug,
588                          datafname=datafile, fullrulefname=fullrule, workload=args.wl,
589                          metrics=args.m)
590    ret = validator.test()
591
592    return ret
593
594
595if __name__ == "__main__":
596    import sys
597    sys.exit(main())
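
As a quick usage sketch mirroring what main() does in either version (the rule file name, output directory, and workload below are hypothetical placeholders):

from pathlib import Path

outpath = Path("/tmp/metric_validation")                 # hypothetical output directory
validator = Validator("rule.json",                       # hypothetical rule file (structure sketched above)
                      Path.joinpath(outpath, "perf_report.json"),
                      debug=True,                        # also dump collected data and the generated full rule set
                      datafname=Path.joinpath(outpath, "perf_data.json"),
                      fullrulefname=Path.joinpath(outpath, "full_rule.json"),
                      workload="sleep 1",                # workload run for relationship rules and retry passes
                      metrics="")                        # empty: validate all metrics reported by 'perf list'
ret = validator.test()                                   # truthy when at least one test failed

Because the collector runs "perf stat -a" (system-wide), this normally needs sufficient perf privileges, e.g. root or a permissive kernel.perf_event_paranoid setting.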