Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ sentinel scans Model Context Protocol (MCP) server configurations, live endpoint
- **4 output formats** — terminal (Rich), HTML, JSON, SARIF 2.1
- **GitHub Action** — drop-in CI integration with SARIF upload support
- **Fail-on threshold** — block PRs on HIGH/CRITICAL findings
- 👁️ **Watch mode** — continuous monitoring with configurable interval and change detection

---

Expand Down Expand Up @@ -51,6 +52,9 @@ sentinel container my-mcp-image:latest
# Run all scanners in one pass
sentinel scan --config mcp.json --endpoint https://mcp.example.com --container my-image:latest

# Continuously monitor — re-scan every 60s, alert on change
sentinel watch --config mcp.json --endpoint https://mcp.example.com --interval 60 --on-change

# Output as SARIF for GitHub Code Scanning
sentinel config mcp.json --format sarif --output sentinel.sarif.json
```
Expand Down
1 change: 1 addition & 0 deletions sentinel/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
"""sentinel — MCP security scanner by Helixar."""

__version__ = "0.1.0"
109 changes: 108 additions & 1 deletion sentinel/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""sentinel CLI — 4 commands: config, probe, container, scan."""
"""sentinel CLI — 5 commands: config, probe, container, scan, watch."""

from __future__ import annotations

import sys
Expand Down Expand Up @@ -175,6 +176,112 @@ def scan(
sys.exit(1)


@cli.command()
@click.option("--config", "config_path", default=None, type=click.Path())
@click.option("--endpoint", default=None)
@click.option("--container", "container_target", default=None)
@click.option("--interval", default=60, type=int, help="Seconds between scans.")
@click.option("--format", "fmt", default="terminal", type=_FMT_CHOICES)
@click.option("--output", default=None, help="Write report to file on each cycle.")
@click.option("--fail-on", default="high")
@click.option(
"--on-change",
is_flag=True,
default=False,
help="Only emit a report when findings change.",
)
@click.option("--safe-mode/--no-safe-mode", default=True)
@click.option("--timeout", default=10)
def watch(
config_path: Optional[str],
endpoint: Optional[str],
container_target: Optional[str],
interval: int,
fmt: str,
output: Optional[str],
fail_on: str,
on_change: bool,
safe_mode: bool,
timeout: int,
) -> None:
"""Continuously monitor targets, re-scanning on a fixed interval."""
import signal
import time
from datetime import datetime, timezone

if not any([config_path, endpoint, container_target]):
click.echo("No targets specified. Use --config, --endpoint, or --container.", err=True)
sys.exit(2)

config_scanner = ConfigScanner() if config_path else None

probe_scanner = None
if endpoint:
from sentinel.modules.probe import ProbeScanner

probe_scanner = ProbeScanner(safe_mode=safe_mode)

container_scanner = None
if container_target:
from sentinel.modules.container import ContainerScanner

container_scanner = ContainerScanner()

_stop = False

def _handle_sigint(sig, frame) -> None: # noqa: ARG001
nonlocal _stop
_stop = True

signal.signal(signal.SIGINT, _handle_sigint)

previous_fingerprint: Optional[frozenset] = None
cycle = 0

click.echo(f"sentinel watch — scanning every {interval}s | Ctrl-C to stop\n")

while not _stop:
cycle += 1
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")

results = []
if config_scanner:
results.append(config_scanner.scan(Path(config_path))) # type: ignore[arg-type]
if probe_scanner:
results.append(probe_scanner.scan(endpoint, timeout=timeout)) # type: ignore[arg-type]
if container_scanner:
results.append(container_scanner.scan(container_target)) # type: ignore[arg-type]

fingerprint = frozenset(
(f.rule_id, f.location, f.detail) for r in results for f in r.findings
)
first_run = previous_fingerprint is None
changed = fingerprint != previous_fingerprint
previous_fingerprint = fingerprint

if not on_change or changed:
if fmt == "terminal" and not output:
separator = "─" * 60
status_parts = [f"Cycle {cycle}", now]
if not first_run and changed:
status_parts.append("CHANGED")
elif not first_run:
status_parts.append("no change")
click.echo(separator)
click.echo(" " + " | ".join(status_parts))
click.echo(separator)
_write_output(results, fmt, output)
else:
click.echo(f"[{now}] Cycle {cycle} — no change, skipping report.")

for _ in range(interval):
if _stop:
break
time.sleep(1)

click.echo("\nsentinel watch stopped.")


def main() -> None:
cli()

Expand Down
1 change: 1 addition & 0 deletions sentinel/core.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Core data models for sentinel."""

from __future__ import annotations

from dataclasses import dataclass, field
Expand Down
37 changes: 25 additions & 12 deletions sentinel/modules/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Static analysis of MCP server configuration files."""

from __future__ import annotations

import json
Expand Down Expand Up @@ -103,7 +104,8 @@ def _check_no_auth(self, config: dict, result: ScanResult, path: Path) -> None:
auth = _get_nested(config, "auth") or _get_nested(config, "authentication")
if auth is None:
f = self._make_finding(
"no_auth", str(path),
"no_auth",
str(path),
"No 'auth' or 'authentication' block found in config.",
)
if f:
Expand All @@ -119,7 +121,8 @@ def _check_plaintext_secrets(self, config: dict, result: ScanResult, path: Path)
if value.startswith("$") or value.startswith("${"):
continue
f = self._make_finding(
"plaintext_secrets", str(path),
"plaintext_secrets",
str(path),
f"Possible plaintext secret at key '{key}'.",
)
if f:
Expand All @@ -136,14 +139,16 @@ def _check_wildcard_permissions(self, config: dict, result: ScanResult, path: Pa
return
if isinstance(perms, str) and perms in ("*", "all"):
f = self._make_finding(
"wildcard_permissions", str(path),
"wildcard_permissions",
str(path),
f"Tool permissions set to wildcard value '{perms}'.",
)
if f:
result.add_finding(f)
elif isinstance(perms, list) and ("*" in perms or "all" in perms):
f = self._make_finding(
"wildcard_permissions", str(path),
"wildcard_permissions",
str(path),
"Tool permissions list contains wildcard entry.",
)
if f:
Expand All @@ -158,7 +163,8 @@ def _check_no_rate_limiting(self, config: dict, result: ScanResult, path: Path)
)
if rl is None:
f = self._make_finding(
"no_rate_limiting", str(path),
"no_rate_limiting",
str(path),
"No rate_limit or throttle block found in config.",
)
if f:
Expand All @@ -169,7 +175,8 @@ def _check_debug_mode(self, config: dict, result: ScanResult, path: Path) -> Non
debug_on = ("true", "1", "yes", "on")
if debug is True or (isinstance(debug, str) and debug.lower() in debug_on):
f = self._make_finding(
"debug_mode_enabled", str(path),
"debug_mode_enabled",
str(path),
f"'debug' is set to {debug!r} — disable in production.",
)
if f:
Expand All @@ -184,7 +191,8 @@ def _check_no_tls(self, config: dict, result: ScanResult, path: Path) -> None:
)
if tls is None:
f = self._make_finding(
"no_tls_config", str(path),
"no_tls_config",
str(path),
"No tls/ssl block found in config. Transport encryption not configured.",
)
if f:
Expand All @@ -204,14 +212,16 @@ def _check_wildcard_cors(self, config: dict, result: ScanResult, path: Path) ->

if origins == "*":
f = self._make_finding(
"wildcard_cors", str(path),
"wildcard_cors",
str(path),
"CORS allowed_origins is set to '*', permitting all origins.",
)
if f:
result.add_finding(f)
elif isinstance(origins, list) and "*" in origins:
f = self._make_finding(
"wildcard_cors", str(path),
"wildcard_cors",
str(path),
"CORS allowed_origins list contains wildcard '*'.",
)
if f:
Expand All @@ -225,7 +235,8 @@ def _check_no_input_validation(self, config: dict, result: ScanResult, path: Pat
)
if iv is None:
f = self._make_finding(
"no_input_validation", str(path),
"no_input_validation",
str(path),
"No input_validation block found in config.",
)
if f:
Expand All @@ -247,7 +258,8 @@ def _check_sensitive_logging(self, config: dict, result: ScanResult, path: Path)
flag_on = ("true", "1", "yes")
if flag_val is True or (isinstance(flag_val, str) and flag_val.lower() in flag_on):
f = self._make_finding(
"sensitive_logging", str(path),
"sensitive_logging",
str(path),
f"Logging config has '{flag_name}' enabled — may expose sensitive data.",
)
if f:
Expand All @@ -262,7 +274,8 @@ def _check_no_timeout(self, config: dict, result: ScanResult, path: Path) -> Non
)
if timeout is None:
f = self._make_finding(
"no_timeout", str(path),
"no_timeout",
str(path),
"No timeout or timeout_seconds field found in config.",
)
if f:
Expand Down
44 changes: 28 additions & 16 deletions sentinel/modules/container.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Docker container and image security inspection."""

from __future__ import annotations

import re
Expand Down Expand Up @@ -58,13 +59,15 @@ def scan(self, target: str) -> ScanResult:
pass

if container_obj is None and image_obj is None:
result.add_finding(Finding(
rule_id="CTR-ERR",
severity=__import__("sentinel.core", fromlist=["Severity"]).Severity.INFO,
title="Container/image not found",
detail=f"Could not find container or image: {target}",
location=target,
))
result.add_finding(
Finding(
rule_id="CTR-ERR",
severity=__import__("sentinel.core", fromlist=["Severity"]).Severity.INFO,
title="Container/image not found",
detail=f"Could not find container or image: {target}",
location=target,
)
)
return result

if container_obj is not None:
Expand Down Expand Up @@ -103,7 +106,8 @@ def _check_running_as_root(self, config: dict, result: ScanResult, target: str)
user = config.get("User", "")
if user in ("", "0", "root"):
f = self._make_finding(
"running_as_root", target,
"running_as_root",
target,
f"Container User is {user!r} — process runs as UID 0 (root).",
)
if f:
Expand All @@ -112,7 +116,8 @@ def _check_running_as_root(self, config: dict, result: ScanResult, target: str)
def _check_privileged(self, host_config: dict, result: ScanResult, target: str) -> None:
if host_config.get("Privileged", False):
f = self._make_finding(
"privileged_container", target,
"privileged_container",
target,
"Container is running with --privileged flag.",
)
if f:
Expand All @@ -124,7 +129,8 @@ def _check_resource_limits(self, host_config: dict, result: ScanResult, target:
cpu_quota = host_config.get("CpuQuota", 0)
if not memory and not nano_cpus and not cpu_quota:
f = self._make_finding(
"no_resource_limits", target,
"no_resource_limits",
target,
"No memory or CPU limits set on container.",
)
if f:
Expand All @@ -138,7 +144,8 @@ def _check_sensitive_env_vars(self, config: dict, result: ScanResult, target: st
key, _, value = entry.partition("=")
if _SECRET_KEY_RE.search(key) and value and not value.startswith("$"):
f = self._make_finding(
"sensitive_env_vars", target,
"sensitive_env_vars",
target,
f"Environment variable '{key}' appears to contain a plaintext secret.",
)
if f:
Expand All @@ -151,7 +158,8 @@ def _check_writable_filesystem(
read_only = host_config.get("ReadonlyRootfs", False)
if not read_only:
f = self._make_finding(
"writable_filesystem", target,
"writable_filesystem",
target,
"Container root filesystem is writable (ReadonlyRootfs=false).",
)
if f:
Expand All @@ -161,7 +169,8 @@ def _check_health_check(self, config: dict, result: ScanResult, target: str) ->
health = config.get("Healthcheck")
if health is None:
f = self._make_finding(
"no_health_check", target,
"no_health_check",
target,
"No HEALTHCHECK configured in container/image.",
)
if f:
Expand All @@ -178,7 +187,8 @@ def _check_outdated_image(
pass # image has OCI metadata, considered maintained
elif not created:
f = self._make_finding(
"outdated_base_image", target,
"outdated_base_image",
target,
"Image has no creation timestamp — may be using an outdated or untracked base.",
)
if f:
Expand All @@ -191,7 +201,8 @@ def _check_dangerous_ports(self, host_config: dict, result: ScanResult, target:
port_num = int(port_proto.split("/")[0])
if port_num in _DANGEROUS_PORTS:
f = self._make_finding(
"dangerous_ports", target,
"dangerous_ports",
target,
f"Dangerous port {port_num} is exposed on the container.",
)
if f:
Expand All @@ -207,7 +218,8 @@ def _check_dangerous_ports_image(self, config: dict, result: ScanResult, target:
port_num = int(port_proto.split("/")[0])
if port_num in _DANGEROUS_PORTS:
f = self._make_finding(
"dangerous_ports", target,
"dangerous_ports",
target,
f"Image exposes dangerous port {port_num}.",
)
if f:
Expand Down
Loading
Loading