diff --git a/README.md b/README.md index 38b7660..e3d7537 100644 --- a/README.md +++ b/README.md @@ -3,12 +3,24 @@ Convert your dbt test results into jUnit XML format so that CI/CD platforms (such as Jenkins, CircleCI, etc.) can better report on tests in their UI. +## About this fork + +This is a fork repository based on https://github.com/chasleslr/dbt-junitxml/ version 0.1.5 +On top of that, the following were added: +1. Support for DBT Core 1.3+ (originally it supported only up to 1.2). Versions 0.2.x are tested on DBT 1.5. +2. In case of test failures, the JUnit XML contains additional information regarding Stored Results and the original test SQL. Details can be found below. +3. Test names in the resulting XML are more specific than in the original version. +4. Added support for integration with https://reportportal.io/ + ## Installation +Publishing as a regular pip module is under consideration. + ```shell -pip install dbt-junitxml +pip install "git+https://github.com/SOVALINUX/dbt-junitxml@0.2.1#egg=dbt-junitxml" ``` +We recommend pinning a specific version, since newer versions might contain changes that impact your operations (not backward-incompatible changes, but they may alter some visualizations you are used to). ## Usage @@ -19,6 +31,54 @@ to parse your run results and output a jUnit XML formatted report named `report. dbt-junitxml parse target/run_results.json report.xml ``` +## Features description + +### Rich XML output in case of test failure + +In order to help you handle test failures right where you see them, we add supporting information into the JUnit XML in case of a test failure. +It's even more than you see in the DBT CLI console output! 
+For example: + +``` +Got 19 results, configured to fail if != 0 +2023-06-08 10:47:02 +------------------------------------------------------------------------------------------------ +select * from db_dbt_test__audit.not_null_table_reporter_employee_id +------------------------------------------------------------------------------------------------ + +select * +from (select * from "datacatalog"."db"."table" where NOT regexp_like(reporter_email_address, 'auto_.*?@company.com') AND reporter_email_address NOT IN ('exclude@company.com') AND reporter_email_address IS NOT NULL) dbt_subquery +where reporter_employee_id is null +``` + +### Saving test SQL files for further analysis + +Sometimes it's handy to see the exact SQL that was executed and tested by DBT without repeating compilation steps. +To achieve it we suggest you to save compiled tests SQL during your test run. +Below you can find a reference script: +```shell +dbt test --store-failures +mkdir -p target/compiled_all_sql && find target/compiled/ -name *.sql -print0 | xargs -0 cp -t target/compiled_all_sql/ +zip -r -q compiled_all_sql.zip target/compiled_all_sql +``` + +### Integration with Report Portal + +https://reportportal.io/ helps you to manage your test launches. 
Here at EPAM we're using this tool to manage over 4,000 DBT tests + +In order to upload your test run to reportportal you can use the following script: +```shell +dbt-junitxml parse target/run_results.json target/manifest.json dbt_test_report.xml +zip dbt_test_report.zip dbt_test_report.xml +REPORT_PORTAL_TOKEN=`Your token for Report Portal` +RESPONSE=`curl -X POST "https://reportportal.io/api/v1/epm-plxd/launch/import" -H "accept: */*" -H "Content-Type: multipart/form-data" -H "Authorization: bearer ${REPORT_PORTAL_TOKEN}" -F "file=@dbt_test_report.zip;type=application/x-zip-compressed"` +LAUNCH_ID=`echo "${RESPONSE}" | sed 's/.*Launch with id = \(.*\) is successfully imported.*/\1/'` +``` + ## Limitations Currently, only v4 of the [Run Results](https://docs.getdbt.com/reference/artifacts/run-results-json) specifications is supported. + +## Contribution + +Development of this fork was partially sponsored by EPAM Systems Inc. https://www.epam.com/ diff --git a/pyproject.toml b/pyproject.toml index d5274dc..d59125d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,8 +1,8 @@ [tool.poetry] name = "dbt-junitxml" -version = "0.0.0" -description = "" -authors = ["Charles Lariviere "] +version = "0.2.1" +description = "Utility to convert DBT test results into Junit XML format" +authors = ["Charles Lariviere ", "Siarhei Nekhviadovich ", "Aliaksandra Sidarenka "] readme = "README.md" license = "MIT" repository = "https://github.com/chasleslr/dbt-junitxml" diff --git a/src/dbt_junitxml/__init__.py b/src/dbt_junitxml/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/dbt_junitxml/dbt_junit_xml.py b/src/dbt_junitxml/dbt_junit_xml.py new file mode 100644 index 0000000..90f96fb --- /dev/null +++ b/src/dbt_junitxml/dbt_junit_xml.py @@ -0,0 +1,229 @@ +from junit_xml import TestSuite, TestCase, decode +import xml.etree.ElementTree as ET + + +class DBTTestCase(TestCase): + """A JUnit test case with a result and possibly some stdout or stderr""" + + def 
__init__( + self, + name, + classname=None, + elapsed_sec=None, + stdout=None, + stderr=None, + assertions=None, + timestamp=None, + status=None, + category=None, + file=None, + line=None, + log=None, + url=None, + allow_multiple_subelements=False, + ): + self.name = name + self.assertions = assertions + self.elapsed_sec = elapsed_sec + self.timestamp = timestamp + self.classname = classname + self.status = status + self.category = category + self.file = file + self.line = line + self.log = log + self.url = url + self.stdout = stdout + self.stderr = stderr + + self.is_enabled = True + self.errors = [] + self.failures = [] + self.skipped = [] + self.allow_multiple_subalements = allow_multiple_subelements + + +class DBTTestSuite(TestSuite): + def __init__(self, + name, + test_cases=None, + hostname=None, + id=None, + package=None, + timestamp=None, + properties=None, + file=None, + log=None, + url=None, + stdout=None, + stderr=None, + time=None): + super(DBTTestSuite, self).__init__(name, + test_cases=None, + hostname=None, + id=None, + package=None, + timestamp=None, + properties=None, + file=None, + log=None, + url=None, + stdout=None, + stderr=None) + self.name = name + if not test_cases: + test_cases = [] + try: + iter(test_cases) + except TypeError: + raise TypeError("test_cases must be a list of test cases") + self.test_cases = test_cases + self.timestamp = timestamp + self.hostname = hostname + self.id = id + self.package = package + self.file = file + self.log = log + self.url = url + self.stdout = stdout + self.stderr = stderr + self.properties = properties + self.time = time + + def build_xml_doc(self, encoding=None): + super(DBTTestSuite, self).build_xml_doc(encoding=None) + """ + Builds the XML document for the JUnit test suite. + Produces clean unicode strings and decodes non-unicode with the help of encoding. + @param encoding: Used to decode encoded strings. 
+ @return: XML document with unicode string elements + """ + + # build the test suite element + test_suite_attributes = dict() + if any(c.assertions for c in self.test_cases): + test_suite_attributes["assertions"] = str( + sum([int(c.assertions) for c in self.test_cases if c.assertions])) + test_suite_attributes["disabled"] = str( + len([c for c in self.test_cases if not c.is_enabled])) + test_suite_attributes["errors"] = str(len([c for c in self.test_cases if c.is_error()])) + test_suite_attributes["failures"] = str(len([c for c in self.test_cases if c.is_failure()])) + test_suite_attributes["name"] = decode(self.name, encoding) + test_suite_attributes["skipped"] = str(len([c for c in self.test_cases if c.is_skipped()])) + test_suite_attributes["tests"] = str(len(self.test_cases)) + test_suite_attributes["time"] = str( + sum(c.elapsed_sec for c in self.test_cases if c.elapsed_sec)) + + if self.hostname: + test_suite_attributes["hostname"] = decode(self.hostname, encoding) + if self.id: + test_suite_attributes["id"] = decode(self.id, encoding) + if self.package: + test_suite_attributes["package"] = decode(self.package, encoding) + if self.timestamp: + test_suite_attributes["timestamp"] = decode(self.timestamp, encoding) + if self.file: + test_suite_attributes["file"] = decode(self.file, encoding) + if self.log: + test_suite_attributes["log"] = decode(self.log, encoding) + if self.url: + test_suite_attributes["url"] = decode(self.url, encoding) + if self.time: + test_suite_attributes["time"] = decode(self.time, encoding) + + xml_element = ET.Element("testsuite", test_suite_attributes) + + # add any properties + if self.properties: + props_element = ET.SubElement(xml_element, "properties") + for k, v in self.properties.items(): + attrs = {"name": decode(k, encoding), "value": decode(v, encoding)} + ET.SubElement(props_element, "property", attrs) + + # add test suite stdout + if self.stdout: + stdout_element = ET.SubElement(xml_element, "system-out") + 
stdout_element.text = decode(self.stdout, encoding) + + # add test suite stderr + if self.stderr: + stderr_element = ET.SubElement(xml_element, "system-err") + stderr_element.text = decode(self.stderr, encoding) + + # test cases + for case in self.test_cases: + test_case_attributes = dict() + test_case_attributes["name"] = decode(case.name, encoding) + if case.assertions: + # Number of assertions in the test case + test_case_attributes["assertions"] = "%d" % case.assertions + if case.elapsed_sec: + test_case_attributes["time"] = "%f" % case.elapsed_sec + if case.timestamp: + test_case_attributes["timestamp"] = decode(case.timestamp, encoding) + if case.classname: + test_case_attributes["classname"] = decode(case.classname, encoding) + if case.status: + test_case_attributes["status"] = decode(case.status, encoding) + if case.category: + test_case_attributes["class"] = decode(case.category, encoding) + if case.file: + test_case_attributes["file"] = decode(case.file, encoding) + if case.line: + test_case_attributes["line"] = decode(case.line, encoding) + if case.log: + test_case_attributes["log"] = decode(case.log, encoding) + if case.url: + test_case_attributes["url"] = decode(case.url, encoding) + + test_case_element = ET.SubElement(xml_element, "testcase", test_case_attributes) + + # failures + for failure in case.failures: + if failure["output"] or failure["message"]: + attrs = {"type": "failure"} + if failure["message"]: + attrs["message"] = decode(failure["message"], encoding) + if failure["type"]: + attrs["type"] = decode(failure["type"], encoding) + failure_element = ET.Element("failure", attrs) + if failure["output"]: + failure_element.text = decode(failure["output"], encoding) + test_case_element.append(failure_element) + + # errors + for error in case.errors: + if error["message"] or error["output"]: + attrs = {"type": "error"} + if error["message"]: + attrs["message"] = decode(error["message"], encoding) + if error["type"]: + attrs["type"] = 
decode(error["type"], encoding) + error_element = ET.Element("error", attrs) + if error["output"]: + error_element.text = decode(error["output"], encoding) + test_case_element.append(error_element) + + # skipped + for skipped in case.skipped: + attrs = {"type": "skipped"} + if skipped["message"]: + attrs["message"] = decode(skipped["message"], encoding) + skipped_element = ET.Element("skipped", attrs) + if skipped["output"]: + skipped_element.text = decode(skipped["output"], encoding) + test_case_element.append(skipped_element) + + # test stdout + if case.stdout: + stdout_element = ET.Element("system-out") + stdout_element.text = decode(case.stdout, encoding) + test_case_element.append(stdout_element) + + # test stderr + if case.stderr: + stderr_element = ET.Element("system-err") + stderr_element.text = decode(case.stderr, encoding) + test_case_element.append(stderr_element) + + return xml_element diff --git a/src/dbt_junitxml/main.py b/src/dbt_junitxml/main.py index 2d602bd..060a22c 100644 --- a/src/dbt_junitxml/main.py +++ b/src/dbt_junitxml/main.py @@ -1,13 +1,21 @@ import click import json -from junit_xml import TestCase, TestSuite, to_xml_report_string +from junit_xml import to_xml_report_string +from dbt_junitxml.dbt_junit_xml import DBTTestSuite, DBTTestCase +from datetime import datetime +import os class InvalidRunResult(Exception): pass +def convert_timestamp_to_isoformat(timestamp: str) -> str: + return datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ').strftime( + '%Y-%m-%dT%H:%M:%S') + + @click.group() def cli(): pass @@ -18,50 +26,94 @@ def cli(): "run_result", type=click.Path(exists=True) ) +@click.argument( + "manifest", + type=click.Path(exists=True) +) @click.argument( "output", type=click.Path(exists=False) ) -def parse(run_result, output): +def parse(run_result, manifest, output): with open(run_result) as f: run_result = json.load(f) + with open(manifest) as m: + manifest = json.load(m)['nodes'] + try: - rpc_method = 
run_result["args"]["rpc_method"] + executed_command = run_result["args"]["which"] if 'which' in run_result["args"].keys() else run_result["args"]["rpc_method"] schema_version = run_result["metadata"]["dbt_schema_version"] - if not schema_version == "https://schemas.getdbt.com/dbt/run-results/v4.json": - raise InvalidRunResult("run_result.json other than v4 are not supported.") + if schema_version not in [ + "https://schemas.getdbt.com/dbt/run-results/v4.json", + "https://schemas.getdbt.com/dbt/run-results/v5.json", + "https://schemas.getdbt.com/dbt/run-results/v6.json", + ]: + raise InvalidRunResult("run_result.json other than (v4-v6) are not supported.") - if not rpc_method == "test": - raise InvalidRunResult(f"run_result.json must be from the output of `dbt test`. Got dbt {rpc_method}.") + if not executed_command == "test": + raise InvalidRunResult( + f"run_result.json must be from the output of `dbt test`. Got dbt {executed_command}.") except KeyError as e: raise InvalidRunResult(e) tests = run_result["results"] + total_elapsed_time = run_result["elapsed_time"] + test_suite_timestamp = convert_timestamp_to_isoformat(run_result["metadata"]["generated_at"]) + + tests_manifest = {} + for key, config in manifest.items(): + if config['resource_type'] == 'test': + test_name = key.split('.')[2] + tests_manifest[test_name] = config + sql_log = \ + f"""select * from {tests_manifest[test_name]['schema']}.{tests_manifest[test_name]['alias'] + if tests_manifest[test_name]['alias'] else tests_manifest[test_name]['name']}""" + sql_log_format = "\n" + '-'*96 + "\n" + sql_log + "\n" + '-'*96 + if 'compiled_sql' in config.keys(): + sql_text = config['compiled_sql'] + elif 'compiled_code' in config.keys(): + sql_text = config['compiled_code'] + elif 'raw_code' in config.keys(): + sql_text = config['raw_code'] + else: + sql_text = config['raw_sql'] + sql_text = [sql_log_format, sql_text] + tests_manifest[test_name]['sql'] = str.join('', sql_text) + test_cases = [] for test in 
tests: - test_case = TestCase( + test_name = test["unique_id"].split('.')[2] + test_timestamp = test['timing'][0]["started_at"] if test["status"] == 'pass' \ + else test_suite_timestamp + test_sql = tests_manifest[test_name]["sql"] if test_name in tests_manifest.keys() else 'N/A' + test_case = DBTTestCase( classname=test["unique_id"], - name=test["unique_id"].split(".")[-2], + name=test["unique_id"].split(".")[2], elapsed_sec=test["execution_time"], status=test["status"], + timestamp=test_timestamp, + stdout=test_sql ) if test["status"] == "fail": - test_case.add_failure_info(message=test["message"]) + test_case.add_failure_info(message=test["message"], output=test["message"]) if test["status"] == "error": - test_case.add_error_info(message=test["message"]) + test_case.add_error_info(message=test["message"], output=test["message"]) if test["status"] == "skipped": - test_case.add_skipped_info(message=test["message"]) + test_case.add_skipped_info(message=test["message"], output=test["message"]) test_cases.append(test_case) - test_suite = TestSuite("Tests", test_cases=test_cases) + test_suite = DBTTestSuite("Tests", + test_cases=test_cases, + time=total_elapsed_time, + timestamp=test_suite_timestamp) xml_report = to_xml_report_string([test_suite])