From d30b88b0b23307ff51eaefd96643c7657b6b1daf Mon Sep 17 00:00:00 2001
From: Sid J <6353716+sidnz@users.noreply.github.com>
Date: Wed, 11 Mar 2026 22:31:18 +1300
Subject: [PATCH 1/3] feat: watch mode with config
---
sentinel/cli.py | 108 +++++++++++++++++++++++-
tests/integration/test_cli.py | 153 ++++++++++++++++++++++++++++++++++
2 files changed, 260 insertions(+), 1 deletion(-)
diff --git a/sentinel/cli.py b/sentinel/cli.py
index c2a7741..70f5c3a 100644
--- a/sentinel/cli.py
+++ b/sentinel/cli.py
@@ -1,4 +1,4 @@
-"""sentinel CLI — 4 commands: config, probe, container, scan."""
+"""sentinel CLI — 5 commands: config, probe, container, scan, watch."""
from __future__ import annotations
import sys
@@ -175,6 +175,112 @@ def scan(
sys.exit(1)
+@cli.command()
+@click.option("--config", "config_path", default=None, type=click.Path())
+@click.option("--endpoint", default=None)
+@click.option("--container", "container_target", default=None)
+@click.option("--interval", default=60, type=int, help="Seconds between scans.")
+@click.option("--format", "fmt", default="terminal", type=_FMT_CHOICES)
+@click.option("--output", default=None, help="Write report to file on each cycle.")
+@click.option("--fail-on", default="high")
+@click.option(
+ "--on-change",
+ is_flag=True,
+ default=False,
+ help="Only emit a report when findings change.",
+)
+@click.option("--safe-mode/--no-safe-mode", default=True)
+@click.option("--timeout", default=10)
+def watch(
+ config_path: Optional[str],
+ endpoint: Optional[str],
+ container_target: Optional[str],
+ interval: int,
+ fmt: str,
+ output: Optional[str],
+ fail_on: str,
+ on_change: bool,
+ safe_mode: bool,
+ timeout: int,
+) -> None:
+ """Continuously monitor targets, re-scanning on a fixed interval."""
+ import signal
+ import time
+ from datetime import datetime, timezone
+
+ if not any([config_path, endpoint, container_target]):
+ click.echo("No targets specified. Use --config, --endpoint, or --container.", err=True)
+ sys.exit(2)
+
+ config_scanner = ConfigScanner() if config_path else None
+
+ probe_scanner = None
+ if endpoint:
+ from sentinel.modules.probe import ProbeScanner
+
+ probe_scanner = ProbeScanner(safe_mode=safe_mode)
+
+ container_scanner = None
+ if container_target:
+ from sentinel.modules.container import ContainerScanner
+
+ container_scanner = ContainerScanner()
+
+ _stop = False
+
+ def _handle_sigint(sig, frame) -> None: # noqa: ARG001
+ nonlocal _stop
+ _stop = True
+
+ signal.signal(signal.SIGINT, _handle_sigint)
+
+ previous_fingerprint: Optional[frozenset] = None
+ cycle = 0
+
+ click.echo(f"sentinel watch — scanning every {interval}s | Ctrl-C to stop\n")
+
+ while not _stop:
+ cycle += 1
+ now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
+
+ results = []
+ if config_scanner:
+ results.append(config_scanner.scan(Path(config_path))) # type: ignore[arg-type]
+ if probe_scanner:
+ results.append(probe_scanner.scan(endpoint, timeout=timeout)) # type: ignore[arg-type]
+ if container_scanner:
+ results.append(container_scanner.scan(container_target)) # type: ignore[arg-type]
+
+ fingerprint = frozenset(
+ (f.rule_id, f.location, f.detail) for r in results for f in r.findings
+ )
+ first_run = previous_fingerprint is None
+ changed = fingerprint != previous_fingerprint
+ previous_fingerprint = fingerprint
+
+ if not on_change or changed:
+ if fmt == "terminal" and not output:
+ separator = "─" * 60
+ status_parts = [f"Cycle {cycle}", now]
+ if not first_run and changed:
+ status_parts.append("CHANGED")
+ elif not first_run:
+ status_parts.append("no change")
+ click.echo(separator)
+ click.echo(" " + " | ".join(status_parts))
+ click.echo(separator)
+ _write_output(results, fmt, output)
+ else:
+ click.echo(f"[{now}] Cycle {cycle} — no change, skipping report.")
+
+ for _ in range(interval):
+ if _stop:
+ break
+ time.sleep(1)
+
+ click.echo("\nsentinel watch stopped.")
+
+
def main() -> None:
cli()
diff --git a/tests/integration/test_cli.py b/tests/integration/test_cli.py
index b658768..b47d91b 100644
--- a/tests/integration/test_cli.py
+++ b/tests/integration/test_cli.py
@@ -3,7 +3,9 @@
from __future__ import annotations
import json
+import signal as _signal
from pathlib import Path
+from unittest.mock import patch
import pytest
from click.testing import CliRunner
@@ -167,6 +169,157 @@ def test_scan_fail_on_threshold_respected(self, runner, high_only_file):
assert result.exit_code == 0
+class TestWatchCommand:
+ """Tests for `sentinel watch`
+
+ As watch runs an infinite loop all tests patch ``signal.signal``
+ to capture the SIGINT handler and ``time.sleep`` to fire it after the
+ first sleep call, giving exactly one full scan cycle per invocation
+ """
+
+ @pytest.fixture
+ def _one_cycle(self):
+ """Patch signal + sleep so the watch loop executes exactly one cycle."""
+ captured = {}
+
+ def _mock_signal(sig, handler):
+ if sig == _signal.SIGINT:
+ captured["handler"] = handler
+
+ def _mock_sleep(_secs):
+ if "handler" in captured:
+ captured["handler"](None, None)
+
+ with patch("signal.signal", side_effect=_mock_signal), patch(
+ "time.sleep", side_effect=_mock_sleep
+ ):
+ yield
+
+ @pytest.fixture
+ def _two_cycles(self):
+ """Patch signal + sleep so the watch loop executes exactly two cycles."""
+ captured = {}
+ call_count = {"n": 0}
+
+ def _mock_signal(sig, handler):
+ if sig == _signal.SIGINT:
+ captured["handler"] = handler
+
+ def _mock_sleep(_secs):
+ call_count["n"] += 1
+ if call_count["n"] >= 2 and "handler" in captured:
+ captured["handler"](None, None)
+
+ with patch("signal.signal", side_effect=_mock_signal), patch(
+ "time.sleep", side_effect=_mock_sleep
+ ):
+ yield
+
+ def test_no_targets_exits_2(self, runner):
+ result = runner.invoke(cli, ["watch"])
+ assert result.exit_code == 2
+
+ def test_no_targets_prints_usage_hint(self, runner):
+ result = runner.invoke(cli, ["watch"])
+ assert "No targets" in result.output or "No targets" in (result.stderr or "")
+
+ def test_one_cycle_outputs_cycle_header(self, runner, insecure_file, _one_cycle):
+ result = runner.invoke(cli, ["watch", "--config", insecure_file, "--interval", "1"])
+ assert result.exit_code == 0
+ assert "Cycle 1" in result.output
+
+ def test_one_cycle_prints_stop_message(self, runner, insecure_file, _one_cycle):
+ result = runner.invoke(cli, ["watch", "--config", insecure_file, "--interval", "1"])
+ assert "sentinel watch stopped" in result.output
+
+ def test_one_cycle_contains_findings(self, runner, insecure_file, _one_cycle):
+ result = runner.invoke(cli, ["watch", "--config", insecure_file, "--interval", "1"])
+ assert "CFG-001" in result.output
+
+ def test_two_cycles_second_shows_no_change(self, runner, insecure_file, _two_cycles):
+ result = runner.invoke(cli, ["watch", "--config", insecure_file, "--interval", "1"])
+ assert "Cycle 2" in result.output
+ assert "no change" in result.output
+
+ def test_two_cycles_second_shows_changed_when_findings_differ(
+ self, runner, tmp_path, _two_cycles
+ ):
+ cfg = tmp_path / "mutable.json"
+ cfg.write_text(json.dumps(INSECURE_CONFIG))
+
+ call_count = {"n": 0}
+ original_scan = None
+
+ # clean config on the second scan cycle
+ from sentinel.modules.config import ConfigScanner
+
+ original_scan = ConfigScanner.scan
+
+ def _patched_scan(self, path):
+ call_count["n"] += 1
+ if call_count["n"] == 2:
+ import json as _json
+
+ from tests.fixtures.configs import SECURE_CONFIG
+
+ path.write_text(_json.dumps(SECURE_CONFIG))
+ return original_scan(self, path)
+
+ with patch.object(ConfigScanner, "scan", _patched_scan):
+ result = runner.invoke(
+ cli, ["watch", "--config", str(cfg), "--interval", "1"]
+ )
+
+ assert "CHANGED" in result.output
+
+ def test_on_change_suppresses_report_when_no_change(self, runner, insecure_file, _two_cycles):
+ result = runner.invoke(
+ cli,
+ ["watch", "--config", insecure_file, "--interval", "1", "--on-change"],
+ )
+ assert "no change, skipping report" in result.output
+ # header should appear once (cycle 1 only — cycle 2 is suppressed)
+ assert result.output.count("Cycle 1") == 1
+ assert result.output.count("CFG-001") == 1
+
+ def test_on_change_emits_report_on_first_cycle(self, runner, insecure_file, _one_cycle):
+ result = runner.invoke(
+ cli,
+ ["watch", "--config", insecure_file, "--interval", "1", "--on-change"],
+ )
+ assert "CFG-001" in result.output
+
+ def test_output_file_written_each_cycle(self, runner, insecure_file, tmp_path, _one_cycle):
+ out = str(tmp_path / "watch_report.json")
+ runner.invoke(
+ cli,
+ [
+ "watch",
+ "--config",
+ insecure_file,
+ "--format",
+ "json",
+ "--output",
+ out,
+ "--interval",
+ "1",
+ ],
+ )
+ assert Path(out).exists()
+ data = json.loads(Path(out).read_text())
+ assert "results" in data
+
+ def test_json_format_per_cycle(self, runner, insecure_file, tmp_path, _one_cycle):
+ out = str(tmp_path / "out.json")
+ runner.invoke(
+ cli,
+ ["watch", "--config", insecure_file, "--format", "json", "--output", out, "--interval", "1"],
+ )
+ data = json.loads(Path(out).read_text())
+ rule_ids = [f["rule_id"] for r in data["results"] for f in r["findings"]]
+ assert "CFG-001" in rule_ids
+
+
class TestVersionFlag:
def test_version_flag_exits_0(self, runner):
result = runner.invoke(cli, ["--version"])
From 5793df44d9c206be264de6a53f8dcca4b0f46d34 Mon Sep 17 00:00:00 2001
From: Sid J <6353716+sidnz@users.noreply.github.com>
Date: Wed, 11 Mar 2026 22:36:27 +1300
Subject: [PATCH 2/3] docs: update readme with watch mode details
---
README.md | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/README.md b/README.md
index a959c5a..a7835a0 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,7 @@ sentinel scans Model Context Protocol (MCP) server configurations, live endpoint
- **4 output formats** — terminal (Rich), HTML, JSON, SARIF 2.1
- **GitHub Action** — drop-in CI integration with SARIF upload support
- **Fail-on threshold** — block PRs on HIGH/CRITICAL findings
+- 👁️ **Watch mode** — continuous monitoring with configurable interval and change detection
---
@@ -51,6 +52,9 @@ sentinel container my-mcp-image:latest
# Run all scanners in one pass
sentinel scan --config mcp.json --endpoint https://mcp.example.com --container my-image:latest
+# Continuously monitor — re-scan every 60s, alert on change
+sentinel watch --config mcp.json --endpoint https://mcp.example.com --interval 60 --on-change
+
# Output as SARIF for GitHub Code Scanning
sentinel config mcp.json --format sarif --output sentinel.sarif.json
```
From e8667b38652747cd8ae2ad670574743bdeb7f45e Mon Sep 17 00:00:00 2001
From: Sid J <6353716+sidnz@users.noreply.github.com>
Date: Thu, 12 Mar 2026 18:27:32 +1300
Subject: [PATCH 3/3] lint: ruff format
---
sentinel/__init__.py | 1 +
sentinel/cli.py | 1 +
sentinel/core.py | 1 +
sentinel/modules/config.py | 37 +++++++++++------
sentinel/modules/container.py | 44 ++++++++++++--------
sentinel/modules/probe.py | 49 ++++++++++++++---------
sentinel/report/html.py | 10 ++---
sentinel/report/sarif.py | 75 ++++++++++++++++++++---------------
sentinel/report/terminal.py | 1 +
sentinel/rules/registry.py | 1 +
tests/fixtures/configs.py | 2 +-
tests/integration/test_cli.py | 28 ++++++++-----
tests/unit/test_config.py | 1 +
tests/unit/test_core.py | 1 +
tests/unit/test_report.py | 4 ++
tests/unit/test_rules.py | 1 +
16 files changed, 160 insertions(+), 97 deletions(-)
diff --git a/sentinel/__init__.py b/sentinel/__init__.py
index 8e9d104..1150131 100644
--- a/sentinel/__init__.py
+++ b/sentinel/__init__.py
@@ -1,2 +1,3 @@
"""sentinel — MCP security scanner by Helixar."""
+
__version__ = "0.1.0"
diff --git a/sentinel/cli.py b/sentinel/cli.py
index 70f5c3a..d08d6a3 100644
--- a/sentinel/cli.py
+++ b/sentinel/cli.py
@@ -1,4 +1,5 @@
"""sentinel CLI — 5 commands: config, probe, container, scan, watch."""
+
from __future__ import annotations
import sys
diff --git a/sentinel/core.py b/sentinel/core.py
index be1867a..040d840 100644
--- a/sentinel/core.py
+++ b/sentinel/core.py
@@ -1,4 +1,5 @@
"""Core data models for sentinel."""
+
from __future__ import annotations
from dataclasses import dataclass, field
diff --git a/sentinel/modules/config.py b/sentinel/modules/config.py
index 655ef94..4e03a6b 100644
--- a/sentinel/modules/config.py
+++ b/sentinel/modules/config.py
@@ -1,4 +1,5 @@
"""Static analysis of MCP server configuration files."""
+
from __future__ import annotations
import json
@@ -103,7 +104,8 @@ def _check_no_auth(self, config: dict, result: ScanResult, path: Path) -> None:
auth = _get_nested(config, "auth") or _get_nested(config, "authentication")
if auth is None:
f = self._make_finding(
- "no_auth", str(path),
+ "no_auth",
+ str(path),
"No 'auth' or 'authentication' block found in config.",
)
if f:
@@ -119,7 +121,8 @@ def _check_plaintext_secrets(self, config: dict, result: ScanResult, path: Path)
if value.startswith("$") or value.startswith("${"):
continue
f = self._make_finding(
- "plaintext_secrets", str(path),
+ "plaintext_secrets",
+ str(path),
f"Possible plaintext secret at key '{key}'.",
)
if f:
@@ -136,14 +139,16 @@ def _check_wildcard_permissions(self, config: dict, result: ScanResult, path: Pa
return
if isinstance(perms, str) and perms in ("*", "all"):
f = self._make_finding(
- "wildcard_permissions", str(path),
+ "wildcard_permissions",
+ str(path),
f"Tool permissions set to wildcard value '{perms}'.",
)
if f:
result.add_finding(f)
elif isinstance(perms, list) and ("*" in perms or "all" in perms):
f = self._make_finding(
- "wildcard_permissions", str(path),
+ "wildcard_permissions",
+ str(path),
"Tool permissions list contains wildcard entry.",
)
if f:
@@ -158,7 +163,8 @@ def _check_no_rate_limiting(self, config: dict, result: ScanResult, path: Path)
)
if rl is None:
f = self._make_finding(
- "no_rate_limiting", str(path),
+ "no_rate_limiting",
+ str(path),
"No rate_limit or throttle block found in config.",
)
if f:
@@ -169,7 +175,8 @@ def _check_debug_mode(self, config: dict, result: ScanResult, path: Path) -> Non
debug_on = ("true", "1", "yes", "on")
if debug is True or (isinstance(debug, str) and debug.lower() in debug_on):
f = self._make_finding(
- "debug_mode_enabled", str(path),
+ "debug_mode_enabled",
+ str(path),
f"'debug' is set to {debug!r} — disable in production.",
)
if f:
@@ -184,7 +191,8 @@ def _check_no_tls(self, config: dict, result: ScanResult, path: Path) -> None:
)
if tls is None:
f = self._make_finding(
- "no_tls_config", str(path),
+ "no_tls_config",
+ str(path),
"No tls/ssl block found in config. Transport encryption not configured.",
)
if f:
@@ -204,14 +212,16 @@ def _check_wildcard_cors(self, config: dict, result: ScanResult, path: Path) ->
if origins == "*":
f = self._make_finding(
- "wildcard_cors", str(path),
+ "wildcard_cors",
+ str(path),
"CORS allowed_origins is set to '*', permitting all origins.",
)
if f:
result.add_finding(f)
elif isinstance(origins, list) and "*" in origins:
f = self._make_finding(
- "wildcard_cors", str(path),
+ "wildcard_cors",
+ str(path),
"CORS allowed_origins list contains wildcard '*'.",
)
if f:
@@ -225,7 +235,8 @@ def _check_no_input_validation(self, config: dict, result: ScanResult, path: Pat
)
if iv is None:
f = self._make_finding(
- "no_input_validation", str(path),
+ "no_input_validation",
+ str(path),
"No input_validation block found in config.",
)
if f:
@@ -247,7 +258,8 @@ def _check_sensitive_logging(self, config: dict, result: ScanResult, path: Path)
flag_on = ("true", "1", "yes")
if flag_val is True or (isinstance(flag_val, str) and flag_val.lower() in flag_on):
f = self._make_finding(
- "sensitive_logging", str(path),
+ "sensitive_logging",
+ str(path),
f"Logging config has '{flag_name}' enabled — may expose sensitive data.",
)
if f:
@@ -262,7 +274,8 @@ def _check_no_timeout(self, config: dict, result: ScanResult, path: Path) -> Non
)
if timeout is None:
f = self._make_finding(
- "no_timeout", str(path),
+ "no_timeout",
+ str(path),
"No timeout or timeout_seconds field found in config.",
)
if f:
diff --git a/sentinel/modules/container.py b/sentinel/modules/container.py
index f68c64e..f7ba5d1 100644
--- a/sentinel/modules/container.py
+++ b/sentinel/modules/container.py
@@ -1,4 +1,5 @@
"""Docker container and image security inspection."""
+
from __future__ import annotations
import re
@@ -58,13 +59,15 @@ def scan(self, target: str) -> ScanResult:
pass
if container_obj is None and image_obj is None:
- result.add_finding(Finding(
- rule_id="CTR-ERR",
- severity=__import__("sentinel.core", fromlist=["Severity"]).Severity.INFO,
- title="Container/image not found",
- detail=f"Could not find container or image: {target}",
- location=target,
- ))
+ result.add_finding(
+ Finding(
+ rule_id="CTR-ERR",
+ severity=__import__("sentinel.core", fromlist=["Severity"]).Severity.INFO,
+ title="Container/image not found",
+ detail=f"Could not find container or image: {target}",
+ location=target,
+ )
+ )
return result
if container_obj is not None:
@@ -103,7 +106,8 @@ def _check_running_as_root(self, config: dict, result: ScanResult, target: str)
user = config.get("User", "")
if user in ("", "0", "root"):
f = self._make_finding(
- "running_as_root", target,
+ "running_as_root",
+ target,
f"Container User is {user!r} — process runs as UID 0 (root).",
)
if f:
@@ -112,7 +116,8 @@ def _check_running_as_root(self, config: dict, result: ScanResult, target: str)
def _check_privileged(self, host_config: dict, result: ScanResult, target: str) -> None:
if host_config.get("Privileged", False):
f = self._make_finding(
- "privileged_container", target,
+ "privileged_container",
+ target,
"Container is running with --privileged flag.",
)
if f:
@@ -124,7 +129,8 @@ def _check_resource_limits(self, host_config: dict, result: ScanResult, target:
cpu_quota = host_config.get("CpuQuota", 0)
if not memory and not nano_cpus and not cpu_quota:
f = self._make_finding(
- "no_resource_limits", target,
+ "no_resource_limits",
+ target,
"No memory or CPU limits set on container.",
)
if f:
@@ -138,7 +144,8 @@ def _check_sensitive_env_vars(self, config: dict, result: ScanResult, target: st
key, _, value = entry.partition("=")
if _SECRET_KEY_RE.search(key) and value and not value.startswith("$"):
f = self._make_finding(
- "sensitive_env_vars", target,
+ "sensitive_env_vars",
+ target,
f"Environment variable '{key}' appears to contain a plaintext secret.",
)
if f:
@@ -151,7 +158,8 @@ def _check_writable_filesystem(
read_only = host_config.get("ReadonlyRootfs", False)
if not read_only:
f = self._make_finding(
- "writable_filesystem", target,
+ "writable_filesystem",
+ target,
"Container root filesystem is writable (ReadonlyRootfs=false).",
)
if f:
@@ -161,7 +169,8 @@ def _check_health_check(self, config: dict, result: ScanResult, target: str) ->
health = config.get("Healthcheck")
if health is None:
f = self._make_finding(
- "no_health_check", target,
+ "no_health_check",
+ target,
"No HEALTHCHECK configured in container/image.",
)
if f:
@@ -178,7 +187,8 @@ def _check_outdated_image(
pass # image has OCI metadata, considered maintained
elif not created:
f = self._make_finding(
- "outdated_base_image", target,
+ "outdated_base_image",
+ target,
"Image has no creation timestamp — may be using an outdated or untracked base.",
)
if f:
@@ -191,7 +201,8 @@ def _check_dangerous_ports(self, host_config: dict, result: ScanResult, target:
port_num = int(port_proto.split("/")[0])
if port_num in _DANGEROUS_PORTS:
f = self._make_finding(
- "dangerous_ports", target,
+ "dangerous_ports",
+ target,
f"Dangerous port {port_num} is exposed on the container.",
)
if f:
@@ -207,7 +218,8 @@ def _check_dangerous_ports_image(self, config: dict, result: ScanResult, target:
port_num = int(port_proto.split("/")[0])
if port_num in _DANGEROUS_PORTS:
f = self._make_finding(
- "dangerous_ports", target,
+ "dangerous_ports",
+ target,
f"Image exposes dangerous port {port_num}.",
)
if f:
diff --git a/sentinel/modules/probe.py b/sentinel/modules/probe.py
index 2b5badf..60d6114 100644
--- a/sentinel/modules/probe.py
+++ b/sentinel/modules/probe.py
@@ -1,4 +1,5 @@
"""Live MCP endpoint security analysis."""
+
from __future__ import annotations
import socket
@@ -57,13 +58,15 @@ def scan(self, endpoint: str, timeout: int = 10) -> ScanResult:
try:
resp = requests.get(endpoint, timeout=timeout, verify=False, allow_redirects=False)
except RequestException as exc:
- result.add_finding(Finding(
- rule_id="PRB-ERR",
- severity=__import__("sentinel.core", fromlist=["Severity"]).Severity.INFO,
- title="Endpoint unreachable",
- detail=str(exc),
- location=endpoint,
- ))
+ result.add_finding(
+ Finding(
+ rule_id="PRB-ERR",
+ severity=__import__("sentinel.core", fromlist=["Severity"]).Severity.INFO,
+ title="Endpoint unreachable",
+ detail=str(exc),
+ location=endpoint,
+ )
+ )
return result
self._check_no_auth(resp, result, endpoint)
@@ -88,12 +91,11 @@ def _check_tls_certificate(self, parsed, result: ScanResult, timeout: int) -> No
not_after = cert.get("notAfter", "")
if not_after:
cert_fmt = "%b %d %H:%M:%S %Y %Z"
- expiry = datetime.strptime(not_after, cert_fmt).replace(
- tzinfo=timezone.utc
- )
+ expiry = datetime.strptime(not_after, cert_fmt).replace(tzinfo=timezone.utc)
if expiry < datetime.now(timezone.utc):
f = self._make_finding(
- "tls_cert_invalid", f"{host}:{port}",
+ "tls_cert_invalid",
+ f"{host}:{port}",
f"Certificate expired on {not_after}.",
)
if f:
@@ -121,7 +123,8 @@ def _check_tls_version(self, parsed, result: ScanResult, timeout: int) -> None:
version = ssock.version()
if version in ("SSLv3", "TLSv1", "TLSv1.1"):
f = self._make_finding(
- "weak_tls_version", f"{host}:{port}",
+ "weak_tls_version",
+ f"{host}:{port}",
f"Server negotiated {version} — upgrade to TLS 1.2+.",
)
if f:
@@ -132,7 +135,8 @@ def _check_tls_version(self, parsed, result: ScanResult, timeout: int) -> None:
def _check_no_auth(self, resp, result: ScanResult, endpoint: str) -> None:
if resp.status_code not in (401, 403):
f = self._make_finding(
- "no_auth_endpoint", endpoint,
+ "no_auth_endpoint",
+ endpoint,
f"Unauthenticated GET returned HTTP {resp.status_code} (expected 401/403).",
)
if f:
@@ -143,7 +147,8 @@ def _check_info_disclosure_headers(self, resp, result: ScanResult, endpoint: str
value = resp.headers.get(header, "")
if value and any(char.isdigit() for char in value):
f = self._make_finding(
- "info_disclosure_headers", endpoint,
+ "info_disclosure_headers",
+ endpoint,
f"Header '{header}: {value}' discloses version information.",
)
if f:
@@ -155,7 +160,8 @@ def _check_missing_security_headers(self, resp, result: ScanResult, endpoint: st
missing = [h for h in _SECURITY_HEADERS if h not in lower_headers]
if missing:
f = self._make_finding(
- "missing_security_headers", endpoint,
+ "missing_security_headers",
+ endpoint,
f"Missing security headers: {', '.join(missing)}.",
)
if f:
@@ -164,12 +170,14 @@ def _check_missing_security_headers(self, resp, result: ScanResult, endpoint: st
def _check_tool_listing_exposed(self, endpoint: str, result: ScanResult, timeout: int) -> None:
try:
import requests
+
base = endpoint.rstrip("/")
list_url = f"{base}/tools/list"
resp = requests.get(list_url, timeout=timeout, verify=False)
if resp.status_code == 200:
f = self._make_finding(
- "tool_listing_exposed", list_url,
+ "tool_listing_exposed",
+ list_url,
f"GET {list_url} returned 200 without authentication.",
)
if f:
@@ -182,13 +190,15 @@ def _check_verbose_errors(self, endpoint: str, result: ScanResult, timeout: int)
return
try:
import requests
+
bad_url = endpoint.rstrip("/") + "/nonexistent-sentinel-probe-12345"
resp = requests.get(bad_url, timeout=timeout, verify=False)
body = resp.text.lower()
- verbose_markers = ["traceback", "stack trace", "exception", "at line", "file \""]
+ verbose_markers = ["traceback", "stack trace", "exception", "at line", 'file "']
if any(marker in body for marker in verbose_markers):
f = self._make_finding(
- "verbose_errors", bad_url,
+ "verbose_errors",
+ bad_url,
"Error response contains stack trace or verbose debug information.",
)
if f:
@@ -204,7 +214,8 @@ def _check_rate_limiting(self, resp, result: ScanResult, endpoint: str) -> None:
)
if not has_rl and resp.status_code != 429:
f = self._make_finding(
- "no_rate_limiting_probe", endpoint,
+ "no_rate_limiting_probe",
+ endpoint,
"No rate-limiting headers detected in response (X-RateLimit-Limit, Retry-After).",
)
if f:
diff --git a/sentinel/report/html.py b/sentinel/report/html.py
index 0c5cc38..de776c3 100644
--- a/sentinel/report/html.py
+++ b/sentinel/report/html.py
@@ -1,4 +1,5 @@
"""Standalone HTML report renderer."""
+
from __future__ import annotations
from datetime import datetime, timezone
@@ -102,8 +103,7 @@ def render(results: List[ScanResult]) -> str:
for f in sorted(result.findings, key=lambda x: x.severity, reverse=True):
color = _SEVERITY_CSS.get(f.severity, "#6b7280")
ref_link = (
- f''
- f'{f.rule_id}'
+ f'{f.rule_id}'
if f.reference
else f.rule_id
)
@@ -150,9 +150,5 @@ def render(results: List[ScanResult]) -> str:
def _html_escape(text: str) -> str:
return (
- text
- .replace("&", "&")
- .replace("<", "<")
- .replace(">", ">")
- .replace('"', """)
+ text.replace("&", "&").replace("<", "<").replace(">", ">").replace('"', """)
)
diff --git a/sentinel/report/sarif.py b/sentinel/report/sarif.py
index 715a8e6..4635844 100644
--- a/sentinel/report/sarif.py
+++ b/sentinel/report/sarif.py
@@ -1,4 +1,5 @@
"""SARIF 2.1 and plain JSON output renderers."""
+
from __future__ import annotations
import json
@@ -16,7 +17,9 @@
}
SENTINEL_VERSION = "0.1.0"
-SARIF_SCHEMA = "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json"
+SARIF_SCHEMA = (
+ "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json"
+)
def render_sarif(results: List[ScanResult]) -> Dict[str, Any]:
@@ -40,23 +43,27 @@ def render_sarif(results: List[ScanResult]) -> Dict[str, Any]:
},
}
level = _SARIF_LEVEL.get(f.severity.value, "warning")
- run_results.append({
- "ruleId": f.rule_id,
- "level": level,
- "message": {"text": f.detail},
- "locations": [
- {
- "physicalLocation": {
- "artifactLocation": {"uri": f.location},
+ run_results.append(
+ {
+ "ruleId": f.rule_id,
+ "level": level,
+ "message": {"text": f.detail},
+ "locations": [
+ {
+ "physicalLocation": {
+ "artifactLocation": {"uri": f.location},
+ }
}
- }
- ],
- "fixes": [
- {
- "description": {"text": f.remediation},
- }
- ] if f.remediation else [],
- })
+ ],
+ "fixes": [
+ {
+ "description": {"text": f.remediation},
+ }
+ ]
+ if f.remediation
+ else [],
+ }
+ )
sarif = {
"$schema": SARIF_SCHEMA,
@@ -97,22 +104,24 @@ def render_json(results: List[ScanResult]) -> Dict[str, Any]:
"results": [],
}
for result in results:
- output["results"].append({
- "module": result.module,
- "target": result.target,
- "findings": [
- {
- "rule_id": f.rule_id,
- "severity": f.severity.value,
- "title": f.title,
- "detail": f.detail,
- "location": f.location,
- "remediation": f.remediation,
- "reference": f.reference,
- }
- for f in result.findings
- ],
- })
+ output["results"].append(
+ {
+ "module": result.module,
+ "target": result.target,
+ "findings": [
+ {
+ "rule_id": f.rule_id,
+ "severity": f.severity.value,
+ "title": f.title,
+ "detail": f.detail,
+ "location": f.location,
+ "remediation": f.remediation,
+ "reference": f.reference,
+ }
+ for f in result.findings
+ ],
+ }
+ )
return output
diff --git a/sentinel/report/terminal.py b/sentinel/report/terminal.py
index 70748e3..11b7ccc 100644
--- a/sentinel/report/terminal.py
+++ b/sentinel/report/terminal.py
@@ -1,4 +1,5 @@
"""Terminal output renderer — Rich if available, plain fallback."""
+
from __future__ import annotations
from typing import List
diff --git a/sentinel/rules/registry.py b/sentinel/rules/registry.py
index c92a27c..beb4ce3 100644
--- a/sentinel/rules/registry.py
+++ b/sentinel/rules/registry.py
@@ -1,4 +1,5 @@
"""Rule registry — loads and indexes rules.yaml."""
+
from __future__ import annotations
from pathlib import Path
diff --git a/tests/fixtures/configs.py b/tests/fixtures/configs.py
index bcaeb9f..746d9fc 100644
--- a/tests/fixtures/configs.py
+++ b/tests/fixtures/configs.py
@@ -36,7 +36,7 @@
# no auth block
# no tls block
# no rate_limit block
- "api_key": "sk-abc123supersecretkey", # plaintext secret
+ "api_key": "sk-abc123supersecretkey", # plaintext secret
"debug": True,
"cors": {
"allowed_origins": "*",
diff --git a/tests/integration/test_cli.py b/tests/integration/test_cli.py
index b47d91b..c4c55eb 100644
--- a/tests/integration/test_cli.py
+++ b/tests/integration/test_cli.py
@@ -173,7 +173,7 @@ class TestWatchCommand:
"""Tests for `sentinel watch`
As watch runs an infinite loop all tests patch ``signal.signal``
- to capture the SIGINT handler and ``time.sleep`` to fire it after the
+ to capture the SIGINT handler and ``time.sleep`` to fire it after the
first sleep call, giving exactly one full scan cycle per invocation
"""
@@ -190,8 +190,9 @@ def _mock_sleep(_secs):
if "handler" in captured:
captured["handler"](None, None)
- with patch("signal.signal", side_effect=_mock_signal), patch(
- "time.sleep", side_effect=_mock_sleep
+ with (
+ patch("signal.signal", side_effect=_mock_signal),
+ patch("time.sleep", side_effect=_mock_sleep),
):
yield
@@ -210,8 +211,9 @@ def _mock_sleep(_secs):
if call_count["n"] >= 2 and "handler" in captured:
captured["handler"](None, None)
- with patch("signal.signal", side_effect=_mock_signal), patch(
- "time.sleep", side_effect=_mock_sleep
+ with (
+ patch("signal.signal", side_effect=_mock_signal),
+ patch("time.sleep", side_effect=_mock_sleep),
):
yield
@@ -266,9 +268,7 @@ def _patched_scan(self, path):
return original_scan(self, path)
with patch.object(ConfigScanner, "scan", _patched_scan):
- result = runner.invoke(
- cli, ["watch", "--config", str(cfg), "--interval", "1"]
- )
+ result = runner.invoke(cli, ["watch", "--config", str(cfg), "--interval", "1"])
assert "CHANGED" in result.output
@@ -313,7 +313,17 @@ def test_json_format_per_cycle(self, runner, insecure_file, tmp_path, _one_cycle
out = str(tmp_path / "out.json")
runner.invoke(
cli,
- ["watch", "--config", insecure_file, "--format", "json", "--output", out, "--interval", "1"],
+ [
+ "watch",
+ "--config",
+ insecure_file,
+ "--format",
+ "json",
+ "--output",
+ out,
+ "--interval",
+ "1",
+ ],
)
data = json.loads(Path(out).read_text())
rule_ids = [f["rule_id"] for r in data["results"] for f in r["findings"]]
diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py
index 25601dc..be20049 100644
--- a/tests/unit/test_config.py
+++ b/tests/unit/test_config.py
@@ -1,4 +1,5 @@
"""25 tests for ConfigScanner."""
+
from __future__ import annotations
import json
diff --git a/tests/unit/test_core.py b/tests/unit/test_core.py
index cd21ff0..22c46e2 100644
--- a/tests/unit/test_core.py
+++ b/tests/unit/test_core.py
@@ -1,4 +1,5 @@
"""9 tests for core data models: Severity, Finding, ScanResult."""
+
import pytest
from sentinel.core import Finding, ScanResult, Severity
diff --git a/tests/unit/test_report.py b/tests/unit/test_report.py
index 6e1b916..4d1b7d0 100644
--- a/tests/unit/test_report.py
+++ b/tests/unit/test_report.py
@@ -1,4 +1,5 @@
"""17 tests for HTML, SARIF, and terminal renderers."""
+
from __future__ import annotations
from sentinel.core import Finding, ScanResult, Severity
@@ -33,6 +34,7 @@ def _make_finding(
# ── Terminal renderer ──────────────────────────────────────────────────────
+
class TestTerminalRenderer:
def test_plain_output_is_string(self):
r = _make_result()
@@ -78,6 +80,7 @@ def test_severity_counts_in_summary(self):
# ── HTML renderer ──────────────────────────────────────────────────────────
+
class TestHTMLRenderer:
def test_returns_string(self):
output = html_report.render([_make_result()])
@@ -113,6 +116,7 @@ def test_severity_badge_in_html(self):
# ── SARIF renderer ─────────────────────────────────────────────────────────
+
class TestSARIFRenderer:
def test_sarif_returns_dict(self):
sarif = sarif_report.render_sarif([_make_result()])
diff --git a/tests/unit/test_rules.py b/tests/unit/test_rules.py
index 8ee3af4..c2321ac 100644
--- a/tests/unit/test_rules.py
+++ b/tests/unit/test_rules.py
@@ -1,4 +1,5 @@
"""11 tests for RuleRegistry."""
+
from sentinel.core import Severity
from sentinel.rules import RuleRegistry