diff --git a/gateframe/audit/trend.py b/gateframe/audit/trend.py
new file mode 100644
index 0000000..2f22774
--- /dev/null
+++ b/gateframe/audit/trend.py
@@ -0,0 +1,278 @@
+"""Cross-session contract reliability trend analysis for gateframe.
+
+Provides :class:`ContractTrendAnalyzer` which reads an existing JSONL audit
+log produced by :class:`~gateframe.audit.exporters.JsonFileExporter`, groups
+entries by ``workflow_id``, and computes per-contract OLS pass-rate slopes to
+detect reliability regressions across workflow runs.
+
+Usage::
+
+    from pathlib import Path
+    from gateframe.audit.trend import ContractTrendAnalyzer
+
+    analyzer = ContractTrendAnalyzer(Path("audit.jsonl"), window=20)
+    report = analyzer.analyze()
+    if report.any_regression:
+        print("Contracts degrading:", [r.contract_name for r in report.regressions])
+
+CLI equivalent::
+
+    gateframe trend audit.jsonl --window 20
+
+The *window* parameter specifies the number of most-recent **workflow runs**
+(i.e. distinct ``workflow_id`` groups) to include in the trend calculation.
+Entries without a ``workflow_id`` are ignored.
+
+Reference: PDR in Production v2.5 — DOI 10.5281/zenodo.19362461
+"""
+
+from __future__ import annotations
+
+import json
+import statistics
+from collections import defaultdict
+from dataclasses import dataclass, field
+from pathlib import Path
+
+# ── Data models ───────────────────────────────────────────────────────────────
+
+
+@dataclass
+class WorkflowRunSummary:
+    """Pass-rate statistics for a single workflow run.
+
+    A "workflow run" is the set of all audit entries that share the same
+    ``workflow_id``.
+
+    Attributes:
+        workflow_id: The workflow identifier.
+        contract_name: Name of the :class:`~gateframe.core.contract.ValidationContract`.
+        total: Total number of validation events in this run.
+        passed: Number of events where ``passed == True``.
+        pass_rate: ``passed / total`` (``NaN`` when ``total == 0``).
+    """
+
+    workflow_id: str
+    contract_name: str
+    total: int
+    passed: int
+
+    @property
+    def pass_rate(self) -> float:
+        return self.passed / self.total if self.total > 0 else float("nan")
+
+
+@dataclass
+class ContractTrend:
+    """OLS trend for a single contract across ordered workflow runs.
+
+    Attributes:
+        contract_name: Name of the contract.
+        run_summaries: Ordered (oldest first) per-run statistics.
+        slope: OLS slope of ``pass_rate`` over run index (positive = improving).
+        direction: ``"improving"``, ``"worsening"``, or ``"stable"``.
+        regressed: ``True`` when the slope is below ``-regression_threshold``.
+    """
+
+    contract_name: str
+    run_summaries: list[WorkflowRunSummary]
+    slope: float
+    direction: str
+    regressed: bool
+
+
+@dataclass
+class TrendReport:
+    """Aggregated trend report across all contracts in the audit log.
+
+    Attributes:
+        contract_trends: Per-contract trend results, sorted by ``contract_name``.
+        any_regression: ``True`` if at least one contract is degrading.
+        regressions: Subset of *contract_trends* where ``regressed == True``.
+        window: Number of workflow runs included in the analysis.
+        regression_threshold: Threshold used to classify a slope as a regression.
+    """
+
+    contract_trends: list[ContractTrend]
+    any_regression: bool
+    regressions: list[ContractTrend] = field(default_factory=list)
+    window: int = 20
+    regression_threshold: float = 0.02
+
+
+# ── Internal helpers ──────────────────────────────────────────────────────────
+
+
+def _ols_slope(values: list[float]) -> float:
+    """OLS slope of *values* over index 0 … n-1.
+    Returns 0.0 for < 2 points."""
+    n = len(values)
+    if n < 2:
+        return 0.0
+    xs = list(range(n))
+    slope, _ = statistics.linear_regression(xs, values)
+    return slope
+
+
+def _direction(slope: float, threshold: float = 0.001) -> str:
+    if slope > threshold:
+        return "improving"
+    if slope < -threshold:
+        return "worsening"
+    return "stable"
+
+
+def _is_valid_float(v: float) -> bool:
+    import math
+    return not (math.isnan(v) or math.isinf(v))
+
+
+# ── Public API ────────────────────────────────────────────────────────────────
+
+
+class ContractTrendAnalyzer:
+    """Reads a gateframe JSONL audit log and computes per-contract trend slopes.
+
+    Only audit entries that carry a ``workflow_id`` are considered. Entries are
+    grouped by ``workflow_id``; the groups are ordered by the **earliest
+    timestamp** seen within each group (oldest-first), so the trend slope
+    represents change over time.
+
+    Args:
+        audit_log_path: Path to the JSONL file written by
+            :class:`~gateframe.audit.exporters.JsonFileExporter`.
+        window: How many most-recent workflow-ID groups to include.
+        regression_threshold: Minimum downward slope that constitutes a
+            regression. Defaults to 0.02 (2 percentage-point drop per run).
+
+    Example::
+
+        analyzer = ContractTrendAnalyzer(Path("audit.jsonl"), window=20)
+        report = analyzer.analyze()
+        for ct in report.contract_trends:
+            print(f"{ct.contract_name}: {ct.direction} (slope={ct.slope:.4f})")
+    """
+
+    def __init__(
+        self,
+        audit_log_path: Path | str,
+        *,
+        window: int = 20,
+        regression_threshold: float = 0.02,
+    ) -> None:
+        self._path = Path(audit_log_path)
+        self._window = window
+        self._regression_threshold = regression_threshold
+
+    # ── private ──────────────────────────────────────────────────────────────
+
+    def _read_entries(self) -> list[dict]:
+        """Parse the JSONL file; skip malformed lines."""
+        entries: list[dict] = []
+        try:
+            with self._path.open(encoding="utf-8") as fh:
+                for line in fh:
+                    line = line.strip()
+                    if not line:
+                        continue
+                    try:
+                        entries.append(json.loads(line))
+                    except json.JSONDecodeError:
+                        continue
+        except FileNotFoundError:
+            pass
+        return entries
+
+    def _build_run_summaries(
+        self, entries: list[dict]
+    ) -> dict[str, list[WorkflowRunSummary]]:
+        """Group entries by workflow_id, compute pass rates per contract.
+
+        Returns a dict mapping contract_name → list of WorkflowRunSummary,
+        ordered oldest-first by first-seen timestamp within each workflow run.
+        """
+        # workflow_id → contract_name → {total, passed}
+        # Also track earliest timestamp per workflow_id for ordering.
+        wf_contract: dict[str, dict[str, dict]] = defaultdict(
+            lambda: defaultdict(lambda: {"total": 0, "passed": 0})
+        )
+        wf_first_ts: dict[str, str] = {}
+
+        for entry in entries:
+            wf_id = entry.get("workflow_id")
+            if not wf_id:
+                continue
+            contract = entry.get("contract_name", "unknown")
+            passed = bool(entry.get("passed", False))
+            ts = entry.get("timestamp", "")
+
+            if wf_id not in wf_first_ts or ts < wf_first_ts[wf_id]:
+                wf_first_ts[wf_id] = ts
+
+            bucket = wf_contract[wf_id][contract]
+            bucket["total"] += 1
+            if passed:
+                bucket["passed"] += 1
+
+        # Sort workflow runs by first-seen timestamp → oldest first
+        ordered_wf_ids = sorted(wf_first_ts, key=lambda w: wf_first_ts[w])
+
+        # Apply window: keep only the most-recent *window* runs
+        if len(ordered_wf_ids) > self._window:
+            ordered_wf_ids = ordered_wf_ids[-self._window :]
+
+        # Invert: contract_name → list[WorkflowRunSummary] (time-ordered)
+        contract_runs: dict[str, list[WorkflowRunSummary]] = defaultdict(list)
+        for wf_id in ordered_wf_ids:
+            for contract, counts in wf_contract[wf_id].items():
+                contract_runs[contract].append(
+                    WorkflowRunSummary(
+                        workflow_id=wf_id,
+                        contract_name=contract,
+                        total=counts["total"],
+                        passed=counts["passed"],
+                    )
+                )
+
+        return dict(contract_runs)
+
+    # ── public ───────────────────────────────────────────────────────────────
+
+    def analyze(self) -> TrendReport:
+        """Run the trend analysis and return a :class:`TrendReport`.
+
+        Returns:
+            A :class:`TrendReport` summarising per-contract slopes and
+            regression flags. If the audit log is empty or contains no
+            entries with ``workflow_id``, the report will have no
+            ``contract_trends``.
+        """
+        entries = self._read_entries()
+        contract_runs = self._build_run_summaries(entries)
+
+        contract_trends: list[ContractTrend] = []
+        for contract_name, run_summaries in sorted(contract_runs.items()):
+            pass_rates = [
+                s.pass_rate for s in run_summaries if _is_valid_float(s.pass_rate)
+            ]
+            slope = _ols_slope(pass_rates)
+            direction = _direction(slope)
+            regressed = slope < -self._regression_threshold
+
+            contract_trends.append(
+                ContractTrend(
+                    contract_name=contract_name,
+                    run_summaries=run_summaries,
+                    slope=slope,
+                    direction=direction,
+                    regressed=regressed,
+                )
+            )
+
+        regressions = [ct for ct in contract_trends if ct.regressed]
+        return TrendReport(
+            contract_trends=contract_trends,
+            any_regression=bool(regressions),
+            regressions=regressions,
+            window=self._window,
+            regression_threshold=self._regression_threshold,
+        )
diff --git a/tests/audit/test_trend.py b/tests/audit/test_trend.py
new file mode 100644
index 0000000..ba8c046
--- /dev/null
+++ b/tests/audit/test_trend.py
@@ -0,0 +1,201 @@
+"""Tests for gateframe.audit.trend — ContractTrendAnalyzer."""
+
+import json
+from pathlib import Path
+
+import pytest
+
+from gateframe.audit.trend import (
+    ContractTrend,
+    ContractTrendAnalyzer,
+    _direction,
+    _ols_slope,
+)
+
+# ── Helpers ───────────────────────────────────────────────────────────────────
+
+
+def _write_audit(path: Path, entries: list[dict]) -> None:
+    with path.open("w") as fh:
+        for e in entries:
+            fh.write(json.dumps(e) + "\n")
+
+
+def _entry(
+    contract: str,
+    passed: bool,
+    workflow_id: str,
+    ts: str = "2026-04-01T00:00:00+00:00",
+) -> dict:
+    return {
+        "timestamp": ts,
+        "contract_name": contract,
+        "passed": passed,
+        "rules_applied": 1,
+        "rules_failed": 0 if passed else 1,
+        "failures": [],
+        "workflow_id": workflow_id,
+        "confidence": 0.9,
+    }
+
+
+# ── OLS helpers ───────────────────────────────────────────────────────────────
+
+class TestOlsSlope:
+    def test_increasing(self):
+        assert _ols_slope([0.5, 0.7, 0.9]) == pytest.approx(0.2)
+
+    def test_decreasing(self):
+        assert _ols_slope([0.9, 0.7, 0.5]) == pytest.approx(-0.2)
+
+    def test_flat(self):
+        assert _ols_slope([0.8, 0.8, 0.8]) == pytest.approx(0.0)
+
+    def test_single_returns_zero(self):
+        assert _ols_slope([0.7]) == 0.0
+
+    def test_empty_returns_zero(self):
+        assert _ols_slope([]) == 0.0
+
+
+class TestDirection:
+    def test_improving(self):
+        assert _direction(0.05) == "improving"
+
+    def test_worsening(self):
+        assert _direction(-0.05) == "worsening"
+
+    def test_stable_zero(self):
+        assert _direction(0.0) == "stable"
+
+    def test_stable_tiny(self):
+        assert _direction(0.0001) == "stable"
+
+
+# ── ContractTrendAnalyzer ─────────────────────────────────────────────────────
+
+class TestContractTrendAnalyzer:
+    def test_improving_contract(self, tmp_path):
+        log = tmp_path / "audit.jsonl"
+        entries = [
+            # run-a: 5/10 pass → 50%
+            *[_entry("billing", True, "run-a", "2026-04-01T01:00:00+00:00")] * 5,
+            *[_entry("billing", False, "run-a", "2026-04-01T01:00:00+00:00")] * 5,
+            # run-b: 7/10 → 70%
+            *[_entry("billing", True, "run-b", "2026-04-01T02:00:00+00:00")] * 7,
+            *[_entry("billing", False, "run-b", "2026-04-01T02:00:00+00:00")] * 3,
+            # run-c: 9/10 → 90%
+            *[_entry("billing", True, "run-c", "2026-04-01T03:00:00+00:00")] * 9,
+            *[_entry("billing", False, "run-c", "2026-04-01T03:00:00+00:00")] * 1,
+        ]
+        _write_audit(log, entries)
+        report = ContractTrendAnalyzer(log).analyze()
+        assert not report.any_regression
+        ct = report.contract_trends[0]
+        assert ct.direction == "improving"
+        assert ct.slope > 0
+
+    def test_regressing_contract(self, tmp_path):
+        log = tmp_path / "audit.jsonl"
+        entries = [
+            *[_entry("billing", True, "run-a", "2026-04-01T01:00:00+00:00")] * 9,
+            *[_entry("billing", False, "run-a", "2026-04-01T01:00:00+00:00")] * 1,
+            *[_entry("billing", True, "run-b", "2026-04-01T02:00:00+00:00")] * 6,
+            *[_entry("billing", False, "run-b", "2026-04-01T02:00:00+00:00")] * 4,
+            *[_entry("billing", True, "run-c", "2026-04-01T03:00:00+00:00")] * 3,
+            *[_entry("billing", False, "run-c", "2026-04-01T03:00:00+00:00")] * 7,
+        ]
+        _write_audit(log, entries)
+        report = ContractTrendAnalyzer(log).analyze()
+        assert report.any_regression
+        assert len(report.regressions) == 1
+        assert report.regressions[0].contract_name == "billing"
+        assert report.regressions[0].direction == "worsening"
+
+    def test_entries_without_workflow_id_ignored(self, tmp_path):
+        log = tmp_path / "audit.jsonl"
+        # Entries without workflow_id should be silently skipped
+        entries = [
+            {"timestamp": "2026-04-01T01:00:00+00:00", "contract_name": "c", "passed": True,
+             "rules_applied": 1, "rules_failed": 0, "failures": []},
+        ]
+        _write_audit(log, entries)
+        report = ContractTrendAnalyzer(log).analyze()
+        assert report.contract_trends == []
+        assert not report.any_regression
+
+    def test_multiple_contracts(self, tmp_path):
+        log = tmp_path / "audit.jsonl"
+        entries = [
+            # contract A: stable 100% across two runs
+            _entry("A", True, "run-1", "2026-04-01T01:00:00+00:00"),
+            _entry("A", True, "run-2", "2026-04-01T02:00:00+00:00"),
+            # contract B: degrading 100% → 50%
+            _entry("B", True, "run-1", "2026-04-01T01:00:00+00:00"),
+            *[_entry("B", True, "run-2", "2026-04-01T02:00:00+00:00")] * 1,
+            *[_entry("B", False, "run-2", "2026-04-01T02:00:00+00:00")] * 1,
+        ]
+        _write_audit(log, entries)
+        report = ContractTrendAnalyzer(log).analyze()
+        names = {ct.contract_name for ct in report.contract_trends}
+        assert "A" in names and "B" in names
+
+    def test_window_limits_runs(self, tmp_path):
+        log = tmp_path / "audit.jsonl"
+        entries = []
+        for i in range(10):
+            ts = f"2026-04-01T{i:02d}:00:00+00:00"
+            entries.append(_entry("c", i < 8, f"run-{i}", ts))
+        _write_audit(log, entries)
+        report = ContractTrendAnalyzer(log, window=5).analyze()
+        ct = report.contract_trends[0]
+        # Only the last 5 runs: runs 5-9. run-8 and run-9 failed → slope negative.
+        assert len(ct.run_summaries) == 5
+
+    def test_empty_file_returns_empty_report(self, tmp_path):
+        log = tmp_path / "empty.jsonl"
+        log.write_text("")
+        report = ContractTrendAnalyzer(log).analyze()
+        assert report.contract_trends == []
+        assert not report.any_regression
+
+    def test_missing_file_returns_empty_report(self, tmp_path):
+        log = tmp_path / "missing.jsonl"
+        report = ContractTrendAnalyzer(log).analyze()
+        assert report.contract_trends == []
+
+    def test_malformed_lines_skipped(self, tmp_path):
+        log = tmp_path / "audit.jsonl"
+        with log.open("w") as f:
+            f.write("not-json\n")
+            f.write(json.dumps(_entry("billing", True, "run-1")) + "\n")
+            entry = _entry("billing", True, "run-2", "2026-04-01T02:00:00+00:00")
+            f.write(json.dumps(entry) + "\n")
+        report = ContractTrendAnalyzer(log).analyze()
+        assert len(report.contract_trends) == 1
+
+    def test_temporal_ordering(self, tmp_path):
+        """Runs must be ordered by first-seen timestamp, not insertion order."""
+        log = tmp_path / "audit.jsonl"
+        # Write run-b BEFORE run-a in file, but run-a has earlier timestamp
+        entries = [
+            _entry("c", False, "run-b", "2026-04-01T02:00:00+00:00"),
+            _entry("c", True, "run-a", "2026-04-01T01:00:00+00:00"),
+        ]
+        _write_audit(log, entries)
+        report = ContractTrendAnalyzer(log).analyze()
+        ct = report.contract_trends[0]
+        # run-a should be first (older ts), then run-b
+        assert ct.run_summaries[0].workflow_id == "run-a"
+        assert ct.run_summaries[1].workflow_id == "run-b"
+
+    def test_report_attributes(self, tmp_path):
+        log = tmp_path / "audit.jsonl"
+        _write_audit(log, [
+            _entry("x", True, "r1"),
+            _entry("x", True, "r2", "2026-04-01T02:00:00+00:00"),
+        ])
+        report = ContractTrendAnalyzer(log, window=15, regression_threshold=0.05).analyze()
+        assert report.window == 15
+        assert report.regression_threshold == 0.05
+        assert isinstance(report.contract_trends[0], ContractTrend)