Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ RUN sed -i s/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g /etc/apk/repositories \
&& echo "http://mirrors.aliyun.com/alpine/latest-stable/community/" >> /etc/apk/repositories \
&& apk update \
&& apk add --no-cache git wget readline-dev bash cloc file curl openssl \
tzdata zlib zlib-dev git-lfs gojq sqlite-dev \
tzdata zlib zlib-dev git-lfs gojq sqlite-dev build-base libffi-dev \
&& apk cache clean \
&& cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

Expand Down Expand Up @@ -50,12 +50,15 @@ RUN wget -P /tmp https://github.com/astral-sh/ruff/releases/download/0.6.4/ruff-
&& cd ../ \
&& rm -rf ruff-x86_64-unknown-linux-gnu/ ruff-x86_64-unknown-linux-gnu.tar.gz

#### add module package
RUN python3.8 -m pip install --no-cache-dir -U setuptools==59.6.0 wheel==0.37.1 pip==21.3.1 requests==2.22.0 pylint==3.2.7 lizard==1.17.31
RUN wget https://github.com/hadolint/hadolint/releases/download/v2.12.0/hadolint-Linux-x86_64 -O /usr/local/bin/hadolint \
&& chmod +x /usr/local/bin/hadolint

#### add project
COPY . /sast/

#### add module package
RUN python3.8 -m pip install --no-cache-dir -U -r /sast/requirements.txt

WORKDIR /app

RUN chmod -R 777 /app && chmod -R 777 /sast/
Expand Down
124 changes: 124 additions & 0 deletions checkers/bandit_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#!/usr/bin/env python3
#
# Copyright 2023-2025 Enflame. All Rights Reserved.
#

import subprocess
import os
import re
import sys
import json
import tempfile
from pathlib import Path

CHECKERS_DIR = Path(__file__).resolve().parent
REPO_DIR = CHECKERS_DIR.parent
sys.path.append(str(REPO_DIR))
from common.static_check_common import QualityCodexCommitee, CICheckerCommon, StaticCheck
from common.config_parser import *


def excepthook(exctype, value, traceback):
if exctype == AssertionError:
pass
else:
sys.__excepthook__(exctype, value, traceback)


sys.excepthook = excepthook


def _norm_path(path):
return path.lstrip("./").replace("//", "/") if path else path


class CIChecker(CICheckerCommon):
def __init__(self, api_init=None, args=None, check_api_type=None, static_check=StaticCheck):
self.check_name = "bandit check"
super().__init__(api_init, args, check_api_type, self.check_name, static_check)
self.local_ci_check = True
self.local_workspace_check = True
self.command_output = {}

def filter_file(self):
return [
x for x in self.add_or_changed_files
if re.match(self.check_files, x) and os.path.isfile(x)
and not any(re.match(y, x) for y in self.exclude_files)
]

def check_func(self):
self.check_files_list = self.filter_file()
if not self.check_files_list:
return self.check_report()
self.diff_info = self.get_diff_info()
for file_path in self.check_files_list:
self.files_static_check_status[file_path] = {"check_status": True}
self.command_output[file_path] = ""

with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
report_path = f.name
try:
cmd = "bandit --severity-level high -s B602 -f json -o {} {}".format(
report_path,
" ".join(repr(p) for p in self.check_files_list)
)
pipe = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
shell=True, executable="/bin/bash")
pipe.communicate()

if not os.path.isfile(report_path):
return self.check_report()

try:
with open(report_path, 'r', encoding='utf-8', errors='ignore') as rf:
data = json.load(rf)
except (json.JSONDecodeError, OSError):
return self.check_report()
results = data.get("results", [])
for r in results:
fpath = _norm_path(r.get("filename", ""))
if fpath not in self.files_static_check_status:
continue
line_number = r.get("line_number")
add_lines = [x[0] for x in self.diff_info.get(fpath, {}).get("add", [])]
if (line_number == 0 and fpath in self.add_files) or (line_number and line_number in add_lines):
self.files_static_check_status[fpath]["check_status"] = False
self.pass_flag = False
msg = "{}:{}: {} [{}]".format(
r.get("filename", fpath),
line_number,
r.get("issue_text", ""),
r.get("test_id", "")
)
self.command_output[fpath] = self.command_output.get(fpath, "") + msg + "\n"
finally:
if os.path.isfile(report_path):
os.unlink(report_path)
return self.check_report()

def check_report(self):
QualityCodexCommitee.FormatOutputSimple(
self.check_name, self.pass_flag, self.id,
self.files_static_check_status, CHECK_LEVEL
)
for file_path, check_status in self.files_static_check_status.items():
if not check_status['check_status']:
hook_data_item = {"file": file_path, "message": [], "result": "fail"}
print("\t" + CRED + CHECK_LEVEL + CEND + ": {}".format(file_path))
msg = self.command_output.get(file_path, "")
for one_msg in msg.split("\n"):
if one_msg:
hook_data_item["message"].append(one_msg)
print("\t\t" + one_msg.replace("\n", ""))
self.hook_data.append(hook_data_item)
else:
self.hook_data.append({"file": file_path, "message": [], "result": "pass"})
if not self.pass_flag:
print("\tPlease review guide link:{}".format(self.guide_link))
assert self.pass_flag, "Failed {}".format(self.id)


if __name__ == "__main__":
checker = CIChecker(None, None, None)
checker.check()
111 changes: 111 additions & 0 deletions checkers/detect_secrets_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#!/usr/bin/env python3
#
# Copyright 2023-2025 Enflame. All Rights Reserved.
#

import subprocess
import os
import re
import sys
import json
from pathlib import Path

CHECKERS_DIR = Path(__file__).resolve().parent
REPO_DIR = CHECKERS_DIR.parent
sys.path.append(str(REPO_DIR))
from common.static_check_common import QualityCodexCommitee, CICheckerCommon, StaticCheck
from common.config_parser import *


def excepthook(exctype, value, traceback):
if exctype == AssertionError:
pass
else:
sys.__excepthook__(exctype, value, traceback)


sys.excepthook = excepthook


def _norm_path(path):
return path.lstrip("./").replace("//", "/") if path else path


class CIChecker(CICheckerCommon):
def __init__(self, api_init=None, args=None, check_api_type=None, static_check=StaticCheck):
self.check_name = "detect-secrets check"
super().__init__(api_init, args, check_api_type, self.check_name, static_check)
self.local_ci_check = True
self.local_workspace_check = True
self.command_output = {}

def check_func(self):
if not self.add_or_changed_files:
return self.check_report()
self.diff_info = self.get_diff_info()
for file_path in self.add_or_changed_files:
if not os.path.isfile(file_path):
continue
self.files_static_check_status[file_path] = {"check_status": True}
self.command_output[file_path] = ""

files_arg = " ".join(repr(p) for p in self.add_or_changed_files if os.path.isfile(p))
if not files_arg:
return self.check_report()
cmd = "detect-secrets scan {} 2>/dev/null".format(files_arg)
pipe = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
shell=True, executable="/bin/bash")
stdout, _ = pipe.communicate()
out = stdout.decode('utf-8', errors='ignore').strip()
if not out:
return self.check_report()
try:
data = json.loads(out)
except json.JSONDecodeError:
return self.check_report()
results = data.get("results", {})
for fpath, secrets in results.items():
fpath_norm = _norm_path(fpath)
if fpath_norm not in self.files_static_check_status:
continue
add_lines = [x[0] for x in self.diff_info.get(fpath_norm, {}).get("add", [])]
for s in secrets:
if not isinstance(s, dict):
continue
line_number = s.get("line_number")
if line_number is None:
line_number = s.get("line")
if (line_number == 0 and fpath_norm in self.add_files) or (line_number and line_number in add_lines):
self.files_static_check_status[fpath_norm]["check_status"] = False
self.pass_flag = False
msg = "{}:{}: potential secret (type: {})".format(
fpath_norm, line_number, s.get("type", "unknown")
)
self.command_output[fpath_norm] = self.command_output.get(fpath_norm, "") + msg + "\n"
return self.check_report()

def check_report(self):
QualityCodexCommitee.FormatOutputSimple(
self.check_name, self.pass_flag, self.id,
self.files_static_check_status, CHECK_LEVEL
)
for file_path, check_status in self.files_static_check_status.items():
if not check_status['check_status']:
hook_data_item = {"file": file_path, "message": [], "result": "fail"}
print("\t" + CRED + CHECK_LEVEL + CEND + ": {}".format(file_path))
msg = self.command_output.get(file_path, "")
for one_msg in msg.split("\n"):
if one_msg:
hook_data_item["message"].append(one_msg)
print("\t\t" + one_msg.replace("\n", ""))
self.hook_data.append(hook_data_item)
else:
self.hook_data.append({"file": file_path, "message": [], "result": "pass"})
if not self.pass_flag:
print("\tPlease review guide link:{}".format(self.guide_link))
assert self.pass_flag, "Failed {}".format(self.id)


if __name__ == "__main__":
checker = CIChecker(None, None, None)
checker.check()
122 changes: 122 additions & 0 deletions checkers/hadolint_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#!/usr/bin/env python3
#
# Copyright 2023-2025 Enflame. All Rights Reserved.
#

import subprocess
import os
import re
import sys
import json
from pathlib import Path

CHECKERS_DIR = Path(__file__).resolve().parent
REPO_DIR = CHECKERS_DIR.parent
sys.path.append(str(REPO_DIR))
from common.static_check_common import QualityCodexCommitee, CICheckerCommon, StaticCheck
from common.config_parser import *


def excepthook(exctype, value, traceback):
if exctype == AssertionError:
pass
else:
sys.__excepthook__(exctype, value, traceback)


sys.excepthook = excepthook


class CIChecker(CICheckerCommon):
def __init__(self, api_init=None, args=None, check_api_type=None, static_check=StaticCheck):
self.check_name = "hadolint check"
super().__init__(api_init, args, check_api_type, self.check_name, static_check)
self.local_ci_check = True
self.local_workspace_check = True
self.command_output = {}

def filter_file(self):
return [
x for x in self.add_or_changed_files
if re.match(self.check_files, x) and os.path.isfile(x)
and not any(re.match(y, x) for y in self.exclude_files)
]

def check_func(self):
self.check_files_list = self.filter_file()
if not self.check_files_list:
return self.check_report()
self.diff_info = self.get_diff_info()
for file_path in self.check_files_list:
self.files_static_check_status[file_path] = {"check_status": True}
self.command_output[file_path] = ""

for file_path in self.check_files_list:
cmd = "hadolint -f json {} 2>/dev/null || true".format(repr(file_path))
pipe = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
shell=True, executable="/bin/bash")
stdout, _ = pipe.communicate()
out = stdout.decode('utf-8', errors='ignore').strip()
add_lines_number = [x[0] for x in self.diff_info.get(file_path, {}).get("add", [])]
if not out:
continue
try:
data = json.loads(out)
except json.JSONDecodeError:
continue
# hadolint JSON: array of { "line": N, "message": "...", "code": "..." } or similar
if isinstance(data, list):
issues = data
elif isinstance(data, dict):
issues = data.get("", data.get("result", []))
if not isinstance(issues, list):
issues = []
else:
issues = []
for issue in issues:
if not isinstance(issue, dict):
continue
line_number = issue.get("line") or issue.get("lineNumber") or issue.get("startLine")
if line_number is None:
continue
try:
line_number = int(line_number)
except (ValueError, TypeError):
continue
if (line_number == 0 and file_path in self.add_files) or (line_number in add_lines_number):
self.files_static_check_status[file_path]["check_status"] = False
self.pass_flag = False
msg = "{}:{}: {} [{}]".format(
file_path, line_number,
issue.get("message", issue.get("description", "")),
issue.get("code", "")
)
self.command_output[file_path] = self.command_output.get(file_path, "") + msg + "\n"

return self.check_report()

def check_report(self):
QualityCodexCommitee.FormatOutputSimple(
self.check_name, self.pass_flag, self.id,
self.files_static_check_status, CHECK_LEVEL
)
for file_path, check_status in self.files_static_check_status.items():
if not check_status['check_status']:
hook_data_item = {"file": file_path, "message": [], "result": "fail"}
print("\t" + CRED + CHECK_LEVEL + CEND + ": {}".format(file_path))
msg = self.command_output.get(file_path, "")
for one_msg in msg.split("\n"):
if one_msg:
hook_data_item["message"].append(one_msg)
print("\t\t" + one_msg.replace("\n", ""))
self.hook_data.append(hook_data_item)
else:
self.hook_data.append({"file": file_path, "message": [], "result": "pass"})
if not self.pass_flag:
print("\tPlease review guide link:{}".format(self.guide_link))
assert self.pass_flag, "Failed {}".format(self.id)


if __name__ == "__main__":
checker = CIChecker(None, None, None)
checker.check()
Loading