From 5fa2b29359873026eda9a17efbd97b45a71688d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9B=9B=E6=B5=B7=E8=8B=B1=5FShying?= Date: Thu, 26 Feb 2026 15:14:36 +0800 Subject: [PATCH 1/2] Add external tools checker and config Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- checkers/external_tools_check.py | 79 ++++++++++++++++++++++++++++++++ config/sast.json | 49 +++++++++++++++++++- requirements.txt | 9 ++++ 3 files changed, 136 insertions(+), 1 deletion(-) create mode 100644 checkers/external_tools_check.py diff --git a/checkers/external_tools_check.py b/checkers/external_tools_check.py new file mode 100644 index 0000000..68ca629 --- /dev/null +++ b/checkers/external_tools_check.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +# +# Copyright 2026 Enflame. All Rights Reserved. +# + +import subprocess +import os +import re +import sys +import json +from pathlib import Path + +CHECKERS_DIR = Path(__file__).resolve().parent +REPO_DIR = CHECKERS_DIR.parent +sys.path.append(str(REPO_DIR)) +from common.static_check_common import QualityCodexCommitee, CICheckerCommon, StaticCheck + +CONFIG_PATH = REPO_DIR / 'config' / 'sast.json' + +class CIChecker(CICheckerCommon): + def __init__(self, api_init=None, args=None, check_api_type=None, static_check=StaticCheck): + self.check_name = "external tools check" + super().__init__(api_init, args, check_api_type, self.check_name, static_check) + self.local_ci_check = True + self.local_workspace_check = True + self.pass_flag = True + self.files_static_check_status = {} + self.command_output = {} + + def _load_tools(self): + try: + with open(CONFIG_PATH, 'r') as f: + cfg = json.load(f) + return cfg.get('external_tools', []) + except Exception: + return [] + + def check_func(self): + tools = self._load_tools() + # run each configured tool command; commands should be shell commands (bash) + for tool in tools: + name = tool.get('name') + cmd = tool.get('command') + guide = tool.get('guide_link','') + if not cmd or not name: + continue + print("Running {}: {}".format(name, cmd)) + pipe = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True, executable='/bin/bash') + stdout, _ = pipe.communicate() + out = stdout.decode('utf-8', errors='ignore') if stdout else '' + ret = pipe.returncode + if ret != 0: + self.pass_flag = False + self.command_output[name] = out + else: + # still capture output for information + self.command_output[name] = out + + return self.check_report() + + def check_report(self): + # Prepare a synthetic files_static_check_status entry to integrate with existing output format + self.files_static_check_status['external_tools'] = {"check_status": self.pass_flag} + QualityCodexCommitee.FormatOutputSimple(self.check_name, self.pass_flag, self.id, self.files_static_check_status, CHECK_LEVEL) + + # Print outputs for failed tools + for tool_name, output in self.command_output.items(): + if output: + print('\t=== {} output ==='.format(tool_name)) + for line in output.splitlines(): + print('\t\t' + line) + + if not self.pass_flag: + print('\tPlease review guide links in config/sast.json for remediation.') + assert self.pass_flag, "Failed {}".format(self.id) + +if __name__ == '__main__': + c = CIChecker(None, None, None) + c.check() diff --git a/config/sast.json b/config/sast.json index e564348..906de7a 100644 --- a/config/sast.json +++ b/config/sast.json @@ -221,7 +221,54 @@ }, "checks_group":{ "external_checks":[ - "codespell_check","cpplint_check","gitleaks_check","hardcode_check","jsonlint_check","line_terminators_check" + "codespell_check","cpplint_check","gitleaks_check","hardcode_check","jsonlint_check","line_terminators_check","external_tools_check" ] + }, + "external_tools":[ + { + "name":"bandit", + "command":"bandit -r . -f json -o bandit_report.json", + "guide_link":"https://bandit.readthedocs.io/" + }, + { + "name":"mypy", + "command":"mypy . --ignore-missing-imports --show-error-codes", + "guide_link":"https://mypy-lang.org/" + }, + { + "name":"pylint", + "command":"pylint $(git ls-files '*.py') || true", + "guide_link":"https://pylint.pycqa.org/" + }, + { + "name":"safety", + "command":"safety check -r requirements.txt --full-report || true", + "guide_link":"https://pyup.io/safety/" + }, + { + "name":"detect-secrets", + "command":"detect-secrets scan --all-files --json > .secrets.json || true", + "guide_link":"https://github.com/Yelp/detect-secrets" + }, + { + "name":"checkov", + "command":"checkov -d . -o json || true", + "guide_link":"https://www.checkov.io/" + }, + { + "name":"hadolint", + "command":"hadolint Dockerfile -f json || true", + "guide_link":"https://github.com/hadolint/hadolint" + }, + { + "name":"tfsec", + "command":"tfsec . --format json --out tfsec_report.json || true", + "guide_link":"https://tfsec.dev/" + }, + { + "name":"yamllint", + "command":"yamllint -f parsable . || true", + "guide_link":"https://yamllint.readthedocs.io/" } + ] } diff --git a/requirements.txt b/requirements.txt index 068d49c..7e36e35 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,12 @@ chardet==4.0.0 Requests==2.32.4 tomli==2.2.1 + +bandit +mypy +pylint +safety +detect-secrets +checkov +yamllint + From 5b13a08a7c64077479412bf04cc97d8a115a2ca0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=A1=82=E5=B0=91=E7=AB=8B?= Date: Fri, 27 Feb 2026 15:16:26 +0800 Subject: [PATCH 2/2] add checks --- Dockerfile | 9 ++- checkers/bandit_check.py | 124 ++++++++++++++++++++++++++++ checkers/detect_secrets_check.py | 111 +++++++++++++++++++++++++ checkers/external_tools_check.py | 79 ------------------ checkers/hadolint_check.py | 122 ++++++++++++++++++++++++++++ checkers/mypy_check.py | 106 ++++++++++++++++++++++++ checkers/pylint_check.py | 108 +++++++++++++++++++++++++ checkers/safety_check.py | 135 +++++++++++++++++++++++++++++++ checkers/yamllint_check.py | 103 +++++++++++++++++++++++ common/static_check_common.py | 4 +- config/sast.json | 93 ++++++++++----------- requirements.txt | 22 +++-- 12 files changed, 870 insertions(+), 146 deletions(-) create mode 100644 checkers/bandit_check.py create mode 100644 checkers/detect_secrets_check.py delete mode 100644 checkers/external_tools_check.py create mode 100644 checkers/hadolint_check.py create mode 100644 checkers/mypy_check.py create mode 100644 checkers/pylint_check.py create mode 100644 checkers/safety_check.py create mode 100644 checkers/yamllint_check.py diff --git a/Dockerfile b/Dockerfile index 7aeae14..40da314 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,7 +7,7 @@ RUN sed -i s/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g /etc/apk/repositories \ && echo "http://mirrors.aliyun.com/alpine/latest-stable/community/" >> /etc/apk/repositories \ && apk update \ && apk add --no-cache git wget readline-dev bash cloc file curl openssl \ - tzdata zlib zlib-dev git-lfs gojq sqlite-dev \ + tzdata zlib zlib-dev git-lfs gojq sqlite-dev build-base libffi-dev \ && apk cache clean \ && cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime @@ -50,12 +50,15 @@ RUN wget -P /tmp https://github.com/astral-sh/ruff/releases/download/0.6.4/ruff- && cd ../ \ && rm -rf ruff-x86_64-unknown-linux-gnu/ ruff-x86_64-unknown-linux-gnu.tar.gz -#### add module package -RUN python3.8 -m pip install --no-cache-dir -U setuptools==59.6.0 wheel==0.37.1 pip==21.3.1 requests==2.22.0 pylint==3.2.7 lizard==1.17.31 +RUN wget https://github.com/hadolint/hadolint/releases/download/v2.12.0/hadolint-Linux-x86_64 -O /usr/local/bin/hadolint \ + && chmod +x /usr/local/bin/hadolint #### add project COPY . /sast/ +#### add module package +RUN python3.8 -m pip install --no-cache-dir -U -r /sast/requirements.txt + WORKDIR /app RUN chmod -R 777 /app && chmod -R 777 /sast/ diff --git a/checkers/bandit_check.py b/checkers/bandit_check.py new file mode 100644 index 0000000..51bedc5 --- /dev/null +++ b/checkers/bandit_check.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python3 +# +# Copyright 2023-2025 Enflame. All Rights Reserved. +# + +import subprocess +import os +import re +import sys +import json +import tempfile +from pathlib import Path + +CHECKERS_DIR = Path(__file__).resolve().parent +REPO_DIR = CHECKERS_DIR.parent +sys.path.append(str(REPO_DIR)) +from common.static_check_common import QualityCodexCommitee, CICheckerCommon, StaticCheck +from common.config_parser import * + + +def excepthook(exctype, value, traceback): + if exctype == AssertionError: + pass + else: + sys.__excepthook__(exctype, value, traceback) + + +sys.excepthook = excepthook + + +def _norm_path(path): + return path.lstrip("./").replace("//", "/") if path else path + + +class CIChecker(CICheckerCommon): + def __init__(self, api_init=None, args=None, check_api_type=None, static_check=StaticCheck): + self.check_name = "bandit check" + super().__init__(api_init, args, check_api_type, self.check_name, static_check) + self.local_ci_check = True + self.local_workspace_check = True + self.command_output = {} + + def filter_file(self): + return [ + x for x in self.add_or_changed_files + if re.match(self.check_files, x) and os.path.isfile(x) + and not any(re.match(y, x) for y in self.exclude_files) + ] + + def check_func(self): + self.check_files_list = self.filter_file() + if not self.check_files_list: + return self.check_report() + self.diff_info = self.get_diff_info() + for file_path in self.check_files_list: + self.files_static_check_status[file_path] = {"check_status": True} + self.command_output[file_path] = "" + + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: + report_path = f.name + try: + cmd = "bandit --severity-level high -s B602 -f json -o {} {}".format( + report_path, + " ".join(repr(p) for p in self.check_files_list) + ) + pipe = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + shell=True, executable="/bin/bash") + pipe.communicate() + + if not os.path.isfile(report_path): + return self.check_report() + + try: + with open(report_path, 'r', encoding='utf-8', errors='ignore') as rf: + data = json.load(rf) + except (json.JSONDecodeError, OSError): + return self.check_report() + results = data.get("results", []) + for r in results: + fpath = _norm_path(r.get("filename", "")) + if fpath not in self.files_static_check_status: + continue + line_number = r.get("line_number") + add_lines = [x[0] for x in self.diff_info.get(fpath, {}).get("add", [])] + if (line_number == 0 and fpath in self.add_files) or (line_number and line_number in add_lines): + self.files_static_check_status[fpath]["check_status"] = False + self.pass_flag = False + msg = "{}:{}: {} [{}]".format( + r.get("filename", fpath), + line_number, + r.get("issue_text", ""), + r.get("test_id", "") + ) + self.command_output[fpath] = self.command_output.get(fpath, "") + msg + "\n" + finally: + if os.path.isfile(report_path): + os.unlink(report_path) + return self.check_report() + + def check_report(self): + QualityCodexCommitee.FormatOutputSimple( + self.check_name, self.pass_flag, self.id, + self.files_static_check_status, CHECK_LEVEL + ) + for file_path, check_status in self.files_static_check_status.items(): + if not check_status['check_status']: + hook_data_item = {"file": file_path, "message": [], "result": "fail"} + print("\t" + CRED + CHECK_LEVEL + CEND + ": {}".format(file_path)) + msg = self.command_output.get(file_path, "") + for one_msg in msg.split("\n"): + if one_msg: + hook_data_item["message"].append(one_msg) + print("\t\t" + one_msg.replace("\n", "")) + self.hook_data.append(hook_data_item) + else: + self.hook_data.append({"file": file_path, "message": [], "result": "pass"}) + if not self.pass_flag: + print("\tPlease review guide link:{}".format(self.guide_link)) + assert self.pass_flag, "Failed {}".format(self.id) + + +if __name__ == "__main__": + checker = CIChecker(None, None, None) + checker.check() diff --git a/checkers/detect_secrets_check.py b/checkers/detect_secrets_check.py new file mode 100644 index 0000000..df4275d --- /dev/null +++ b/checkers/detect_secrets_check.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python3 +# +# Copyright 2023-2025 Enflame. All Rights Reserved. +# + +import subprocess +import os +import re +import sys +import json +from pathlib import Path + +CHECKERS_DIR = Path(__file__).resolve().parent +REPO_DIR = CHECKERS_DIR.parent +sys.path.append(str(REPO_DIR)) +from common.static_check_common import QualityCodexCommitee, CICheckerCommon, StaticCheck +from common.config_parser import * + + +def excepthook(exctype, value, traceback): + if exctype == AssertionError: + pass + else: + sys.__excepthook__(exctype, value, traceback) + + +sys.excepthook = excepthook + + +def _norm_path(path): + return path.lstrip("./").replace("//", "/") if path else path + + +class CIChecker(CICheckerCommon): + def __init__(self, api_init=None, args=None, check_api_type=None, static_check=StaticCheck): + self.check_name = "detect-secrets check" + super().__init__(api_init, args, check_api_type, self.check_name, static_check) + self.local_ci_check = True + self.local_workspace_check = True + self.command_output = {} + + def check_func(self): + if not self.add_or_changed_files: + return self.check_report() + self.diff_info = self.get_diff_info() + for file_path in self.add_or_changed_files: + if not os.path.isfile(file_path): + continue + self.files_static_check_status[file_path] = {"check_status": True} + self.command_output[file_path] = "" + + files_arg = " ".join(repr(p) for p in self.add_or_changed_files if os.path.isfile(p)) + if not files_arg: + return self.check_report() + cmd = "detect-secrets scan {} 2>/dev/null".format(files_arg) + pipe = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + shell=True, executable="/bin/bash") + stdout, _ = pipe.communicate() + out = stdout.decode('utf-8', errors='ignore').strip() + if not out: + return self.check_report() + try: + data = json.loads(out) + except json.JSONDecodeError: + return self.check_report() + results = data.get("results", {}) + for fpath, secrets in results.items(): + fpath_norm = _norm_path(fpath) + if fpath_norm not in self.files_static_check_status: + continue + add_lines = [x[0] for x in self.diff_info.get(fpath_norm, {}).get("add", [])] + for s in secrets: + if not isinstance(s, dict): + continue + line_number = s.get("line_number") + if line_number is None: + line_number = s.get("line") + if (line_number == 0 and fpath_norm in self.add_files) or (line_number and line_number in add_lines): + self.files_static_check_status[fpath_norm]["check_status"] = False + self.pass_flag = False + msg = "{}:{}: potential secret (type: {})".format( + fpath_norm, line_number, s.get("type", "unknown") + ) + self.command_output[fpath_norm] = self.command_output.get(fpath_norm, "") + msg + "\n" + return self.check_report() + + def check_report(self): + QualityCodexCommitee.FormatOutputSimple( + self.check_name, self.pass_flag, self.id, + self.files_static_check_status, CHECK_LEVEL + ) + for file_path, check_status in self.files_static_check_status.items(): + if not check_status['check_status']: + hook_data_item = {"file": file_path, "message": [], "result": "fail"} + print("\t" + CRED + CHECK_LEVEL + CEND + ": {}".format(file_path)) + msg = self.command_output.get(file_path, "") + for one_msg in msg.split("\n"): + if one_msg: + hook_data_item["message"].append(one_msg) + print("\t\t" + one_msg.replace("\n", "")) + self.hook_data.append(hook_data_item) + else: + self.hook_data.append({"file": file_path, "message": [], "result": "pass"}) + if not self.pass_flag: + print("\tPlease review guide link:{}".format(self.guide_link)) + assert self.pass_flag, "Failed {}".format(self.id) + + +if __name__ == "__main__": + checker = CIChecker(None, None, None) + checker.check() diff --git a/checkers/external_tools_check.py b/checkers/external_tools_check.py deleted file mode 100644 index 68ca629..0000000 --- a/checkers/external_tools_check.py +++ /dev/null @@ -1,79 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright 2026 Enflame. All Rights Reserved. -# - -import subprocess -import os -import re -import sys -import json -from pathlib import Path - -CHECKERS_DIR = Path(__file__).resolve().parent -REPO_DIR = CHECKERS_DIR.parent -sys.path.append(str(REPO_DIR)) -from common.static_check_common import QualityCodexCommitee, CICheckerCommon, StaticCheck - -CONFIG_PATH = REPO_DIR / 'config' / 'sast.json' - -class CIChecker(CICheckerCommon): - def __init__(self, api_init=None, args=None, check_api_type=None, static_check=StaticCheck): - self.check_name = "external tools check" - super().__init__(api_init, args, check_api_type, self.check_name, static_check) - self.local_ci_check = True - self.local_workspace_check = True - self.pass_flag = True - self.files_static_check_status = {} - self.command_output = {} - - def _load_tools(self): - try: - with open(CONFIG_PATH, 'r') as f: - cfg = json.load(f) - return cfg.get('external_tools', []) - except Exception: - return [] - - def check_func(self): - tools = self._load_tools() - # run each configured tool command; commands should be shell commands (bash) - for tool in tools: - name = tool.get('name') - cmd = tool.get('command') - guide = tool.get('guide_link','') - if not cmd or not name: - continue - print("Running {}: {}".format(name, cmd)) - pipe = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True, executable='/bin/bash') - stdout, _ = pipe.communicate() - out = stdout.decode('utf-8', errors='ignore') if stdout else '' - ret = pipe.returncode - if ret != 0: - self.pass_flag = False - self.command_output[name] = out - else: - # still capture output for information - self.command_output[name] = out - - return self.check_report() - - def check_report(self): - # Prepare a synthetic files_static_check_status entry to integrate with existing output format - self.files_static_check_status['external_tools'] = {"check_status": self.pass_flag} - QualityCodexCommitee.FormatOutputSimple(self.check_name, self.pass_flag, self.id, self.files_static_check_status, CHECK_LEVEL) - - # Print outputs for failed tools - for tool_name, output in self.command_output.items(): - if output: - print('\t=== {} output ==='.format(tool_name)) - for line in output.splitlines(): - print('\t\t' + line) - - if not self.pass_flag: - print('\tPlease review guide links in config/sast.json for remediation.') - assert self.pass_flag, "Failed {}".format(self.id) - -if __name__ == '__main__': - c = CIChecker(None, None, None) - c.check() diff --git a/checkers/hadolint_check.py b/checkers/hadolint_check.py new file mode 100644 index 0000000..ed6428c --- /dev/null +++ b/checkers/hadolint_check.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +# +# Copyright 2023-2025 Enflame. All Rights Reserved. +# + +import subprocess +import os +import re +import sys +import json +from pathlib import Path + +CHECKERS_DIR = Path(__file__).resolve().parent +REPO_DIR = CHECKERS_DIR.parent +sys.path.append(str(REPO_DIR)) +from common.static_check_common import QualityCodexCommitee, CICheckerCommon, StaticCheck +from common.config_parser import * + + +def excepthook(exctype, value, traceback): + if exctype == AssertionError: + pass + else: + sys.__excepthook__(exctype, value, traceback) + + +sys.excepthook = excepthook + + +class CIChecker(CICheckerCommon): + def __init__(self, api_init=None, args=None, check_api_type=None, static_check=StaticCheck): + self.check_name = "hadolint check" + super().__init__(api_init, args, check_api_type, self.check_name, static_check) + self.local_ci_check = True + self.local_workspace_check = True + self.command_output = {} + + def filter_file(self): + return [ + x for x in self.add_or_changed_files + if re.match(self.check_files, x) and os.path.isfile(x) + and not any(re.match(y, x) for y in self.exclude_files) + ] + + def check_func(self): + self.check_files_list = self.filter_file() + if not self.check_files_list: + return self.check_report() + self.diff_info = self.get_diff_info() + for file_path in self.check_files_list: + self.files_static_check_status[file_path] = {"check_status": True} + self.command_output[file_path] = "" + + for file_path in self.check_files_list: + cmd = "hadolint -f json {} 2>/dev/null || true".format(repr(file_path)) + pipe = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + shell=True, executable="/bin/bash") + stdout, _ = pipe.communicate() + out = stdout.decode('utf-8', errors='ignore').strip() + add_lines_number = [x[0] for x in self.diff_info.get(file_path, {}).get("add", [])] + if not out: + continue + try: + data = json.loads(out) + except json.JSONDecodeError: + continue + # hadolint JSON: array of { "line": N, "message": "...", "code": "..." } or similar + if isinstance(data, list): + issues = data + elif isinstance(data, dict): + issues = data.get("", data.get("result", [])) + if not isinstance(issues, list): + issues = [] + else: + issues = [] + for issue in issues: + if not isinstance(issue, dict): + continue + line_number = issue.get("line") or issue.get("lineNumber") or issue.get("startLine") + if line_number is None: + continue + try: + line_number = int(line_number) + except (ValueError, TypeError): + continue + if (line_number == 0 and file_path in self.add_files) or (line_number in add_lines_number): + self.files_static_check_status[file_path]["check_status"] = False + self.pass_flag = False + msg = "{}:{}: {} [{}]".format( + file_path, line_number, + issue.get("message", issue.get("description", "")), + issue.get("code", "") + ) + self.command_output[file_path] = self.command_output.get(file_path, "") + msg + "\n" + + return self.check_report() + + def check_report(self): + QualityCodexCommitee.FormatOutputSimple( + self.check_name, self.pass_flag, self.id, + self.files_static_check_status, CHECK_LEVEL + ) + for file_path, check_status in self.files_static_check_status.items(): + if not check_status['check_status']: + hook_data_item = {"file": file_path, "message": [], "result": "fail"} + print("\t" + CRED + CHECK_LEVEL + CEND + ": {}".format(file_path)) + msg = self.command_output.get(file_path, "") + for one_msg in msg.split("\n"): + if one_msg: + hook_data_item["message"].append(one_msg) + print("\t\t" + one_msg.replace("\n", "")) + self.hook_data.append(hook_data_item) + else: + self.hook_data.append({"file": file_path, "message": [], "result": "pass"}) + if not self.pass_flag: + print("\tPlease review guide link:{}".format(self.guide_link)) + assert self.pass_flag, "Failed {}".format(self.id) + + +if __name__ == "__main__": + checker = CIChecker(None, None, None) + checker.check() diff --git a/checkers/mypy_check.py b/checkers/mypy_check.py new file mode 100644 index 0000000..c19be99 --- /dev/null +++ b/checkers/mypy_check.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 +# +# Copyright 2023-2025 Enflame. All Rights Reserved. +# + +import subprocess +import os +import re +import sys +import json +from pathlib import Path + +CHECKERS_DIR = Path(__file__).resolve().parent +REPO_DIR = CHECKERS_DIR.parent +sys.path.append(str(REPO_DIR)) +from common.static_check_common import QualityCodexCommitee, CICheckerCommon, StaticCheck +from common.config_parser import * + + +def excepthook(exctype, value, traceback): + if exctype == AssertionError: + pass + else: + sys.__excepthook__(exctype, value, traceback) + + +sys.excepthook = excepthook + + +class CIChecker(CICheckerCommon): + def __init__(self, api_init=None, args=None, check_api_type=None, static_check=StaticCheck): + self.check_name = "mypy check" + super().__init__(api_init, args, check_api_type, self.check_name, static_check) + self.local_ci_check = True + self.local_workspace_check = True + self.command_output = {} + + def filter_file(self): + return [ + x for x in self.add_or_changed_files + if re.match(self.check_files, x) and os.path.isfile(x) + and not any(re.match(y, x) for y in self.exclude_files) + ] + + def check_func(self): + self.check_files_list = self.filter_file() + if not self.check_files_list: + return self.check_report() + self.diff_info = self.get_diff_info() + for file_path in self.check_files_list: + self.files_static_check_status[file_path] = {"check_status": True} + self.command_output[file_path] = "" + + cmd = "mypy --ignore-missing-imports --show-error-codes {} 2>&1".format( + " ".join(repr(p) for p in self.check_files_list) + ) + pipe = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + shell=True, executable="/bin/bash") + stdout, _ = pipe.communicate() + out = stdout.decode('utf-8', errors='ignore') + # mypy format: file:line: error: message or file:line:column: error: message + for line in out.split("\n"): + if not line.strip(): + continue + m = re.match(r"^(.+?):(\d+)(?::(\d+))?:\s*error:", line) + if m: + fpath = m.group(1).strip().lstrip("./") + try: + line_number = int(m.group(2)) + except ValueError: + continue + if fpath not in self.files_static_check_status: + continue + add_lines = [x[0] for x in self.diff_info.get(fpath, {}).get("add", [])] + if (line_number == 0 and fpath in self.add_files) or (line_number and line_number in add_lines): + self.files_static_check_status[fpath]["check_status"] = False + self.pass_flag = False + self.command_output[fpath] = self.command_output.get(fpath, "") + line + "\n" + + return self.check_report() + + def check_report(self): + QualityCodexCommitee.FormatOutputSimple( + self.check_name, self.pass_flag, self.id, + self.files_static_check_status, CHECK_LEVEL + ) + for file_path, check_status in self.files_static_check_status.items(): + if not check_status['check_status']: + hook_data_item = {"file": file_path, "message": [], "result": "fail"} + print("\t" + CRED + CHECK_LEVEL + CEND + ": {}".format(file_path)) + msg = self.command_output.get(file_path, "") + for one_msg in msg.split("\n"): + if one_msg: + hook_data_item["message"].append(one_msg) + print("\t\t" + one_msg.replace("\n", "")) + self.hook_data.append(hook_data_item) + else: + self.hook_data.append({"file": file_path, "message": [], "result": "pass"}) + if not self.pass_flag: + print("\tPlease review guide link:{}".format(self.guide_link)) + assert self.pass_flag, "Failed {}".format(self.id) + + +if __name__ == "__main__": + checker = CIChecker(None, None, None) + checker.check() diff --git a/checkers/pylint_check.py b/checkers/pylint_check.py new file mode 100644 index 0000000..f512c4a --- /dev/null +++ b/checkers/pylint_check.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +# +# Copyright 2023-2025 Enflame. All Rights Reserved. +# + +import subprocess +import os +import re +import sys +import json +from pathlib import Path + +CHECKERS_DIR = Path(__file__).resolve().parent +REPO_DIR = CHECKERS_DIR.parent +sys.path.append(str(REPO_DIR)) +from common.static_check_common import QualityCodexCommitee, CICheckerCommon, StaticCheck +from common.config_parser import * + + +def excepthook(exctype, value, traceback): + if exctype == AssertionError: + pass + else: + sys.__excepthook__(exctype, value, traceback) + + +sys.excepthook = excepthook + + +class CIChecker(CICheckerCommon): + def __init__(self, api_init=None, args=None, check_api_type=None, static_check=StaticCheck): + self.check_name = "pylint check" + super().__init__(api_init, args, check_api_type, self.check_name, static_check) + self.local_ci_check = True + self.local_workspace_check = True + self.command_output = {} + + def filter_file(self): + return [ + x for x in self.add_or_changed_files + if re.match(self.check_files, x) and os.path.isfile(x) + and not any(re.match(y, x) for y in self.exclude_files) + ] + + def check_func(self): + self.check_files_list = self.filter_file() + if not self.check_files_list: + return self.check_report() + self.diff_info = self.get_diff_info() + for file_path in self.check_files_list: + self.files_static_check_status[file_path] = {"check_status": True} + self.command_output[file_path] = "" + + for file_path in self.check_files_list: + disable = ( + "consider-using-f-string,import-error,wrong-import-position," + "too-few-public-methods,attribute-defined-outside-init," + "missing-module-docstring,wrong-import-order,wildcard-import," + "unused-import,unused-wildcard-import" + ) + cmd = "pylint -E --disable={} {} --output-format=text 2>&1 || true".format( + disable, repr(file_path) + ) + pipe = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + shell=True, executable="/bin/bash") + stdout, _ = pipe.communicate() + out = stdout.decode('utf-8', errors='ignore') + add_lines_number = [x[0] for x in self.diff_info.get(file_path, {}).get("add", [])] + for line in out.split("\n"): + # pylint: file:line:column: message (code) + m = re.match(r"^(.+?):(\d+):(\d+):\s*(.+)", line) + if m: + try: + line_number = int(m.group(2)) + except ValueError: + continue + if (line_number == 0 and file_path in self.add_files) or (line_number in add_lines_number): + self.files_static_check_status[file_path]["check_status"] = False + self.pass_flag = False + self.command_output[file_path] = self.command_output.get(file_path, "") + line + "\n" + + return self.check_report() + + def check_report(self): + QualityCodexCommitee.FormatOutputSimple( + self.check_name, self.pass_flag, self.id, + self.files_static_check_status, CHECK_LEVEL + ) + for file_path, check_status in self.files_static_check_status.items(): + if not check_status['check_status']: + hook_data_item = {"file": file_path, "message": [], "result": "fail"} + print("\t" + CRED + CHECK_LEVEL + CEND + ": {}".format(file_path)) + msg = self.command_output.get(file_path, "") + for one_msg in msg.split("\n"): + if one_msg: + hook_data_item["message"].append(one_msg) + print("\t\t" + one_msg.replace("\n", "")) + self.hook_data.append(hook_data_item) + else: + self.hook_data.append({"file": file_path, "message": [], "result": "pass"}) + if not self.pass_flag: + print("\tPlease review guide link:{}".format(self.guide_link)) + assert self.pass_flag, "Failed {}".format(self.id) + + +if __name__ == "__main__": + checker = CIChecker(None, None, None) + checker.check() diff --git a/checkers/safety_check.py b/checkers/safety_check.py new file mode 100644 index 0000000..7dc9c9b --- /dev/null +++ b/checkers/safety_check.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 +# +# Copyright 2023-2025 Enflame. All Rights Reserved. +# + +import subprocess +import os +import re +import sys +import json +from pathlib import Path + +CHECKERS_DIR = Path(__file__).resolve().parent +REPO_DIR = CHECKERS_DIR.parent +sys.path.append(str(REPO_DIR)) +from common.static_check_common import QualityCodexCommitee, CICheckerCommon, StaticCheck +from common.config_parser import * + + +def excepthook(exctype, value, traceback): + if exctype == AssertionError: + pass + else: + sys.__excepthook__(exctype, value, traceback) + + +sys.excepthook = excepthook + + +class CIChecker(CICheckerCommon): + def __init__(self, api_init=None, args=None, check_api_type=None, static_check=StaticCheck): + self.check_name = "safety check" + super().__init__(api_init, args, check_api_type, self.check_name, static_check) + self.local_ci_check = True + self.local_workspace_check = True + self.command_output = {} + + def get_check_files(self): + return [ + x for x in self.add_or_changed_files + if any(re.match(r, x) for r in self.check_files_regex) and os.path.isfile(x) + ] + + def _package_from_line(self, line): + """Parse package name from requirements line (e.g. 'foo==1.0' or 'foo>=1.0').""" + line = line.strip() + if not line or line.startswith("#") or line.startswith("-"): + return None + for sep in ["==", ">=", "<=", ">", "<", "~=", "!="]: + if sep in line: + return line.split(sep)[0].strip().lower() + return line.split()[0].strip().lower() if line else None + + def check_func(self): + self.check_files_list = self.get_check_files() + if not self.check_files_list: + return self.check_report() + self.diff_info = self.get_diff_info() + for file_path in self.check_files_list: + self.files_static_check_status[file_path] = {"check_status": True} + self.command_output[file_path] = "" + + for req_file in self.check_files_list: + add_lines_with_num = self.diff_info.get(req_file, {}).get("add", []) + add_lines_number = [x[0] for x in add_lines_with_num] + added_packages = set() + for line_no, content in add_lines_with_num: + pkg = self._package_from_line(content) + if pkg: + added_packages.add((line_no, pkg)) + + if not added_packages: + continue + + pipe = subprocess.Popen( + "safety check -r {} --json 2>/dev/null || true".format(repr(req_file)), + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True, executable="/bin/bash" + ) + stdout, _ = pipe.communicate() + out = stdout.decode('utf-8', errors='ignore').strip() + if not out: + continue + # safety output may include banner + DEPRECATED text + JSON; extract JSON only + start = out.find("{") + end = out.rfind("}") + if start < 0 or end <= start: + continue + try: + data = json.loads(out[start : end + 1]) + except json.JSONDecodeError: + continue + # safety JSON: list of vuln or {"vulnerabilities": [...]} depending on version + vulns = data if isinstance(data, list) else data.get("vulnerabilities", data.get("affected_packages", [])) + if not isinstance(vulns, list): + continue + for v in vulns: + if isinstance(v, dict): + pkg_name = (v.get("package") or v.get("name") or "").lower() + else: + continue + for line_no, pkg in added_packages: + if pkg in pkg_name or pkg_name in pkg: + self.files_static_check_status[req_file]["check_status"] = False + self.pass_flag = False + msg = "{}:{}: vulnerable package '{}'".format(req_file, line_no, pkg_name) + self.command_output[req_file] = self.command_output.get(req_file, "") + msg + "\n" + break + + return self.check_report() + + def check_report(self): + QualityCodexCommitee.FormatOutputSimple( + self.check_name, self.pass_flag, self.id, + self.files_static_check_status, CHECK_LEVEL + ) + for file_path, check_status in self.files_static_check_status.items(): + if not check_status['check_status']: + hook_data_item = {"file": file_path, "message": [], "result": "fail"} + print("\t" + CRED + CHECK_LEVEL + CEND + ": {}".format(file_path)) + msg = self.command_output.get(file_path, "") + for one_msg in msg.split("\n"): + if one_msg: + hook_data_item["message"].append(one_msg) + print("\t\t" + one_msg.replace("\n", "")) + self.hook_data.append(hook_data_item) + else: + self.hook_data.append({"file": file_path, "message": [], "result": "pass"}) + if not self.pass_flag: + print("\tPlease review guide link:{}".format(self.guide_link)) + assert self.pass_flag, "Failed {}".format(self.id) + + +if __name__ == "__main__": + checker = CIChecker(None, None, None) + checker.check() diff --git a/checkers/yamllint_check.py b/checkers/yamllint_check.py new file mode 100644 index 0000000..e466841 --- /dev/null +++ b/checkers/yamllint_check.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 +# +# Copyright 2023-2025 Enflame. All Rights Reserved. +# + +import subprocess +import os +import re +import sys +import json +from pathlib import Path + +CHECKERS_DIR = Path(__file__).resolve().parent +REPO_DIR = CHECKERS_DIR.parent +sys.path.append(str(REPO_DIR)) +from common.static_check_common import QualityCodexCommitee, CICheckerCommon, StaticCheck +from common.config_parser import * + + +def excepthook(exctype, value, traceback): + if exctype == AssertionError: + pass + else: + sys.__excepthook__(exctype, value, traceback) + + +sys.excepthook = excepthook + + +class CIChecker(CICheckerCommon): + def __init__(self, api_init=None, args=None, check_api_type=None, static_check=StaticCheck): + self.check_name = "yamllint check" + super().__init__(api_init, args, check_api_type, self.check_name, static_check) + self.local_ci_check = True + self.local_workspace_check = True + self.command_output = {} + + def filter_file(self): + return [ + x for x in self.add_or_changed_files + if re.match(self.check_files, x) and os.path.isfile(x) + and not any(re.match(y, x) for y in self.exclude_files) + ] + + def check_func(self): + self.check_files_list = self.filter_file() + if not self.check_files_list: + return self.check_report() + self.diff_info = self.get_diff_info() + for file_path in self.check_files_list: + self.files_static_check_status[file_path] = {"check_status": True} + self.command_output[file_path] = "" + + for file_path in self.check_files_list: + cmd = "yamllint -f parsable {} 2>&1 || true".format(repr(file_path)) + pipe = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + shell=True, executable="/bin/bash") + stdout, _ = pipe.communicate() + out = stdout.decode('utf-8', errors='ignore') + # parsable format: file:line:col: level: message + add_lines_number = [x[0] for x in self.diff_info.get(file_path, {}).get("add", [])] + for line in out.split("\n"): + if not line.strip(): + continue + parts = line.split(":", 4) + if len(parts) < 4: + continue + try: + line_number = int(parts[1]) + except (ValueError, IndexError): + continue + if (line_number == 0 and file_path in self.add_files) or (line_number in add_lines_number): + self.files_static_check_status[file_path]["check_status"] = False + self.pass_flag = False + self.command_output[file_path] = self.command_output.get(file_path, "") + line + "\n" + + return self.check_report() + + def check_report(self): + QualityCodexCommitee.FormatOutputSimple( + self.check_name, self.pass_flag, self.id, + self.files_static_check_status, CHECK_LEVEL + ) + for file_path, check_status in self.files_static_check_status.items(): + if not check_status['check_status']: + hook_data_item = {"file": file_path, "message": [], "result": "fail"} + print("\t" + CRED + CHECK_LEVEL + CEND + ": {}".format(file_path)) + msg = self.command_output.get(file_path, "") + for one_msg in msg.split("\n"): + if one_msg: + hook_data_item["message"].append(one_msg) + print("\t\t" + one_msg.replace("\n", "")) + self.hook_data.append(hook_data_item) + else: + self.hook_data.append({"file": file_path, "message": [], "result": "pass"}) + if not self.pass_flag: + print("\tPlease review guide link:{}".format(self.guide_link)) + assert self.pass_flag, "Failed {}".format(self.id) + + +if __name__ == "__main__": + checker = CIChecker(None, None, None) + checker.check() diff --git a/common/static_check_common.py b/common/static_check_common.py index 38e6db3..290c30b 100644 --- a/common/static_check_common.py +++ b/common/static_check_common.py @@ -41,7 +41,7 @@ def __init__(self, api_init, args, check_api_type, cache_file = SERIALIZED_FILE_ self.timestamp = datetime.now() if os.path.exists(cache_file): with open(cache_file, 'rb') as load_file: - self.cache = pickle.load(load_file, encoding='utf-8') + self.cache = pickle.load(load_file, encoding='utf-8') # nosec B301 - local cache only self.cache.timestamp = datetime.now() else: if not api_init or not args or not check_api_type: @@ -82,7 +82,7 @@ def __init__(self, api_init, args, check_api_type, cache_file = SERIALIZED_FILE_ if self.check_api_type != API_TYPE_LOCALGIT: with open(SERIALIZED_FILE_NAME, 'wb') as dump_file: - pickle.dump(self, dump_file, protocol=pickle.DEFAULT_PROTOCOL) + pickle.dump(self, dump_file, protocol=pickle.DEFAULT_PROTOCOL) # nosec B301 - local cache only def _patchset_files(self): diff --git a/config/sast.json b/config/sast.json index 906de7a..a011e8f 100644 --- a/config/sast.json +++ b/config/sast.json @@ -167,9 +167,10 @@ }, "pylint check":{ "id":"SP0027", - "guide_link":"", + "guide_link":"https://pylint.pycqa.org/", "ignore_admin":[], - "check_files":".*\\.py" + "check_files":".*\\.py$", + "exclude_files":[] }, "lizard check":{ "id":"SX0029", @@ -201,6 +202,45 @@ "guide_link":"", "ignore_admin":[] }, + "bandit check":{ + "id":"SX0061", + "guide_link":"https://bandit.readthedocs.io/", + "ignore_admin":[], + "check_files":".*\\.py$", + "exclude_files":[] + }, + "mypy check":{ + "id":"SX0062", + "guide_link":"https://mypy-lang.org/", + "ignore_admin":[], + "check_files":".*\\.py$", + "exclude_files":[] + }, + "safety check":{ + "id":"SX0063", + "guide_link":"https://pyup.io/safety/", + "ignore_admin":[], + "check_files_regex":[".*requirements.*\\.txt$"] + }, + "detect-secrets check":{ + "id":"SX0064", + "guide_link":"https://github.com/Yelp/detect-secrets", + "ignore_admin":[] + }, + "hadolint check":{ + "id":"SX0065", + "guide_link":"https://github.com/hadolint/hadolint", + "ignore_admin":[], + "check_files":".*Dockerfile.*", + "exclude_files":[] + }, + "yamllint check":{ + "id":"SX0066", + "guide_link":"https://yamllint.readthedocs.io/", + "ignore_admin":[], + "check_files":".*\\.(yml|yaml)$", + "exclude_files":[] + }, "cpp17 feature check":{ "id":"SC0007", "guide_link":"", @@ -221,54 +261,7 @@ }, "checks_group":{ "external_checks":[ - "codespell_check","cpplint_check","gitleaks_check","hardcode_check","jsonlint_check","line_terminators_check","external_tools_check" + "codespell_check","cpplint_check","gitleaks_check","hardcode_check","jsonlint_check","line_terminators_check" ] - }, - "external_tools":[ - { - "name":"bandit", - "command":"bandit -r . -f json -o bandit_report.json", - "guide_link":"https://bandit.readthedocs.io/" - }, - { - "name":"mypy", - "command":"mypy . --ignore-missing-imports --show-error-codes", - "guide_link":"https://mypy-lang.org/" - }, - { - "name":"pylint", - "command":"pylint $(git ls-files '*.py') || true", - "guide_link":"https://pylint.pycqa.org/" - }, - { - "name":"safety", - "command":"safety check -r requirements.txt --full-report || true", - "guide_link":"https://pyup.io/safety/" - }, - { - "name":"detect-secrets", - "command":"detect-secrets scan --all-files --json > .secrets.json || true", - "guide_link":"https://github.com/Yelp/detect-secrets" - }, - { - "name":"checkov", - "command":"checkov -d . -o json || true", - "guide_link":"https://www.checkov.io/" - }, - { - "name":"hadolint", - "command":"hadolint Dockerfile -f json || true", - "guide_link":"https://github.com/hadolint/hadolint" - }, - { - "name":"tfsec", - "command":"tfsec . --format json --out tfsec_report.json || true", - "guide_link":"https://tfsec.dev/" - }, - { - "name":"yamllint", - "command":"yamllint -f parsable . || true", - "guide_link":"https://yamllint.readthedocs.io/" } - ] } diff --git a/requirements.txt b/requirements.txt index 7e36e35..a8ebcee 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,12 +1,10 @@ -chardet==4.0.0 -Requests==2.32.4 -tomli==2.2.1 - -bandit -mypy -pylint -safety -detect-secrets -checkov -yamllint - +setuptools==65.5.1 +wheel==0.37.1 +pip==21.3.1 +requests==2.22.0 +pylint==3.2.7 +bandit==1.7.10 +mypy==1.14.1 +safety==3.6.2 +detect-secrets==1.5.0 +yamllint==1.35.1