From 5349c5ef893ea87aff5844e705d95eb276591883 Mon Sep 17 00:00:00 2001 From: Sid J <6353716+sidnz@users.noreply.github.com> Date: Mon, 9 Mar 2026 20:55:37 +1300 Subject: [PATCH 1/2] test: add integration tests for cli and probe scanner --- tests/integration/__init__.py | 0 tests/integration/test_cli.py | 174 ++++++++++++++++++++++++ tests/integration/test_probe.py | 232 ++++++++++++++++++++++++++++++++ 3 files changed, 406 insertions(+) create mode 100644 tests/integration/__init__.py create mode 100644 tests/integration/test_cli.py create mode 100644 tests/integration/test_probe.py diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/test_cli.py b/tests/integration/test_cli.py new file mode 100644 index 0000000..beddc0b --- /dev/null +++ b/tests/integration/test_cli.py @@ -0,0 +1,174 @@ +"""CLI integration tests""" +from __future__ import annotations + +import json +from pathlib import Path + +import pytest +from click.testing import CliRunner + +from sentinel.cli import cli +from tests.fixtures.configs import INSECURE_CONFIG + +_CLEAN_CONFIG = { + "auth": {"scheme": "bearer", "validation_mode": "strict"}, + "tls": {"cert": "/etc/ssl/certs/server.crt", "min_version": "TLS1.2"}, + "rate_limit": {"requests_per_minute": 60}, + "debug": False, + "cors": {"allowed_origins": ["https://app.example.com"]}, + "input_validation": {"enabled": True}, + "logging": {"level": "info", "log_sensitive": False, "log_body": False, "log_auth": False}, + "timeout_seconds": 30, + "permissions": ["read_resource"], +} +_HIGH_ONLY_CONFIG = { + "auth": {"scheme": "bearer"}, + "permissions": ["read_resource"], + "cors": {"allowed_origins": ["https://example.com"]}, + "input_validation": {"enabled": True}, + "timeout_seconds": 30, +} + + +@pytest.fixture +def runner(): + return CliRunner() + + +@pytest.fixture +def insecure_file(tmp_path): + p = tmp_path / "insecure.json" + p.write_text(json.dumps(INSECURE_CONFIG)) + return str(p) + + +@pytest.fixture +def clean_file(tmp_path): + p = tmp_path / "clean.json" + p.write_text(json.dumps(_CLEAN_CONFIG)) + return str(p) + + +@pytest.fixture +def high_only_file(tmp_path): + p = tmp_path / "high_only.json" + p.write_text(json.dumps(_HIGH_ONLY_CONFIG)) + return str(p) + + +class TestConfigCommandExitCodes: + def test_insecure_config_exits_1(self, runner, insecure_file): + result = runner.invoke(cli, ["config", insecure_file]) + assert result.exit_code == 1 + + def test_clean_config_exits_0(self, runner, clean_file): + result = runner.invoke(cli, ["config", clean_file]) + assert result.exit_code == 0 + + def test_fail_on_critical_passes_when_only_high_findings(self, runner, high_only_file): + result = runner.invoke(cli, ["config", high_only_file, "--fail-on", "critical"]) + assert result.exit_code == 0 + + def test_fail_on_high_fails_on_high_findings(self, runner, high_only_file): + result = runner.invoke(cli, ["config", high_only_file, "--fail-on", "high"]) + assert result.exit_code == 1 + + def test_fail_on_medium_passes_when_no_findings(self, runner, clean_file): + result = runner.invoke(cli, ["config", clean_file, "--fail-on", "medium"]) + assert result.exit_code == 0 + + def test_nonexistent_file_exits_nonzero(self, runner): + result = runner.invoke(cli, ["config", "/tmp/does-not-exist-sentinel-xyz.json"]) + assert result.exit_code != 0 + + +class TestConfigCommandOutputFormats: + def test_json_output_is_valid_json(self, runner, insecure_file): + result = runner.invoke(cli, ["config", insecure_file, "--format", "json"]) + data = json.loads(result.output) + assert "results" in data + assert "sentinel_version" in data + + def test_json_output_contains_expected_findings(self, runner, insecure_file): + result = runner.invoke(cli, ["config", insecure_file, "--format", "json"]) + data = json.loads(result.output) + rule_ids = [f["rule_id"] for r in data["results"] for f in r["findings"]] + assert "CFG-001" in rule_ids + assert "CFG-002" in rule_ids + + def test_sarif_output_is_valid(self, runner, insecure_file): + result = runner.invoke(cli, ["config", insecure_file, "--format", "sarif"]) + data = json.loads(result.output) + assert data["version"] == "2.1.0" + assert "runs" in data + assert len(data["runs"]) == 1 + + def test_sarif_findings_map_to_error_level(self, runner, insecure_file): + result = runner.invoke(cli, ["config", insecure_file, "--format", "sarif"]) + data = json.loads(result.output) + sarif_results = data["runs"][0]["results"] + critical_results = [r for r in sarif_results if r.get("ruleId") == "CFG-001"] + assert critical_results[0]["level"] == "error" + + def test_html_output_is_valid(self, runner, insecure_file): + result = runner.invoke(cli, ["config", insecure_file, "--format", "html"]) + assert "" in result.output + assert "CFG-001" in result.output + + def test_output_file_is_written(self, runner, insecure_file, tmp_path): + out = str(tmp_path / "report.json") + runner.invoke(cli, ["config", insecure_file, "--format", "json", "--output", out]) + assert Path(out).exists() + data = json.loads(Path(out).read_text()) + assert "results" in data + + def test_output_file_does_not_print_content_to_stdout(self, runner, insecure_file, tmp_path): + out = str(tmp_path / "report.json") + result = runner.invoke(cli, ["config", insecure_file, "--format", "json", "--output", out]) + assert "Report written to:" in result.output + assert "sentinel_version" not in result.output + + +class TestScanCommand: + def test_no_targets_exits_2(self, runner): + result = runner.invoke(cli, ["scan"]) + assert result.exit_code == 2 + + def test_no_targets_prints_usage_hint(self, runner): + result = runner.invoke(cli, ["scan"]) + assert "No targets" in result.output or "No targets" in (result.stderr or "") + + def test_config_only_insecure_exits_1(self, runner, insecure_file): + result = runner.invoke(cli, ["scan", "--config", insecure_file]) + assert result.exit_code == 1 + + def test_config_only_clean_exits_0(self, runner, clean_file): + result = runner.invoke(cli, ["scan", "--config", clean_file]) + assert result.exit_code == 0 + + def test_scan_json_output_includes_module_name(self, runner, insecure_file): + result = runner.invoke(cli, ["scan", "--config", insecure_file, "--format", "json"]) + data = json.loads(result.output) + assert len(data["results"]) == 1 + assert data["results"][0]["module"] == "config" + + def test_scan_writes_sarif_file(self, runner, insecure_file, tmp_path): + out = str(tmp_path / "out.sarif.json") + runner.invoke(cli, ["scan", "--config", insecure_file, "--format", "sarif", "--output", out]) + assert Path(out).exists() + sarif = json.loads(Path(out).read_text()) + assert sarif["version"] == "2.1.0" + + def test_scan_fail_on_threshold_respected(self, runner, high_only_file): + result = runner.invoke(cli, ["scan", "--config", high_only_file, "--fail-on", "critical"]) + assert result.exit_code == 0 + + +class TestVersionFlag: + def test_version_flag_exits_0(self, runner): + result = runner.invoke(cli, ["--version"]) + assert result.exit_code == 0 + + def test_version_flag_shows_version(self, runner): + result = runner.invoke(cli, ["--version"]) + assert "0.1.0" in result.output diff --git a/tests/integration/test_probe.py b/tests/integration/test_probe.py new file mode 100644 index 0000000..1228266 --- /dev/null +++ b/tests/integration/test_probe.py @@ -0,0 +1,232 @@ +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pytest + +from sentinel.modules.probe import ProbeScanner + + +def _resp(status=200, headers=None, text="ok"): + r = MagicMock() + r.status_code = status + r.headers = headers or {} + r.text = text + return r + + +def _url_mock(*routes): + def dispatch(url, **kwargs): + for substr, resp in routes: + if substr in url: + return resp + return _resp() + return dispatch + + +_SECURE_HEADERS = { + "strict-transport-security": "max-age=31536000; includeSubDomains", + "x-content-type-options": "nosniff", + "x-frame-options": "DENY", +} + + +class TestNoAuthCheck: + def test_200_triggers_prb003(self): + scanner = ProbeScanner() + with patch("requests.get", return_value=_resp(200)): + result = scanner.scan("http://example.com") + assert any(f.rule_id == "PRB-003" for f in result.findings) + + def test_401_suppresses_prb003(self): + scanner = ProbeScanner() + with patch("requests.get", return_value=_resp(401)): + result = scanner.scan("http://example.com") + assert not any(f.rule_id == "PRB-003" for f in result.findings) + + def test_403_suppresses_prb003(self): + scanner = ProbeScanner() + with patch("requests.get", return_value=_resp(403)): + result = scanner.scan("http://example.com") + assert not any(f.rule_id == "PRB-003" for f in result.findings) + + def test_finding_includes_status_code_in_detail(self): + scanner = ProbeScanner() + with patch("requests.get", return_value=_resp(200)): + result = scanner.scan("http://example.com") + finding = next(f for f in result.findings if f.rule_id == "PRB-003") + assert "200" in finding.detail + + +class TestInfoDisclosureHeaders: + def test_server_header_with_version_triggers_prb004(self): + scanner = ProbeScanner() + headers = {"server": "Apache/2.4.41 (Ubuntu)", **_SECURE_HEADERS} + with patch("requests.get", return_value=_resp(401, headers=headers)): + result = scanner.scan("http://example.com") + assert any(f.rule_id == "PRB-004" for f in result.findings) + + def test_x_powered_by_with_version_triggers_prb004(self): + scanner = ProbeScanner() + headers = {"x-powered-by": "PHP/8.1.0", **_SECURE_HEADERS} + with patch("requests.get", return_value=_resp(401, headers=headers)): + result = scanner.scan("http://example.com") + assert any(f.rule_id == "PRB-004" for f in result.findings) + + def test_server_header_without_version_no_prb004(self): + scanner = ProbeScanner() + headers = {"server": "sentinel", **_SECURE_HEADERS} + with patch("requests.get", return_value=_resp(401, headers=headers)): + result = scanner.scan("http://example.com") + assert not any(f.rule_id == "PRB-004" for f in result.findings) + + def test_no_version_headers_no_prb004(self): + scanner = ProbeScanner() + with patch("requests.get", return_value=_resp(401, headers=_SECURE_HEADERS)): + result = scanner.scan("http://example.com") + assert not any(f.rule_id == "PRB-004" for f in result.findings) + + +class TestMissingSecurityHeaders: + def test_no_security_headers_triggers_prb005(self): + scanner = ProbeScanner() + with patch("requests.get", return_value=_resp(401)): + result = scanner.scan("http://example.com") + assert any(f.rule_id == "PRB-005" for f in result.findings) + + def test_all_security_headers_present_suppresses_prb005(self): + scanner = ProbeScanner() + with patch("requests.get", return_value=_resp(401, headers=_SECURE_HEADERS)): + result = scanner.scan("http://example.com") + assert not any(f.rule_id == "PRB-005" for f in result.findings) + + def test_partial_headers_still_triggers_prb005(self): + scanner = ProbeScanner() + partial = {"strict-transport-security": "max-age=31536000"} + with patch("requests.get", return_value=_resp(401, headers=partial)): + result = scanner.scan("http://example.com") + assert any(f.rule_id == "PRB-005" for f in result.findings) + + def test_finding_names_missing_headers_in_detail(self): + scanner = ProbeScanner() + partial = {"strict-transport-security": "max-age=31536000"} + with patch("requests.get", return_value=_resp(401, headers=partial)): + result = scanner.scan("http://example.com") + finding = next(f for f in result.findings if f.rule_id == "PRB-005") + assert "x-content-type-options" in finding.detail + assert "x-frame-options" in finding.detail + + +class TestToolListingExposed: + def test_tools_list_200_triggers_prb006(self): + scanner = ProbeScanner() + dispatch = _url_mock( + ("/tools/list", _resp(200)), + ("example.com", _resp(401, headers=_SECURE_HEADERS)), + ) + with patch("requests.get", side_effect=dispatch): + result = scanner.scan("http://example.com") + assert any(f.rule_id == "PRB-006" for f in result.findings) + + def test_tools_list_401_suppresses_prb006(self): + scanner = ProbeScanner() + with patch("requests.get", return_value=_resp(401, headers=_SECURE_HEADERS)): + result = scanner.scan("http://example.com") + assert not any(f.rule_id == "PRB-006" for f in result.findings) + + def test_finding_includes_tools_list_url(self): + scanner = ProbeScanner() + dispatch = _url_mock( + ("/tools/list", _resp(200)), + ("example.com", _resp(401, headers=_SECURE_HEADERS)), + ) + with patch("requests.get", side_effect=dispatch): + result = scanner.scan("http://example.com") + finding = next(f for f in result.findings if f.rule_id == "PRB-006") + assert "/tools/list" in finding.location + + +class TestVerboseErrors: + _TRACEBACK_BODY = "Traceback (most recent call last):\n File app.py, line 42\nValueError" + + def test_traceback_in_error_response_triggers_prb007(self): + scanner = ProbeScanner(safe_mode=True) + dispatch = _url_mock( + ("nonexistent-sentinel-probe", _resp(500, text=self._TRACEBACK_BODY)), + ("example.com", _resp(401, headers={**_SECURE_HEADERS, "x-ratelimit-limit": "100"})), + ) + with patch("requests.get", side_effect=dispatch): + result = scanner.scan("http://example.com") + assert any(f.rule_id == "PRB-007" for f in result.findings) + + def test_clean_error_response_suppresses_prb007(self): + scanner = ProbeScanner(safe_mode=True) + dispatch = _url_mock( + ("nonexistent-sentinel-probe", _resp(404, text="Not Found")), + ("example.com", _resp(401, headers={**_SECURE_HEADERS, "x-ratelimit-limit": "100"})), + ) + with patch("requests.get", side_effect=dispatch): + result = scanner.scan("http://example.com") + assert not any(f.rule_id == "PRB-007" for f in result.findings) + + def test_safe_mode_false_skips_check_entirely(self): + scanner = ProbeScanner(safe_mode=False) + dispatch = _url_mock( + ("nonexistent-sentinel-probe", _resp(500, text=self._TRACEBACK_BODY)), + ("example.com", _resp(401, headers={**_SECURE_HEADERS, "x-ratelimit-limit": "100"})), + ) + with patch("requests.get", side_effect=dispatch): + result = scanner.scan("http://example.com") + assert not any(f.rule_id == "PRB-007" for f in result.findings) + + +class TestRateLimiting: + def test_no_rate_limit_headers_triggers_prb008(self): + scanner = ProbeScanner() + with patch("requests.get", return_value=_resp(401, headers=_SECURE_HEADERS)): + result = scanner.scan("http://example.com") + assert any(f.rule_id == "PRB-008" for f in result.findings) + + def test_x_ratelimit_limit_header_suppresses_prb008(self): + scanner = ProbeScanner() + headers = {**_SECURE_HEADERS, "x-ratelimit-limit": "100"} + with patch("requests.get", return_value=_resp(401, headers=headers)): + result = scanner.scan("http://example.com") + assert not any(f.rule_id == "PRB-008" for f in result.findings) + + def test_retry_after_header_suppresses_prb008(self): + scanner = ProbeScanner() + headers = {**_SECURE_HEADERS, "retry-after": "60"} + with patch("requests.get", return_value=_resp(401, headers=headers)): + result = scanner.scan("http://example.com") + assert not any(f.rule_id == "PRB-008" for f in result.findings) + + def test_429_response_suppresses_prb008(self): + scanner = ProbeScanner() + with patch("requests.get", return_value=_resp(429, headers=_SECURE_HEADERS)): + result = scanner.scan("http://example.com") + assert not any(f.rule_id == "PRB-008" for f in result.findings) + + +class TestEndpointUnreachable: + def test_connection_error_returns_info_finding(self): + import requests as _req + scanner = ProbeScanner() + with patch("requests.get", side_effect=_req.exceptions.ConnectionError("refused")): + result = scanner.scan("http://unreachable.example.com") + assert result.has_findings + assert any(f.rule_id == "PRB-ERR" for f in result.findings) + + def test_timeout_error_returns_info_finding(self): + import requests as _req + scanner = ProbeScanner() + with patch("requests.get", side_effect=_req.exceptions.Timeout("timed out")): + result = scanner.scan("http://slow.example.com") + assert any(f.rule_id == "PRB-ERR" for f in result.findings) + + def test_result_module_is_probe(self): + import requests as _req + scanner = ProbeScanner() + with patch("requests.get", side_effect=_req.exceptions.ConnectionError("refused")): + result = scanner.scan("http://unreachable.example.com") + assert result.module == "probe" From 45bce53fcfa75fbf7d566f5dc2e3f59801db9293 Mon Sep 17 00:00:00 2001 From: Sid J <6353716+sidnz@users.noreply.github.com> Date: Mon, 9 Mar 2026 21:00:23 +1300 Subject: [PATCH 2/2] fix: address lint issues in integration tests --- tests/integration/test_cli.py | 9 ++++++--- tests/integration/test_probe.py | 6 ++++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/tests/integration/test_cli.py b/tests/integration/test_cli.py index beddc0b..b658768 100644 --- a/tests/integration/test_cli.py +++ b/tests/integration/test_cli.py @@ -1,4 +1,5 @@ """CLI integration tests""" + from __future__ import annotations import json @@ -93,8 +94,8 @@ def test_json_output_contains_expected_findings(self, runner, insecure_file): result = runner.invoke(cli, ["config", insecure_file, "--format", "json"]) data = json.loads(result.output) rule_ids = [f["rule_id"] for r in data["results"] for f in r["findings"]] - assert "CFG-001" in rule_ids - assert "CFG-002" in rule_ids + assert "CFG-001" in rule_ids + assert "CFG-002" in rule_ids def test_sarif_output_is_valid(self, runner, insecure_file): result = runner.invoke(cli, ["config", insecure_file, "--format", "sarif"]) @@ -154,7 +155,9 @@ def test_scan_json_output_includes_module_name(self, runner, insecure_file): def test_scan_writes_sarif_file(self, runner, insecure_file, tmp_path): out = str(tmp_path / "out.sarif.json") - runner.invoke(cli, ["scan", "--config", insecure_file, "--format", "sarif", "--output", out]) + runner.invoke( + cli, ["scan", "--config", insecure_file, "--format", "sarif", "--output", out] + ) assert Path(out).exists() sarif = json.loads(Path(out).read_text()) assert sarif["version"] == "2.1.0" diff --git a/tests/integration/test_probe.py b/tests/integration/test_probe.py index 1228266..9bc838e 100644 --- a/tests/integration/test_probe.py +++ b/tests/integration/test_probe.py @@ -2,8 +2,6 @@ from unittest.mock import MagicMock, patch -import pytest - from sentinel.modules.probe import ProbeScanner @@ -21,6 +19,7 @@ def dispatch(url, **kwargs): if substr in url: return resp return _resp() + return dispatch @@ -211,6 +210,7 @@ def test_429_response_suppresses_prb008(self): class TestEndpointUnreachable: def test_connection_error_returns_info_finding(self): import requests as _req + scanner = ProbeScanner() with patch("requests.get", side_effect=_req.exceptions.ConnectionError("refused")): result = scanner.scan("http://unreachable.example.com") @@ -219,6 +219,7 @@ def test_connection_error_returns_info_finding(self): def test_timeout_error_returns_info_finding(self): import requests as _req + scanner = ProbeScanner() with patch("requests.get", side_effect=_req.exceptions.Timeout("timed out")): result = scanner.scan("http://slow.example.com") @@ -226,6 +227,7 @@ def test_timeout_error_returns_info_finding(self): def test_result_module_is_probe(self): import requests as _req + scanner = ProbeScanner() with patch("requests.get", side_effect=_req.exceptions.ConnectionError("refused")): result = scanner.scan("http://unreachable.example.com")