diff --git a/README.md b/README.md index a959c5a..a7835a0 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ sentinel scans Model Context Protocol (MCP) server configurations, live endpoint - **4 output formats** — terminal (Rich), HTML, JSON, SARIF 2.1 - **GitHub Action** — drop-in CI integration with SARIF upload support - **Fail-on threshold** — block PRs on HIGH/CRITICAL findings +- 👁️ **Watch mode** — continuous monitoring with configurable interval and change detection --- @@ -51,6 +52,9 @@ sentinel container my-mcp-image:latest # Run all scanners in one pass sentinel scan --config mcp.json --endpoint https://mcp.example.com --container my-image:latest +# Continuously monitor — re-scan every 60s, alert on change +sentinel watch --config mcp.json --endpoint https://mcp.example.com --interval 60 --on-change + # Output as SARIF for GitHub Code Scanning sentinel config mcp.json --format sarif --output sentinel.sarif.json ``` diff --git a/sentinel/__init__.py b/sentinel/__init__.py index 8e9d104..1150131 100644 --- a/sentinel/__init__.py +++ b/sentinel/__init__.py @@ -1,2 +1,3 @@ """sentinel — MCP security scanner by Helixar.""" + __version__ = "0.1.0" diff --git a/sentinel/cli.py b/sentinel/cli.py index c2a7741..d08d6a3 100644 --- a/sentinel/cli.py +++ b/sentinel/cli.py @@ -1,4 +1,5 @@ -"""sentinel CLI — 4 commands: config, probe, container, scan.""" +"""sentinel CLI — 5 commands: config, probe, container, scan, watch.""" + from __future__ import annotations import sys @@ -175,6 +176,112 @@ def scan( sys.exit(1) +@cli.command() +@click.option("--config", "config_path", default=None, type=click.Path()) +@click.option("--endpoint", default=None) +@click.option("--container", "container_target", default=None) +@click.option("--interval", default=60, type=int, help="Seconds between scans.") +@click.option("--format", "fmt", default="terminal", type=_FMT_CHOICES) +@click.option("--output", default=None, help="Write report to file on each cycle.") +@click.option("--fail-on", default="high") +@click.option( + "--on-change", + is_flag=True, + default=False, + help="Only emit a report when findings change.", +) +@click.option("--safe-mode/--no-safe-mode", default=True) +@click.option("--timeout", default=10) +def watch( + config_path: Optional[str], + endpoint: Optional[str], + container_target: Optional[str], + interval: int, + fmt: str, + output: Optional[str], + fail_on: str, + on_change: bool, + safe_mode: bool, + timeout: int, +) -> None: + """Continuously monitor targets, re-scanning on a fixed interval.""" + import signal + import time + from datetime import datetime, timezone + + if not any([config_path, endpoint, container_target]): + click.echo("No targets specified. Use --config, --endpoint, or --container.", err=True) + sys.exit(2) + + config_scanner = ConfigScanner() if config_path else None + + probe_scanner = None + if endpoint: + from sentinel.modules.probe import ProbeScanner + + probe_scanner = ProbeScanner(safe_mode=safe_mode) + + container_scanner = None + if container_target: + from sentinel.modules.container import ContainerScanner + + container_scanner = ContainerScanner() + + _stop = False + + def _handle_sigint(sig, frame) -> None: # noqa: ARG001 + nonlocal _stop + _stop = True + + signal.signal(signal.SIGINT, _handle_sigint) + + previous_fingerprint: Optional[frozenset] = None + cycle = 0 + + click.echo(f"sentinel watch — scanning every {interval}s | Ctrl-C to stop\n") + + while not _stop: + cycle += 1 + now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") + + results = [] + if config_scanner: + results.append(config_scanner.scan(Path(config_path))) # type: ignore[arg-type] + if probe_scanner: + results.append(probe_scanner.scan(endpoint, timeout=timeout)) # type: ignore[arg-type] + if container_scanner: + results.append(container_scanner.scan(container_target)) # type: ignore[arg-type] + + fingerprint = frozenset( + (f.rule_id, f.location, f.detail) for r in results for f in r.findings + ) + first_run = previous_fingerprint is None + changed = fingerprint != previous_fingerprint + previous_fingerprint = fingerprint + + if not on_change or changed: + if fmt == "terminal" and not output: + separator = "─" * 60 + status_parts = [f"Cycle {cycle}", now] + if not first_run and changed: + status_parts.append("CHANGED") + elif not first_run: + status_parts.append("no change") + click.echo(separator) + click.echo(" " + " | ".join(status_parts)) + click.echo(separator) + _write_output(results, fmt, output) + else: + click.echo(f"[{now}] Cycle {cycle} — no change, skipping report.") + + for _ in range(interval): + if _stop: + break + time.sleep(1) + + click.echo("\nsentinel watch stopped.") + + def main() -> None: cli() diff --git a/sentinel/core.py b/sentinel/core.py index be1867a..040d840 100644 --- a/sentinel/core.py +++ b/sentinel/core.py @@ -1,4 +1,5 @@ """Core data models for sentinel.""" + from __future__ import annotations from dataclasses import dataclass, field diff --git a/sentinel/modules/config.py b/sentinel/modules/config.py index 655ef94..4e03a6b 100644 --- a/sentinel/modules/config.py +++ b/sentinel/modules/config.py @@ -1,4 +1,5 @@ """Static analysis of MCP server configuration files.""" + from __future__ import annotations import json @@ -103,7 +104,8 @@ def _check_no_auth(self, config: dict, result: ScanResult, path: Path) -> None: auth = _get_nested(config, "auth") or _get_nested(config, "authentication") if auth is None: f = self._make_finding( - "no_auth", str(path), + "no_auth", + str(path), "No 'auth' or 'authentication' block found in config.", ) if f: @@ -119,7 +121,8 @@ def _check_plaintext_secrets(self, config: dict, result: ScanResult, path: Path) if value.startswith("$") or value.startswith("${"): continue f = self._make_finding( - "plaintext_secrets", str(path), + "plaintext_secrets", + str(path), f"Possible plaintext secret at key '{key}'.", ) if f: @@ -136,14 +139,16 @@ def _check_wildcard_permissions(self, config: dict, result: ScanResult, path: Pa return if isinstance(perms, str) and perms in ("*", "all"): f = self._make_finding( - "wildcard_permissions", str(path), + "wildcard_permissions", + str(path), f"Tool permissions set to wildcard value '{perms}'.", ) if f: result.add_finding(f) elif isinstance(perms, list) and ("*" in perms or "all" in perms): f = self._make_finding( - "wildcard_permissions", str(path), + "wildcard_permissions", + str(path), "Tool permissions list contains wildcard entry.", ) if f: @@ -158,7 +163,8 @@ def _check_no_rate_limiting(self, config: dict, result: ScanResult, path: Path) ) if rl is None: f = self._make_finding( - "no_rate_limiting", str(path), + "no_rate_limiting", + str(path), "No rate_limit or throttle block found in config.", ) if f: @@ -169,7 +175,8 @@ def _check_debug_mode(self, config: dict, result: ScanResult, path: Path) -> Non debug_on = ("true", "1", "yes", "on") if debug is True or (isinstance(debug, str) and debug.lower() in debug_on): f = self._make_finding( - "debug_mode_enabled", str(path), + "debug_mode_enabled", + str(path), f"'debug' is set to {debug!r} — disable in production.", ) if f: @@ -184,7 +191,8 @@ def _check_no_tls(self, config: dict, result: ScanResult, path: Path) -> None: ) if tls is None: f = self._make_finding( - "no_tls_config", str(path), + "no_tls_config", + str(path), "No tls/ssl block found in config. Transport encryption not configured.", ) if f: @@ -204,14 +212,16 @@ def _check_wildcard_cors(self, config: dict, result: ScanResult, path: Path) -> if origins == "*": f = self._make_finding( - "wildcard_cors", str(path), + "wildcard_cors", + str(path), "CORS allowed_origins is set to '*', permitting all origins.", ) if f: result.add_finding(f) elif isinstance(origins, list) and "*" in origins: f = self._make_finding( - "wildcard_cors", str(path), + "wildcard_cors", + str(path), "CORS allowed_origins list contains wildcard '*'.", ) if f: @@ -225,7 +235,8 @@ def _check_no_input_validation(self, config: dict, result: ScanResult, path: Pat ) if iv is None: f = self._make_finding( - "no_input_validation", str(path), + "no_input_validation", + str(path), "No input_validation block found in config.", ) if f: @@ -247,7 +258,8 @@ def _check_sensitive_logging(self, config: dict, result: ScanResult, path: Path) flag_on = ("true", "1", "yes") if flag_val is True or (isinstance(flag_val, str) and flag_val.lower() in flag_on): f = self._make_finding( - "sensitive_logging", str(path), + "sensitive_logging", + str(path), f"Logging config has '{flag_name}' enabled — may expose sensitive data.", ) if f: @@ -262,7 +274,8 @@ def _check_no_timeout(self, config: dict, result: ScanResult, path: Path) -> Non ) if timeout is None: f = self._make_finding( - "no_timeout", str(path), + "no_timeout", + str(path), "No timeout or timeout_seconds field found in config.", ) if f: diff --git a/sentinel/modules/container.py b/sentinel/modules/container.py index f68c64e..f7ba5d1 100644 --- a/sentinel/modules/container.py +++ b/sentinel/modules/container.py @@ -1,4 +1,5 @@ """Docker container and image security inspection.""" + from __future__ import annotations import re @@ -58,13 +59,15 @@ def scan(self, target: str) -> ScanResult: pass if container_obj is None and image_obj is None: - result.add_finding(Finding( - rule_id="CTR-ERR", - severity=__import__("sentinel.core", fromlist=["Severity"]).Severity.INFO, - title="Container/image not found", - detail=f"Could not find container or image: {target}", - location=target, - )) + result.add_finding( + Finding( + rule_id="CTR-ERR", + severity=__import__("sentinel.core", fromlist=["Severity"]).Severity.INFO, + title="Container/image not found", + detail=f"Could not find container or image: {target}", + location=target, + ) + ) return result if container_obj is not None: @@ -103,7 +106,8 @@ def _check_running_as_root(self, config: dict, result: ScanResult, target: str) user = config.get("User", "") if user in ("", "0", "root"): f = self._make_finding( - "running_as_root", target, + "running_as_root", + target, f"Container User is {user!r} — process runs as UID 0 (root).", ) if f: @@ -112,7 +116,8 @@ def _check_running_as_root(self, config: dict, result: ScanResult, target: str) def _check_privileged(self, host_config: dict, result: ScanResult, target: str) -> None: if host_config.get("Privileged", False): f = self._make_finding( - "privileged_container", target, + "privileged_container", + target, "Container is running with --privileged flag.", ) if f: @@ -124,7 +129,8 @@ def _check_resource_limits(self, host_config: dict, result: ScanResult, target: cpu_quota = host_config.get("CpuQuota", 0) if not memory and not nano_cpus and not cpu_quota: f = self._make_finding( - "no_resource_limits", target, + "no_resource_limits", + target, "No memory or CPU limits set on container.", ) if f: @@ -138,7 +144,8 @@ def _check_sensitive_env_vars(self, config: dict, result: ScanResult, target: st key, _, value = entry.partition("=") if _SECRET_KEY_RE.search(key) and value and not value.startswith("$"): f = self._make_finding( - "sensitive_env_vars", target, + "sensitive_env_vars", + target, f"Environment variable '{key}' appears to contain a plaintext secret.", ) if f: @@ -151,7 +158,8 @@ def _check_writable_filesystem( read_only = host_config.get("ReadonlyRootfs", False) if not read_only: f = self._make_finding( - "writable_filesystem", target, + "writable_filesystem", + target, "Container root filesystem is writable (ReadonlyRootfs=false).", ) if f: @@ -161,7 +169,8 @@ def _check_health_check(self, config: dict, result: ScanResult, target: str) -> health = config.get("Healthcheck") if health is None: f = self._make_finding( - "no_health_check", target, + "no_health_check", + target, "No HEALTHCHECK configured in container/image.", ) if f: @@ -178,7 +187,8 @@ def _check_outdated_image( pass # image has OCI metadata, considered maintained elif not created: f = self._make_finding( - "outdated_base_image", target, + "outdated_base_image", + target, "Image has no creation timestamp — may be using an outdated or untracked base.", ) if f: @@ -191,7 +201,8 @@ def _check_dangerous_ports(self, host_config: dict, result: ScanResult, target: port_num = int(port_proto.split("/")[0]) if port_num in _DANGEROUS_PORTS: f = self._make_finding( - "dangerous_ports", target, + "dangerous_ports", + target, f"Dangerous port {port_num} is exposed on the container.", ) if f: @@ -207,7 +218,8 @@ def _check_dangerous_ports_image(self, config: dict, result: ScanResult, target: port_num = int(port_proto.split("/")[0]) if port_num in _DANGEROUS_PORTS: f = self._make_finding( - "dangerous_ports", target, + "dangerous_ports", + target, f"Image exposes dangerous port {port_num}.", ) if f: diff --git a/sentinel/modules/probe.py b/sentinel/modules/probe.py index 2b5badf..60d6114 100644 --- a/sentinel/modules/probe.py +++ b/sentinel/modules/probe.py @@ -1,4 +1,5 @@ """Live MCP endpoint security analysis.""" + from __future__ import annotations import socket @@ -57,13 +58,15 @@ def scan(self, endpoint: str, timeout: int = 10) -> ScanResult: try: resp = requests.get(endpoint, timeout=timeout, verify=False, allow_redirects=False) except RequestException as exc: - result.add_finding(Finding( - rule_id="PRB-ERR", - severity=__import__("sentinel.core", fromlist=["Severity"]).Severity.INFO, - title="Endpoint unreachable", - detail=str(exc), - location=endpoint, - )) + result.add_finding( + Finding( + rule_id="PRB-ERR", + severity=__import__("sentinel.core", fromlist=["Severity"]).Severity.INFO, + title="Endpoint unreachable", + detail=str(exc), + location=endpoint, + ) + ) return result self._check_no_auth(resp, result, endpoint) @@ -88,12 +91,11 @@ def _check_tls_certificate(self, parsed, result: ScanResult, timeout: int) -> No not_after = cert.get("notAfter", "") if not_after: cert_fmt = "%b %d %H:%M:%S %Y %Z" - expiry = datetime.strptime(not_after, cert_fmt).replace( - tzinfo=timezone.utc - ) + expiry = datetime.strptime(not_after, cert_fmt).replace(tzinfo=timezone.utc) if expiry < datetime.now(timezone.utc): f = self._make_finding( - "tls_cert_invalid", f"{host}:{port}", + "tls_cert_invalid", + f"{host}:{port}", f"Certificate expired on {not_after}.", ) if f: @@ -121,7 +123,8 @@ def _check_tls_version(self, parsed, result: ScanResult, timeout: int) -> None: version = ssock.version() if version in ("SSLv3", "TLSv1", "TLSv1.1"): f = self._make_finding( - "weak_tls_version", f"{host}:{port}", + "weak_tls_version", + f"{host}:{port}", f"Server negotiated {version} — upgrade to TLS 1.2+.", ) if f: @@ -132,7 +135,8 @@ def _check_tls_version(self, parsed, result: ScanResult, timeout: int) -> None: def _check_no_auth(self, resp, result: ScanResult, endpoint: str) -> None: if resp.status_code not in (401, 403): f = self._make_finding( - "no_auth_endpoint", endpoint, + "no_auth_endpoint", + endpoint, f"Unauthenticated GET returned HTTP {resp.status_code} (expected 401/403).", ) if f: @@ -143,7 +147,8 @@ def _check_info_disclosure_headers(self, resp, result: ScanResult, endpoint: str value = resp.headers.get(header, "") if value and any(char.isdigit() for char in value): f = self._make_finding( - "info_disclosure_headers", endpoint, + "info_disclosure_headers", + endpoint, f"Header '{header}: {value}' discloses version information.", ) if f: @@ -155,7 +160,8 @@ def _check_missing_security_headers(self, resp, result: ScanResult, endpoint: st missing = [h for h in _SECURITY_HEADERS if h not in lower_headers] if missing: f = self._make_finding( - "missing_security_headers", endpoint, + "missing_security_headers", + endpoint, f"Missing security headers: {', '.join(missing)}.", ) if f: @@ -164,12 +170,14 @@ def _check_missing_security_headers(self, resp, result: ScanResult, endpoint: st def _check_tool_listing_exposed(self, endpoint: str, result: ScanResult, timeout: int) -> None: try: import requests + base = endpoint.rstrip("/") list_url = f"{base}/tools/list" resp = requests.get(list_url, timeout=timeout, verify=False) if resp.status_code == 200: f = self._make_finding( - "tool_listing_exposed", list_url, + "tool_listing_exposed", + list_url, f"GET {list_url} returned 200 without authentication.", ) if f: @@ -182,13 +190,15 @@ def _check_verbose_errors(self, endpoint: str, result: ScanResult, timeout: int) return try: import requests + bad_url = endpoint.rstrip("/") + "/nonexistent-sentinel-probe-12345" resp = requests.get(bad_url, timeout=timeout, verify=False) body = resp.text.lower() - verbose_markers = ["traceback", "stack trace", "exception", "at line", "file \""] + verbose_markers = ["traceback", "stack trace", "exception", "at line", 'file "'] if any(marker in body for marker in verbose_markers): f = self._make_finding( - "verbose_errors", bad_url, + "verbose_errors", + bad_url, "Error response contains stack trace or verbose debug information.", ) if f: @@ -204,7 +214,8 @@ def _check_rate_limiting(self, resp, result: ScanResult, endpoint: str) -> None: ) if not has_rl and resp.status_code != 429: f = self._make_finding( - "no_rate_limiting_probe", endpoint, + "no_rate_limiting_probe", + endpoint, "No rate-limiting headers detected in response (X-RateLimit-Limit, Retry-After).", ) if f: diff --git a/sentinel/report/html.py b/sentinel/report/html.py index 0c5cc38..de776c3 100644 --- a/sentinel/report/html.py +++ b/sentinel/report/html.py @@ -1,4 +1,5 @@ """Standalone HTML report renderer.""" + from __future__ import annotations from datetime import datetime, timezone @@ -102,8 +103,7 @@ def render(results: List[ScanResult]) -> str: for f in sorted(result.findings, key=lambda x: x.severity, reverse=True): color = _SEVERITY_CSS.get(f.severity, "#6b7280") ref_link = ( - f'' - f'{f.rule_id}' + f'{f.rule_id}' if f.reference else f.rule_id ) @@ -150,9 +150,5 @@ def render(results: List[ScanResult]) -> str: def _html_escape(text: str) -> str: return ( - text - .replace("&", "&") - .replace("<", "<") - .replace(">", ">") - .replace('"', """) + text.replace("&", "&").replace("<", "<").replace(">", ">").replace('"', """) ) diff --git a/sentinel/report/sarif.py b/sentinel/report/sarif.py index 715a8e6..4635844 100644 --- a/sentinel/report/sarif.py +++ b/sentinel/report/sarif.py @@ -1,4 +1,5 @@ """SARIF 2.1 and plain JSON output renderers.""" + from __future__ import annotations import json @@ -16,7 +17,9 @@ } SENTINEL_VERSION = "0.1.0" -SARIF_SCHEMA = "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json" +SARIF_SCHEMA = ( + "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json" +) def render_sarif(results: List[ScanResult]) -> Dict[str, Any]: @@ -40,23 +43,27 @@ def render_sarif(results: List[ScanResult]) -> Dict[str, Any]: }, } level = _SARIF_LEVEL.get(f.severity.value, "warning") - run_results.append({ - "ruleId": f.rule_id, - "level": level, - "message": {"text": f.detail}, - "locations": [ - { - "physicalLocation": { - "artifactLocation": {"uri": f.location}, + run_results.append( + { + "ruleId": f.rule_id, + "level": level, + "message": {"text": f.detail}, + "locations": [ + { + "physicalLocation": { + "artifactLocation": {"uri": f.location}, + } } - } - ], - "fixes": [ - { - "description": {"text": f.remediation}, - } - ] if f.remediation else [], - }) + ], + "fixes": [ + { + "description": {"text": f.remediation}, + } + ] + if f.remediation + else [], + } + ) sarif = { "$schema": SARIF_SCHEMA, @@ -97,22 +104,24 @@ def render_json(results: List[ScanResult]) -> Dict[str, Any]: "results": [], } for result in results: - output["results"].append({ - "module": result.module, - "target": result.target, - "findings": [ - { - "rule_id": f.rule_id, - "severity": f.severity.value, - "title": f.title, - "detail": f.detail, - "location": f.location, - "remediation": f.remediation, - "reference": f.reference, - } - for f in result.findings - ], - }) + output["results"].append( + { + "module": result.module, + "target": result.target, + "findings": [ + { + "rule_id": f.rule_id, + "severity": f.severity.value, + "title": f.title, + "detail": f.detail, + "location": f.location, + "remediation": f.remediation, + "reference": f.reference, + } + for f in result.findings + ], + } + ) return output diff --git a/sentinel/report/terminal.py b/sentinel/report/terminal.py index 70748e3..11b7ccc 100644 --- a/sentinel/report/terminal.py +++ b/sentinel/report/terminal.py @@ -1,4 +1,5 @@ """Terminal output renderer — Rich if available, plain fallback.""" + from __future__ import annotations from typing import List diff --git a/sentinel/rules/registry.py b/sentinel/rules/registry.py index c92a27c..beb4ce3 100644 --- a/sentinel/rules/registry.py +++ b/sentinel/rules/registry.py @@ -1,4 +1,5 @@ """Rule registry — loads and indexes rules.yaml.""" + from __future__ import annotations from pathlib import Path diff --git a/tests/fixtures/configs.py b/tests/fixtures/configs.py index bcaeb9f..746d9fc 100644 --- a/tests/fixtures/configs.py +++ b/tests/fixtures/configs.py @@ -36,7 +36,7 @@ # no auth block # no tls block # no rate_limit block - "api_key": "sk-abc123supersecretkey", # plaintext secret + "api_key": "sk-abc123supersecretkey", # plaintext secret "debug": True, "cors": { "allowed_origins": "*", diff --git a/tests/integration/test_cli.py b/tests/integration/test_cli.py index b658768..c4c55eb 100644 --- a/tests/integration/test_cli.py +++ b/tests/integration/test_cli.py @@ -3,7 +3,9 @@ from __future__ import annotations import json +import signal as _signal from pathlib import Path +from unittest.mock import patch import pytest from click.testing import CliRunner @@ -167,6 +169,167 @@ def test_scan_fail_on_threshold_respected(self, runner, high_only_file): assert result.exit_code == 0 +class TestWatchCommand: + """Tests for `sentinel watch` + + As watch runs an infinite loop all tests patch ``signal.signal`` + to capture the SIGINT handler and ``time.sleep`` to fire it after the + first sleep call, giving exactly one full scan cycle per invocation + """ + + @pytest.fixture + def _one_cycle(self): + """Patch signal + sleep so the watch loop executes exactly one cycle.""" + captured = {} + + def _mock_signal(sig, handler): + if sig == _signal.SIGINT: + captured["handler"] = handler + + def _mock_sleep(_secs): + if "handler" in captured: + captured["handler"](None, None) + + with ( + patch("signal.signal", side_effect=_mock_signal), + patch("time.sleep", side_effect=_mock_sleep), + ): + yield + + @pytest.fixture + def _two_cycles(self): + """Patch signal + sleep so the watch loop executes exactly two cycles.""" + captured = {} + call_count = {"n": 0} + + def _mock_signal(sig, handler): + if sig == _signal.SIGINT: + captured["handler"] = handler + + def _mock_sleep(_secs): + call_count["n"] += 1 + if call_count["n"] >= 2 and "handler" in captured: + captured["handler"](None, None) + + with ( + patch("signal.signal", side_effect=_mock_signal), + patch("time.sleep", side_effect=_mock_sleep), + ): + yield + + def test_no_targets_exits_2(self, runner): + result = runner.invoke(cli, ["watch"]) + assert result.exit_code == 2 + + def test_no_targets_prints_usage_hint(self, runner): + result = runner.invoke(cli, ["watch"]) + assert "No targets" in result.output or "No targets" in (result.stderr or "") + + def test_one_cycle_outputs_cycle_header(self, runner, insecure_file, _one_cycle): + result = runner.invoke(cli, ["watch", "--config", insecure_file, "--interval", "1"]) + assert result.exit_code == 0 + assert "Cycle 1" in result.output + + def test_one_cycle_prints_stop_message(self, runner, insecure_file, _one_cycle): + result = runner.invoke(cli, ["watch", "--config", insecure_file, "--interval", "1"]) + assert "sentinel watch stopped" in result.output + + def test_one_cycle_contains_findings(self, runner, insecure_file, _one_cycle): + result = runner.invoke(cli, ["watch", "--config", insecure_file, "--interval", "1"]) + assert "CFG-001" in result.output + + def test_two_cycles_second_shows_no_change(self, runner, insecure_file, _two_cycles): + result = runner.invoke(cli, ["watch", "--config", insecure_file, "--interval", "1"]) + assert "Cycle 2" in result.output + assert "no change" in result.output + + def test_two_cycles_second_shows_changed_when_findings_differ( + self, runner, tmp_path, _two_cycles + ): + cfg = tmp_path / "mutable.json" + cfg.write_text(json.dumps(INSECURE_CONFIG)) + + call_count = {"n": 0} + original_scan = None + + # clean config on the second scan cycle + from sentinel.modules.config import ConfigScanner + + original_scan = ConfigScanner.scan + + def _patched_scan(self, path): + call_count["n"] += 1 + if call_count["n"] == 2: + import json as _json + + from tests.fixtures.configs import SECURE_CONFIG + + path.write_text(_json.dumps(SECURE_CONFIG)) + return original_scan(self, path) + + with patch.object(ConfigScanner, "scan", _patched_scan): + result = runner.invoke(cli, ["watch", "--config", str(cfg), "--interval", "1"]) + + assert "CHANGED" in result.output + + def test_on_change_suppresses_report_when_no_change(self, runner, insecure_file, _two_cycles): + result = runner.invoke( + cli, + ["watch", "--config", insecure_file, "--interval", "1", "--on-change"], + ) + assert "no change, skipping report" in result.output + # header should appear once (cycle 1 only — cycle 2 is suppressed) + assert result.output.count("Cycle 1") == 1 + assert result.output.count("CFG-001") == 1 + + def test_on_change_emits_report_on_first_cycle(self, runner, insecure_file, _one_cycle): + result = runner.invoke( + cli, + ["watch", "--config", insecure_file, "--interval", "1", "--on-change"], + ) + assert "CFG-001" in result.output + + def test_output_file_written_each_cycle(self, runner, insecure_file, tmp_path, _one_cycle): + out = str(tmp_path / "watch_report.json") + runner.invoke( + cli, + [ + "watch", + "--config", + insecure_file, + "--format", + "json", + "--output", + out, + "--interval", + "1", + ], + ) + assert Path(out).exists() + data = json.loads(Path(out).read_text()) + assert "results" in data + + def test_json_format_per_cycle(self, runner, insecure_file, tmp_path, _one_cycle): + out = str(tmp_path / "out.json") + runner.invoke( + cli, + [ + "watch", + "--config", + insecure_file, + "--format", + "json", + "--output", + out, + "--interval", + "1", + ], + ) + data = json.loads(Path(out).read_text()) + rule_ids = [f["rule_id"] for r in data["results"] for f in r["findings"]] + assert "CFG-001" in rule_ids + + class TestVersionFlag: def test_version_flag_exits_0(self, runner): result = runner.invoke(cli, ["--version"]) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 25601dc..be20049 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -1,4 +1,5 @@ """25 tests for ConfigScanner.""" + from __future__ import annotations import json diff --git a/tests/unit/test_core.py b/tests/unit/test_core.py index cd21ff0..22c46e2 100644 --- a/tests/unit/test_core.py +++ b/tests/unit/test_core.py @@ -1,4 +1,5 @@ """9 tests for core data models: Severity, Finding, ScanResult.""" + import pytest from sentinel.core import Finding, ScanResult, Severity diff --git a/tests/unit/test_report.py b/tests/unit/test_report.py index 6e1b916..4d1b7d0 100644 --- a/tests/unit/test_report.py +++ b/tests/unit/test_report.py @@ -1,4 +1,5 @@ """17 tests for HTML, SARIF, and terminal renderers.""" + from __future__ import annotations from sentinel.core import Finding, ScanResult, Severity @@ -33,6 +34,7 @@ def _make_finding( # ── Terminal renderer ────────────────────────────────────────────────────── + class TestTerminalRenderer: def test_plain_output_is_string(self): r = _make_result() @@ -78,6 +80,7 @@ def test_severity_counts_in_summary(self): # ── HTML renderer ────────────────────────────────────────────────────────── + class TestHTMLRenderer: def test_returns_string(self): output = html_report.render([_make_result()]) @@ -113,6 +116,7 @@ def test_severity_badge_in_html(self): # ── SARIF renderer ───────────────────────────────────────────────────────── + class TestSARIFRenderer: def test_sarif_returns_dict(self): sarif = sarif_report.render_sarif([_make_result()]) diff --git a/tests/unit/test_rules.py b/tests/unit/test_rules.py index 8ee3af4..c2321ac 100644 --- a/tests/unit/test_rules.py +++ b/tests/unit/test_rules.py @@ -1,4 +1,5 @@ """11 tests for RuleRegistry.""" + from sentinel.core import Severity from sentinel.rules import RuleRegistry