diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cc6a29923..a7b795f04 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -153,11 +153,17 @@ jobs: run: apm audit --ci # Gate B: regeneration drift (producer-side). + # NOTE: Once `apm-action` ships a CLI version that includes the + # default-on `apm audit` drift detection (issue #1071), this entire + # step becomes redundant -- Gate A above already catches the same + # divergence via install-replay. Keep this bash check until then as + # a defense-in-depth fallback. + # # The action's `apm install` step re-integrated local .apm/ into # .github/ via target auto-detection. If anything in the governed # integration directories changed, someone edited the regenerated # output without updating the canonical .apm/ source. - - name: Check APM integration drift + - name: Check APM integration drift (legacy bash fallback, see #1071) run: | if [ -n "$(git status --porcelain -- .github/ .claude/ .cursor/ .opencode/)" ]; then echo "::error::APM integration files are out of date." diff --git a/CHANGELOG.md b/CHANGELOG.md index d4ae1b3be..55a672bca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- **`apm audit` now catches forgotten installs and hand-edits by default.** No more shipping stale `.github/instructions/` because someone forgot to re-run `apm install`, no more silent hand-edits to regenerated content. Opt out with `--no-drift`. See the [Drift Detection guide](https://microsoft.github.io/apm/guides/drift-detection/). (#1137, closes #1071, supersedes scope of #898) + ### Fixed - **Parallel subdir install race.** `apm install` no longer intermittently fails with `RuntimeError: Subdirectory '' not found in repository` when multiple dependencies resolve to different subdirectories of the same `repo@ref`.
The shared clone cache now stores subdir-agnostic bare clones and each consumer materializes its own working tree (mirrors the WS3 `GitCache` pattern). (#1135, fixes #1126) diff --git a/README.md b/README.md index 68b2bab9c..a3d934993 100644 --- a/README.md +++ b/README.md @@ -81,6 +81,7 @@ Agent context is executable in effect — a prompt is a program for an LLM. APM - **[Content security](https://microsoft.github.io/apm/enterprise/security/)** — `apm install` blocks compromised packages before agents read them; `apm audit` runs the same checks on demand - **[Lockfile integrity](https://microsoft.github.io/apm/enterprise/governance/)** — `apm.lock` records resolved sources and content hashes for full provenance +- **[Drift detection](https://microsoft.github.io/apm/guides/drift-detection/)** — `apm audit` rebuilds your agent context in scratch and diffs it against your working tree to catch hand-edits before they ship - **[MCP trust boundaries](https://microsoft.github.io/apm/guides/mcp-servers/)** — transitive MCP servers require explicit consent ### 3. Governed by policy diff --git a/docs/src/content/docs/enterprise/governance-guide.md b/docs/src/content/docs/enterprise/governance-guide.md index 2f8bf8450..1cc3162ec 100644 --- a/docs/src/content/docs/enterprise/governance-guide.md +++ b/docs/src/content/docs/enterprise/governance-guide.md @@ -274,8 +274,8 @@ This is the certitude section. Read it twice if you are deciding whether `apm au | Surface | What it bypasses LOCALLY | What it CANNOT bypass | Reviewable in | |---|---|---|---| -| `apm install --no-policy` | All 17 policy checks at install (incl. transitive MCP, hash pin) | The 7 baseline checks in `apm audit --ci` | git diff of `apm.lock.yaml` in PR | -| `APM_POLICY_DISABLE=1` env | Same as `--no-policy` plus the 17 audit policy checks | The 7 baseline checks in `apm audit --ci` | PR diff; CI env vars in Actions logs | +| `apm install --no-policy` | All 17 policy checks at install (incl. 
transitive MCP, hash pin) | The 7 baseline checks plus integration drift detection in `apm audit --ci` | git diff of `apm.lock.yaml` in PR | +| `APM_POLICY_DISABLE=1` env | Same as `--no-policy` plus the 17 audit policy checks | The 7 baseline checks plus integration drift detection in `apm audit --ci` | PR diff; CI env vars in Actions logs | | Manual edit to `apm.lock.yaml` | Nothing; install regenerates the file each run | Audit baseline `ref-consistency` and `deployed-files-present` | git diff | | Manual edit to deployed file post-install | Local file content until next audit | Audit baseline `content-integrity` (re-hashes deployed files); hidden-Unicode scan in `apm audit` content mode | git diff of the deployed file in PR | | Direct `git clone` of an APM package, bypassing install | Everything; nothing detects out-of-band file drops | Audit baseline `no-orphaned-packages` and audit-only `unmanaged-files` | git diff | @@ -287,7 +287,7 @@ This is the certitude section. Read it twice if you are deciding whether `apm au Notes on specific rows: - **`apm install --no-policy`** also bypasses the `apm install --mcp` preflight, the transitive-MCP preflight, and any project-side `policy.hash` pin. -- **`APM_POLICY_DISABLE=1`** short-circuits discovery to `outcome="disabled"` everywhere -- including `apm audit --ci`, where the 17 policy checks are skipped (the 7 baseline checks still run). +- **`APM_POLICY_DISABLE=1`** short-circuits discovery to `outcome="disabled"` everywhere -- including `apm audit --ci`, where the 17 policy checks are skipped (the 7 baseline checks and integration drift detection still run). - **Manual lockfile edits**: `content_hash` mismatch on registry-proxy deps is caught at the next install when downloads resume. - **Direct `git clone`**: `unmanaged-files` only flags governed dirs and only when configured to `warn` / `deny`. 
- **Fork-to-personal-org**: discovery resolves via `git remote get-url origin`; branch protection on the upstream repo is the trust boundary. diff --git a/docs/src/content/docs/getting-started/quick-start.md b/docs/src/content/docs/getting-started/quick-start.md index ca0c780f1..4e1010ac3 100644 --- a/docs/src/content/docs/getting-started/quick-start.md +++ b/docs/src/content/docs/getting-started/quick-start.md @@ -156,7 +156,7 @@ apm install github/awesome-copilot/skills/review-and-refactor - `apm_modules/` -- add to `.gitignore`. Rebuilt from the lockfile on install. :::tip[Keeping deployed files in sync] -When you update `apm.yml`, re-run `apm install` and commit the changed `.github/`, `.claude/`, `.cursor/`, and `.gemini/` files. A [CI drift check](../../integrations/ci-cd/#verify-deployed-primitives) catches stale files automatically. +When you update `apm.yml`, re-run `apm install` and commit the changed `.github/`, `.claude/`, `.cursor/`, and `.gemini/` files. A [CI drift check](../../guides/drift-detection/) catches stale files automatically. ::: :::note[Using Codex or Gemini?] diff --git a/docs/src/content/docs/guides/drift-detection.md b/docs/src/content/docs/guides/drift-detection.md new file mode 100644 index 000000000..e7b9961a3 --- /dev/null +++ b/docs/src/content/docs/guides/drift-detection.md @@ -0,0 +1,152 @@ +--- +title: Drift Detection +sidebar: + order: 7 +--- + +`apm audit` runs **drift detection by default** so a stale working tree +cannot ship to production unnoticed. This page explains what drift means, +how the check works, and the escape hatch when you need to disable it. + +## Try it now + +```bash +cd +apm audit +``` + +If you have any `.apm/` sources or installed dependencies, the audit +will replay your install into a scratch tmpdir and report any drift. +No writes to your working tree, no network, no MCP calls. + +Common first-run results: + +- **Clean tree, no drift** -- exit 0, no output beyond the standard + audit summary. 
+- **Forgot to re-run `apm install`** -- drift findings under kind + `unintegrated` for every `.apm/` source whose deployed counterpart + is missing. +- **First run on a pre-marker cache** -- a one-line warning asking + you to run `apm install` once so cache pin markers are written. + +## What is integration drift? + +Integration drift is any divergence between what `apm install` would +deploy from your locked dependencies and what is actually on disk. +Three kinds matter: + +| Kind | Meaning | Typical cause | +|---|---|---| +| `unintegrated` | A `.apm/` source file is committed but its deployed counterpart is missing | Forgot to re-run `apm install` after adding/editing local primitives | +| `modified` | A deployed file's content differs from what install would produce | Hand-edit to a regenerated file under `.github/`, `.claude/`, `.cursor/`, etc. | +| `orphaned` | A deployed file exists with no current source backing it | Removed a dependency or local primitive without re-running install | + +All three previously required ad-hoc `git status --porcelain` scripts in +CI to detect. With drift detection, `apm audit` catches every case in +one read-only command -- nothing in your project, lockfile, or +`apm_modules/` is mutated. + +## How it works + +```mermaid +flowchart LR + A[apm audit] --> B[Read apm.lock.yaml
+ cache contents] + B --> C[Replay install
into scratch tmpdir] + C --> D[Diff scratch tree
vs project] + D --> E[Render findings
text / JSON / SARIF] +``` + +The replay is **cache-only** -- no network, no git fetch, no MCP +registry call. It will fail fast with `CacheMissError` if the lockfile +references content not present in the persistent cache (run +`apm install` once first). + +False-positive guards normalize: + +- Build-ID lines (APM-generated build-marker comments). +- CRLF -> LF line endings (Windows checkouts of LF-canonical sources). +- UTF-8 BOM byte-order marks. + +## Default behaviour and exit codes + +| Mode | Drift findings | Exit code | +|---|---|---| +| `apm audit` | Reported in stdout | 0 (advisory only) | +| `apm audit --ci` | Reported and counted as failure | 1 | +| `apm audit --no-drift` | Skipped entirely | governed only by other checks | + +In `--ci` mode drift findings are pooled with the seven baseline lockfile +checks (`lockfile-exists`, `ref-consistency`, etc.) -- a single +non-zero exit covers all of them. + +## When to use `--no-drift` + +The escape hatch exists for two legitimate cases: + +1. **Tight inner loops** where you intentionally have local edits and + just want a content-only safety scan (`apm audit --no-drift -v`). +2. **Performance budgets** in matrix CI where you've already covered + drift in a single non-matrix job upstream. + +Drift detection is also auto-skipped when `--strip` or `--file` is used; +both target a single payload and have nothing to diff against. Combining +`--no-drift` with `--strip` or `--file` is rejected with a usage error +(rather than silently picking one). + +## Output formats + +**Text (TTY default)** -- color-coded, one finding per line, grouped by kind. + +**JSON** -- the audit report gains a top-level `drift` key: + +```json +{ + "report_format_version": "1.0", + "checks": [...], + "drift": [ + { + "path": ".github/instructions/foo.md", + "kind": "modified", + "package": "", + "inline_diff": "..."
+ } + ] +} +``` + +**SARIF** -- findings are appended to `runs[0].results` with rule IDs +`apm/drift/modified`, `apm/drift/unintegrated`, `apm/drift/orphaned`, +ready to surface in GitHub code-scanning. + +## CI integration + +The recommended CI gate is now a single line: + +```yaml +- run: apm audit --ci +``` + +### Before vs after: the legacy bash workaround + +Previously CI pipelines had to grep `git status` to catch un-installed +or hand-edited deployed files. That workaround is no longer needed: + +```yaml +# Legacy -- no longer needed once apm-action ships with drift support +- run: | + if [ -n "$(git status --porcelain -- .github/ .claude/ .cursor/)" ]; then + exit 1 + fi +``` + +`apm audit --ci` subsumes this entirely AND catches three additional +classes of drift the bash workaround missed (`unintegrated` of source +files never integrated, `orphaned` files left behind by removed +dependencies, and `modified` files normalized for build-id / +line-ending / BOM noise). + +For org-policy enforcement, combine with `--policy org` -- drift +detection composes orthogonally with the 17 audit-only policy checks. + +See also: [CI Policy Enforcement](../ci-policy-setup/), +[Governance Guide](../../enterprise/governance-guide/). 
diff --git a/docs/src/content/docs/integrations/ci-cd.md b/docs/src/content/docs/integrations/ci-cd.md index a8642ae8f..ad712be6d 100644 --- a/docs/src/content/docs/integrations/ci-cd.md +++ b/docs/src/content/docs/integrations/ci-cd.md @@ -60,22 +60,23 @@ This step is not needed if your team only uses GitHub Copilot and Claude, which ### Verify Deployed Primitives -To ensure `.github/`, `.claude/`, `.cursor/`, `.opencode/`, and `.gemini/` integration files stay in sync with `apm.yml`, add a drift check: +`apm audit --ci` catches integration drift by default -- no separate +`git status` step required: ```yaml - - name: Check APM integration drift - run: | - apm install - if [ -n "$(git status --porcelain -- .github/ .claude/ .cursor/ .opencode/ .gemini/)" ]; then - echo "APM integration files are out of date. Run 'apm install' and commit." - exit 1 - fi + - name: Audit + drift check + run: apm audit --ci ``` -This catches cases where a developer updates `apm.yml` but forgets to re-run `apm install`. +This single command runs the seven baseline lockfile checks PLUS integration +drift detection (default-on), which replays +the install pipeline into a scratch tree to detect missed `apm install` +runs, hand-edited deployed files, and orphaned files. See the +[Drift Detection guide](../../guides/drift-detection/) for details and +opt-out (`--no-drift`). :::tip[We dogfood this] -APM's own repo uses the `APM Self-Check` job in [`microsoft/apm`'s `ci.yml`](https://github.com/microsoft/apm/blob/main/.github/workflows/ci.yml) as a reference implementation for installing APM, running CI validation commands such as `apm audit --ci`, and checking for drift with `git status --porcelain`. Use it as a practical example when wiring these checks into your own workflow.
+APM's own repo uses the `APM Self-Check` job in [`microsoft/apm`'s `ci.yml`](https://github.com/microsoft/apm/blob/main/.github/workflows/ci.yml) as a reference implementation for installing APM and running `apm audit --ci`. Use it as a practical example when wiring these checks into your own workflow. ::: ## Azure Pipelines @@ -147,7 +148,7 @@ apm install ## Governance with `apm audit` -`apm audit --ci` verifies lockfile consistency in CI (7 baseline checks, no configuration). Add `--policy org` to enforce organizational rules (17 additional checks). For full setup including SARIF integration and GitHub Code Scanning, see the [CI Policy Enforcement guide](../../guides/ci-policy-setup/). +`apm audit --ci` verifies lockfile consistency in CI (7 baseline checks plus integration drift detection, no configuration). Add `--policy org` to enforce organizational rules (17 additional checks). For full setup including SARIF integration and GitHub Code Scanning, see the [CI Policy Enforcement guide](../../guides/ci-policy-setup/). For content scanning and hidden Unicode detection, `apm install` automatically blocks critical findings. Run `apm audit` for on-demand reporting. See [Governance](../../enterprise/governance-guide/) for the full governance model. diff --git a/docs/src/content/docs/reference/cli-commands.md b/docs/src/content/docs/reference/cli-commands.md index 27256443d..2360a9a95 100644 --- a/docs/src/content/docs/reference/cli-commands.md +++ b/docs/src/content/docs/reference/cli-commands.md @@ -459,11 +459,12 @@ apm audit [PACKAGE] [OPTIONS] - `-v, --verbose` - Show info-level findings and file details - `-f, --format [text|json|sarif|markdown]` - Output format: `text` (default), `json` (machine-readable), `sarif` (GitHub Code Scanning), `markdown` (step summaries). Cannot be combined with `--strip` or `--dry-run`. - `-o, --output PATH` - Write report to file. 
Auto-detects format from extension (`.sarif`, `.sarif.json` → SARIF; `.json` → JSON; `.md` → Markdown) when `--format` is not specified. -- `--ci` - Run lockfile consistency checks for CI/CD gates. Exit 0 if clean, 1 if violations found. Auto-discovers org policy from the org `.github` repo unless `--no-policy` is set. Runs the 7 baseline checks: lockfile presence, ref consistency, deployed files present, no orphaned packages, MCP config consistency, content integrity (Unicode + hash drift on every deployed file including local content), includes consent (advisory). +- `--ci` - Run lockfile consistency checks for CI/CD gates. Exit 0 if clean, 1 if violations found. Auto-discovers org policy from the org `.github` repo unless `--no-policy` is set. Runs the 7 baseline checks: lockfile presence, ref consistency, deployed files present, no orphaned packages, MCP config consistency, content integrity (Unicode + hash drift on every deployed file including local content), includes consent (advisory). Integration drift detection runs by default alongside the baseline checks and contributes to the exit code (use `--no-drift` to opt out). - `--policy SOURCE` - *(Experimental)* Policy source. Accepts: `org` (auto-discover from your project's git remote), `owner/repo` (defaults to github.com), an `https://` URL, or a local file path. Used with `--ci` for policy checks. Without this flag, `--ci` auto-discovers. - `--no-policy` - Skip policy discovery and enforcement entirely. Equivalent to `APM_POLICY_DISABLE=1`. - `--no-cache` - Force fresh policy fetch (skip cache). Only relevant with policy discovery active. - `--no-fail-fast` - Run all checks even after a failure. By default, CI mode stops at the first failing check to save time. +- `--no-drift` - Skip integration drift detection. Drift detection is on by default (whole-project audit only) and replays the install pipeline into a scratch tree to catch missed `apm install` runs, hand-edited deployed files, and orphaned files. 
Mutually exclusive with `--strip`/`--file`. See the [Drift Detection guide](../../guides/drift-detection/). **Examples:** ```bash diff --git a/packages/apm-guide/.apm/skills/apm-usage/commands.md b/packages/apm-guide/.apm/skills/apm-usage/commands.md index ff14e95c7..74e769106 100644 --- a/packages/apm-guide/.apm/skills/apm-usage/commands.md +++ b/packages/apm-guide/.apm/skills/apm-usage/commands.md @@ -43,7 +43,9 @@ | Command | Purpose | Key flags | |---------|---------|-----------| -| `apm audit [PKG]` | Scan for security issues | `--file PATH`, `--strip`, `--dry-run`, `-v`, `-f [text\|json\|sarif\|md]`, `-o PATH`, `--ci`, `--policy SOURCE`, `--no-cache`, `--no-fail-fast` | +| `apm audit [PKG]` | Scan for security issues + detect integration drift | `--file PATH`, `--strip`, `--dry-run`, `-v`, `-f [text\|json\|sarif\|md]`, `-o PATH`, `--ci`, `--policy SOURCE`, `--no-cache`, `--no-fail-fast`, `--no-drift` | + +`apm audit` runs **drift detection by default** (issue #1071). It replays `apm install` cache-only into a temporary scratch tree and diffs the result against your working tree. Catches three failure modes: (1) `.apm/` source added without re-running `apm install`, (2) hand-edits to deployed files that diverge from canonical source, (3) orphan files left after their source was removed. The scan is read-only -- never writes to your project, lockfile, or `apm_modules/`. Build IDs, CRLF line endings, and BOMs are normalized away so they cannot trigger false positives. Use `--no-drift` to opt out (e.g. fast inner loops); the flag is mutually exclusive with `--strip`/`--file`. In `--ci` mode drift findings produce exit code 1 alongside the seven baseline lockfile checks. Drift output is integrated into JSON (top-level `drift` key) and SARIF (rule IDs `apm/drift/<kind>` where kind is `modified`/`unintegrated`/`orphaned`).
## Distribution diff --git a/scripts/test-integration.sh b/scripts/test-integration.sh index a8afe07bb..bf2be3274 100755 --- a/scripts/test-integration.sh +++ b/scripts/test-integration.sh @@ -524,6 +524,34 @@ run_e2e_tests() { exit 1 fi + # Run drift-detection integration tests -- offline, no tokens needed + # Guards `apm audit` drift replay (Phase D) across all 9 drift cases, + # multi-target, --no-drift opt-out, and false-positive guards + # (CRLF, BOM, Build ID line). Pinning these tests prevents silent + # regression of the drift contract. + log_info "Running drift detection integration tests..." + echo "Command: pytest tests/integration/test_drift_check.py -v -s --tb=short" + + if pytest tests/integration/test_drift_check.py -v -s --tb=short; then + log_success "Drift detection integration tests passed!" + else + log_error "Drift detection integration tests failed!" + exit 1 + fi + + # Run drift-detection E2E tests -- offline, no tokens needed + # Verifies the no-write contract, air-gap proof, performance smoke, + # and JSON/SARIF output shapes for the `apm audit` drift surface. + log_info "Running drift detection E2E tests..." + echo "Command: pytest tests/integration/test_drift_check_e2e.py -v -s --tb=short" + + if pytest tests/integration/test_drift_check_e2e.py -v -s --tb=short; then + log_success "Drift detection E2E tests passed!" + else + log_error "Drift detection E2E tests failed!" + exit 1 + fi + log_success "All integration test suites completed successfully!" 
diff --git a/src/apm_cli/commands/audit.py b/src/apm_cli/commands/audit.py index 6e75d063a..99d3bf8c7 100644 --- a/src/apm_cli/commands/audit.py +++ b/src/apm_cli/commands/audit.py @@ -19,7 +19,7 @@ import click from ..core.command_logger import CommandLogger -from ..deps.lockfile import LockFile, get_lockfile_path # noqa: F401 +from ..deps.lockfile import LockFile, get_lockfile_path from ..policy._help_text import POLICY_SOURCE_FORMS_HELP from ..security.content_scanner import ContentScanner, ScanFinding from ..security.file_scanner import scan_lockfile_packages @@ -398,16 +398,17 @@ def _audit_ci_gate( no_cache: bool, no_policy: bool, no_fail_fast: bool, + no_drift: bool = False, ) -> None: """Handle ``apm audit --ci`` -- lockfile consistency gate. - Runs baseline lockfile checks and (optionally) org-policy checks, - then emits a structured report and exits with 0 (clean) or 1 - (violations). + Runs baseline lockfile checks, drift detection (unless ``--no-drift``), + and (optionally) org-policy checks, then emits a structured report + and exits with 0 (clean) or 1 (violations). 
""" logger = cfg.logger - from ..policy.ci_checks import run_baseline_checks + from ..policy.ci_checks import _check_drift, run_baseline_checks from ..policy.policy_checks import run_policy_checks fail_fast = not no_fail_fast @@ -484,6 +485,31 @@ def _audit_ci_gate( ) ) + # -- Drift detection (default-on per ADR-02) -------------------- + drift_findings: list = [] + if not no_drift and (cfg.project_root / "apm.yml").exists(): + from ..deps.lockfile import LockFile, get_lockfile_path + + lockfile_path = get_lockfile_path(cfg.project_root) + if lockfile_path.exists(): + lockfile = LockFile.read(lockfile_path) + if lockfile is not None: + drift_check, drift_findings = _check_drift( + cfg.project_root, + lockfile, + cache_only=True, + verbose=cfg.verbose, + ) + ci_result.checks.append(drift_check) + elif no_drift and cfg.output_format == "text": + # In structured output (json/sarif), --no-drift is implicit from + # the absence of the drift check entry; no need to pollute output. + click.echo( + f"{STATUS_SYMBOLS['warning']} drift detection skipped (--no-drift); " + "coverage reduced -- hand-edits and missing integrations will not be caught", + err=True, + ) + # Resolve effective format effective_format = cfg.output_format if cfg.output_path and effective_format == "text": @@ -494,7 +520,17 @@ def _audit_ci_gate( if effective_format in ("json", "sarif"): import json as _json - payload = ci_result.to_sarif() if effective_format == "sarif" else ci_result.to_json() + from ..install.drift import render_drift_json, render_drift_sarif + + if effective_format == "sarif": + payload = ci_result.to_sarif() + if drift_findings: + payload["runs"][0]["results"].extend(render_drift_sarif(drift_findings)) + else: + payload = ci_result.to_json() + if drift_findings or not no_drift: + payload["drift"] = render_drift_json(drift_findings) + output = _json.dumps(payload, indent=2) if cfg.output_path: Path(cfg.output_path).parent.mkdir(parents=True, exist_ok=True) @@ -504,6 +540,11 @@ def 
_audit_ci_gate( click.echo(output) else: _render_ci_results(ci_result) + if drift_findings: + from ..install.drift import render_drift_text + + click.echo("") + click.echo(render_drift_text(drift_findings, verbose=cfg.verbose)) sys.exit(0 if ci_result.passed else 1) @@ -514,6 +555,7 @@ def _audit_content_scan( file_path: str | None, strip: bool, dry_run: bool, + no_drift: bool = False, ) -> None: """Handle default ``apm audit`` -- content integrity scanning. @@ -585,6 +627,54 @@ def _audit_content_scan( logger.progress("Nothing to clean -- no strippable characters found") sys.exit(0) + # -- Drift detection (default-on per ADR-02) -------------------- + # Drift only applies to whole-project audit (not --file or --strip + # modes; not single-package scoped). Mutex on no_drift+strip/file + # is enforced earlier via UsageError. + drift_findings: list = [] + drift_failed = False + if ( + not no_drift + and not strip + and not file_path + and not package + and (project_root / "apm.yml").exists() + ): + from ..policy.ci_checks import _check_drift + + lockfile_path = get_lockfile_path(project_root) + if lockfile_path.exists(): + lockfile = LockFile.read(lockfile_path) + if lockfile is not None: + drift_check, drift_findings = _check_drift( + project_root, + lockfile, + cache_only=True, + verbose=cfg.verbose, + ) + drift_failed = not drift_check.passed + # Bare `apm audit` is advisory: drift_failed does not gate + # the exit code (that lives in --ci). But silence on a + # cache-pin / cache-miss failure is a UX trap: the user + # cannot tell whether drift was clean or whether it was + # never attempted. Surface the failure reason on stderr + # whenever the drift check failed without producing + # findings (CacheMissError, CachePinError, missing lockfile). 
+ if drift_failed and not drift_findings: + click.echo( + f"{STATUS_SYMBOLS['warning']} drift check could not run: " + f"{drift_check.message}", + err=True, + ) + elif no_drift and cfg.output_format == "text": + # In structured output (json/sarif), --no-drift is implicit from + # the absence of the drift check entry; no need to pollute output. + click.echo( + f"{STATUS_SYMBOLS['warning']} drift detection skipped (--no-drift); " + "coverage reduced -- hand-edits and missing integrations will not be caught", + err=True, + ) + # -- Display findings -- # Determine exit code first (shared by all formats) if not findings_by_file or not _has_actionable_findings(findings_by_file): @@ -593,6 +683,11 @@ def _audit_content_scan( all_findings = [f for ff in findings_by_file.values() for f in ff] exit_code = 1 if ContentScanner.has_critical(all_findings) else 2 + # Note: bare `apm audit` is advisory for drift; drift findings are + # rendered (text/json/sarif) but DO NOT escalate the exit code. Use + # `apm audit --ci` (handled in _audit_ci_gate) to gate on drift. + _ = drift_failed # retained for symmetry; gate path lives in --ci. + if effective_format == "text": if cfg.output_path: logger.error( @@ -603,6 +698,11 @@ def _audit_content_scan( if findings_by_file: _render_findings_table(findings_by_file, verbose=cfg.verbose) _render_summary(findings_by_file, files_scanned, logger) + if drift_findings: + from ..install.drift import render_drift_text + + click.echo("") + click.echo(render_drift_text(drift_findings, verbose=cfg.verbose)) elif effective_format == "markdown": from ..security.audit_report import findings_to_markdown @@ -717,6 +817,15 @@ def _audit_content_scan( is_flag=True, help="Run all checks even after a failure (default: stop at first failure).", ) +@click.option( + "--no-drift", + "no_drift", + is_flag=True, + help=( + "Skip the install-replay drift check. Reduces coverage; " + "use only for performance-constrained CI loops." 
+ ), +) @click.pass_context def audit( ctx, @@ -732,6 +841,7 @@ def audit( no_cache, no_policy, no_fail_fast, + no_drift, ): """Scan deployed prompt files for hidden Unicode characters. @@ -739,23 +849,31 @@ def audit( prompt, instruction, and rules files. Dangerous and suspicious characters can be removed with --strip. - With --ci, runs lockfile consistency checks instead of content scanning. - This validates that the on-disk state matches what the lockfile declares, - suitable for CI/CD pipeline gates. + By default, also runs install-replay drift detection: catches + hand-edits to deployed files, missing integrations, and orphaned + files vs the lockfile. Use --no-drift to skip (reduces coverage). + + With --ci, runs lockfile consistency checks AND drift in machine- + readable format, suitable for CI/CD pipeline gates. \b Exit codes: - 0 Clean, info-only findings, or successful strip - 1 Critical findings detected (or --ci with violations) - 2 Warning-only findings (suspicious but not critical) + 0 Clean, info-only findings, or drift-only (advisory) in bare + audit, or successful strip + 1 Critical findings detected, or --ci with violations + (including drift in --ci mode) + 2 Warning-only findings (suspicious but not critical), or + usage error (mutually exclusive flags) \b Examples: - apm audit # Scan all installed packages + apm audit # Scan + drift (all checks) apm audit my-package # Scan a specific package - apm audit --file .cursorrules # Scan any file + apm audit --file .cursorrules # Scan any file (no drift) apm audit --strip # Remove dangerous/suspicious chars - apm audit --ci # Lockfile consistency gate + apm audit --no-drift # Skip drift only (escape hatch) + apm audit --ci # CI gate (lockfile + drift) + apm audit --ci --no-drift # CI gate without drift (rare) apm audit --ci --policy org # CI gate with org policy checks apm audit --ci -f json # JSON CI report apm audit --ci -f sarif # SARIF for GitHub Code Scanning @@ -772,6 +890,15 @@ def audit( 
output_path=output_path, ) + # --no-drift is a different audit mode from --strip / --file (those + # are content-scanning operations unrelated to integration drift). + # Click-native UsageError gives exit code 2 with "Usage:" prefix. + if no_drift and (strip or file_path): + raise click.UsageError( + "--no-drift cannot be combined with --strip or --file " + "(those modes do not run drift detection)" + ) + # -- CI mode: lockfile consistency gate ------------------------- if ci: if verbose: @@ -783,7 +910,7 @@ def audit( logger.error("--ci does not support --format markdown. Use json or sarif.") sys.exit(1) - _audit_ci_gate(cfg, policy_source, no_cache, no_policy, no_fail_fast) + _audit_ci_gate(cfg, policy_source, no_cache, no_policy, no_fail_fast, no_drift) return # _audit_ci_gate calls sys.exit; return guards against fall-through # -- Content scan mode ------------------------------------------ @@ -793,4 +920,4 @@ def audit( "Use 'apm audit --ci --policy ' to run policy checks." ) - _audit_content_scan(cfg, package, file_path, strip, dry_run) + _audit_content_scan(cfg, package, file_path, strip, dry_run, no_drift) diff --git a/src/apm_cli/install/cache_pin.py b/src/apm_cli/install/cache_pin.py new file mode 100644 index 000000000..b267ce948 --- /dev/null +++ b/src/apm_cli/install/cache_pin.py @@ -0,0 +1,233 @@ +"""Cache-pin marker for drift-replay correctness. + +When ``apm install`` populates ``apm_modules///`` from a +specific lockfile pin, it drops a small JSON marker (``.apm-pin``) at +the package root recording the ``resolved_commit`` that produced the +cache contents. + +``apm audit`` drift-replay then verifies the marker matches the +lockfile's ``resolved_commit`` BEFORE diffing. This catches a +critical correctness hazard: + +* Shared CI runners that retain ``apm_modules/`` across builds. +* A teammate updating ``apm.lock.yaml`` (e.g. bumping a pin from X to + Y) without re-running ``apm install``. 
+ +Without the marker, drift-replay would happily diff the new lockfile +against stale cache content and report meaningless or misleading +findings. + +Threat model -- this is "stale-cache detection", NOT cryptographic +integrity. An adversary with write access to ``apm_modules/`` can +always tamper with both the cache content AND the marker. Defending +against active cache tampering requires content-addressed hashes / +signatures, which is deferred. The marker closes the +honest-mistake gap that motivated this work. + +Schema (v1):: + + {"schema_version": 1, "resolved_commit": ""} + +Future schema bumps must keep ``schema_version`` parseable so older +clients can fail closed with a clear message. +""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from apm_cli.deps.lockfile import LockedDependency, LockFile + + +MARKER_FILENAME = ".apm-pin" +SCHEMA_VERSION = 1 + + +class CachePinError(RuntimeError): + """Raised when the cache pin is missing, malformed, or stale. + + The orchestrator (drift replay) catches this and translates it to + a ``CacheMissError`` with the same message, so the user-facing + advice ("run apm install") is consistent across every cache + inconsistency reason. + """ + + +def write_marker(install_path: Path, resolved_commit: str) -> None: + """Write the cache-pin marker file to ``install_path``. + + Idempotent: overwrites any prior marker (callers may invoke this + on every install, regardless of whether the lockfile YAML changed, + to self-heal caches that pre-date the marker contract). + + Failures are silent at the filesystem layer because they are + non-fatal for ``apm install`` itself -- a missing or unwritable + marker is detected and surfaced by the drift-replay verify step. + The caller (``LockfileBuilder``) cannot stop a successful install + just because we could not annotate its cache. 
+ """ + if not install_path.exists() or not install_path.is_dir(): + return + payload = { + "schema_version": SCHEMA_VERSION, + "resolved_commit": resolved_commit, + } + marker = install_path / MARKER_FILENAME + try: + marker.write_text(json.dumps(payload), encoding="utf-8") + except OSError: + # Read-only mount, permission denied, etc. Drift will surface + # this on the next audit; install must not fail. + return + + +def verify_marker(install_path: Path, expected_commit: str) -> None: + """Verify the marker at ``install_path`` matches ``expected_commit``. + + Raises + ------ + CachePinError + On any of: marker file absent, unreadable, malformed JSON, + unsupported ``schema_version``, missing ``resolved_commit`` + field, or commit mismatch. The exception message is + user-facing -- it is rendered verbatim by ``apm audit``. + """ + marker = install_path / MARKER_FILENAME + if not marker.exists(): + raise CachePinError( + f"cache pin marker missing at {marker} -- cache pre-dates supply-chain hardening" + ) + try: + raw = marker.read_text(encoding="utf-8") + except OSError as exc: + raise CachePinError(f"cache pin marker unreadable at {marker}: {exc}") from exc + try: + payload = json.loads(raw) + except json.JSONDecodeError as exc: + raise CachePinError(f"cache pin marker at {marker} is not valid JSON: {exc}") from exc + if not isinstance(payload, dict): + raise CachePinError( + f"cache pin marker at {marker} must be a JSON object, got {type(payload).__name__}" + ) + schema = payload.get("schema_version") + if schema != SCHEMA_VERSION: + raise CachePinError( + f"cache pin marker at {marker} has unsupported schema_version " + f"{schema!r}; expected {SCHEMA_VERSION}" + ) + actual = payload.get("resolved_commit") + if not isinstance(actual, str) or not actual: + raise CachePinError(f"cache pin marker at {marker} is missing resolved_commit") + if actual != expected_commit: + raise CachePinError( + f"cache pin mismatch at {marker}: " + f"marker says {actual!r}, lockfile 
expects {expected_commit!r}" + ) + + +def find_unpinned_remote_deps(lockfile: LockFile) -> list[str]: + """Return repo_url for every remote dep that lacks a ``resolved_commit``. + + A remote dep without a pinned commit is a supply-chain weakness: + + * The cache could come from any commit on the referenced branch/tag. + * Drift replay cannot verify the cache matches the lockfile. + * No marker can be written, so future drift runs cannot fail-closed. + + Callers (``sync_markers_for_lockfile`` at install, ``_materialize_install_path`` + at audit) should not silently no-op these entries -- they should + warn on install and fail-closed on audit. + """ + unpinned: list[str] = [] + for _key, dep in lockfile.dependencies.items(): + if getattr(dep, "source", None) == "local": + continue + commit = getattr(dep, "resolved_commit", None) + if not isinstance(commit, str) or not commit: + unpinned.append(getattr(dep, "repo_url", "") or _key) + return unpinned + + +def sync_markers_for_lockfile( + lockfile: LockFile, + project_root: Path, + apm_modules_dir: Path, +) -> int: + """Write a marker for every remote dep in ``lockfile`` that has a cached install. + + Self-healing: this is called unconditionally by ``LockfileBuilder`` + after ``_write_if_changed`` so that caches which pre-date the + marker contract get marked on the next ``apm install`` even when + the lockfile YAML itself does not need to be rewritten. + + Side-effect: any remote dep that lacks a ``resolved_commit`` produces + a single stderr warning (one per dep). These deps cannot participate + in stale-cache verification, which is a supply-chain weakness worth + surfacing -- silent no-op would let unpinned refs sneak past audit. + + Returns the count of markers written (useful for verbose logging + and for tests). 
+ """ + unpinned = find_unpinned_remote_deps(lockfile) + if unpinned: + from apm_cli.utils.console import _rich_warning + + for repo in unpinned: + _rich_warning( + f"cache-pin: remote dep {repo!r} has no resolved_commit; " + "drift cannot verify its cache freshness. Re-run 'apm install' " + "with a pinned ref (commit, tag, or specific branch HEAD)." + ) + + written = 0 + for dep_key, dep in lockfile.dependencies.items(): # noqa: B007 + if not _is_markable(dep): + continue + try: + install_path = _resolve_install_path(dep, project_root, apm_modules_dir) + except Exception: # noqa: S112 -- best-effort marker sync, see module docstring + continue + if install_path is None or not install_path.exists(): + continue + # dep.resolved_commit is non-None per _is_markable. + write_marker(install_path, dep.resolved_commit) # type: ignore[arg-type] + written += 1 + return written + + +def _is_markable(dep: LockedDependency) -> bool: + """A dep is markable when it has a deterministic remote pin we can verify.""" + if getattr(dep, "source", None) == "local": + return False + commit = getattr(dep, "resolved_commit", None) + return isinstance(commit, str) and bool(commit) + + +def _resolve_install_path( + dep: LockedDependency, + project_root: Path, + apm_modules_dir: Path, +) -> Path | None: + """Resolve the on-disk install path for a remote dep without raising.""" + try: + dep_ref = dep.to_dependency_ref() + except Exception: + return None + try: + return dep_ref.get_install_path(apm_modules_dir) + except Exception: + return None + + +__all__ = [ + "MARKER_FILENAME", + "SCHEMA_VERSION", + "CachePinError", + "find_unpinned_remote_deps", + "sync_markers_for_lockfile", + "verify_marker", + "write_marker", +] diff --git a/src/apm_cli/install/drift.py b/src/apm_cli/install/drift.py new file mode 100644 index 000000000..25d75007e --- /dev/null +++ b/src/apm_cli/install/drift.py @@ -0,0 +1,731 @@ +"""Drift-detection replay engine for ``apm audit --check drift``. 
+ +Reproduces the integration step from the lockfile in an isolated scratch +directory, then diffs the resulting tree against the working project to +surface three kinds of divergence: + +* ``modified`` -- a tracked deployed file's content differs. +* ``unintegrated`` -- a tracked deployed file is missing from the project. +* ``orphaned`` -- a managed-directory file exists in the project but + is not present in the scratch replay AND not tracked in the lockfile. + +The replay is **cache-only** in v1 (no network): cached package contents +under ``apm_modules/`` are the source of truth. A miss is reported as a +check error rather than auto-fetched. + +Design constraints (see ``WIP/drift/06-final-plan.md``): +* Pure read-only against the project tree -- writes go to the scratch + directory only. ``ensure_path_within`` guards every write redirection. +* ASCII-only console output (Windows cp1252 safety). +* Normalization strips line-ending differences, BOMs, and the APM + ``Build ID`` header that legitimately changes on every recompile. +""" + +from __future__ import annotations + +import atexit +import json +import shutil +import tempfile +import tracemalloc +from dataclasses import dataclass +from pathlib import Path +from typing import TYPE_CHECKING + +import click + +from apm_cli.core.command_logger import CommandLogger +from apm_cli.utils.console import STATUS_SYMBOLS +from apm_cli.utils.guards import _ReadOnlyProjectGuard + +if TYPE_CHECKING: + from apm_cli.deps.lockfile import LockedDependency, LockFile + + +# --------------------------------------------------------------------------- +# Public dataclasses +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class ReplayConfig: + """Locked configuration for a drift replay run. + + Frozen so callers cannot mutate it mid-replay -- any change requires + a new instance, which keeps the contract auditable. 
+ """ + + project_root: Path + lockfile_path: Path + targets: frozenset[str] | None = None + cache_only: bool = True + no_hooks: bool = True + parallel_downloads: int = 1 + + +@dataclass(frozen=True) +class DriftFinding: + """A single divergence between the replay scratch tree and the project.""" + + path: str + kind: str # one of "modified" | "unintegrated" | "orphaned" + package: str = "" + inline_diff: str = "" + + +# --------------------------------------------------------------------------- +# Errors +# --------------------------------------------------------------------------- + + +class CacheMissError(RuntimeError): + """Raised when ``cache_only=True`` but a package is not in the cache.""" + + +# --------------------------------------------------------------------------- +# Normalization helpers (operate on bytes; bytes-in / bytes-out) +# +# Re-exported from ``apm_cli.utils.normalization`` so existing callers and +# tests that import ``_strip_build_id`` / ``_normalize`` from this module +# keep working. The implementation lives in ``utils/`` so future callers +# (policy linters, content-scan helpers) can reuse it without importing +# the drift module. +# --------------------------------------------------------------------------- + +from apm_cli.utils.normalization import ( # noqa: E402, F401 -- re-exported for back-compat + _BOM, + _BUILD_ID_PATTERN, + _normalize, + _normalize_line_endings, + _strip_bom, + _strip_build_id, +) + +# --------------------------------------------------------------------------- +# Scratch directory lifecycle +# --------------------------------------------------------------------------- + + +def _assert_scratch_bound(project_root: Path, scratch_root: Path) -> None: + """Defense-in-depth: a scratch dir must NOT live inside the project tree. + + Prevents the replay engine from accidentally writing into the live + project (which would defeat the read-only contract). 
+ """ + project_root = project_root.resolve() + scratch_root = scratch_root.resolve() + try: + scratch_root.relative_to(project_root) + except ValueError: + return + raise RuntimeError( + f"drift scratch dir {scratch_root!s} is inside project tree " + f"{project_root!s}; refusing to proceed" + ) + + +def _make_scratch_root(project_root: Path) -> Path: + """Allocate a scratch dir outside the project tree, with atexit cleanup.""" + scratch = Path(tempfile.mkdtemp(prefix="apm_drift_")) + _assert_scratch_bound(project_root, scratch) + + def _cleanup() -> None: + try: + shutil.rmtree(scratch, ignore_errors=False) + except OSError as exc: + click.echo( + f"{STATUS_SYMBOLS['warning']} failed to clean drift scratch dir {scratch}: {exc}", + err=True, + ) + + atexit.register(_cleanup) + return scratch + + +# --------------------------------------------------------------------------- +# Stderr-only logger for audit phases (CommandLogger writes to stdout) +# --------------------------------------------------------------------------- + + +class CheckLogger(CommandLogger): + """CheckLogger emits drift phase markers to stderr. + + ``CommandLogger._rich_*`` writes to stdout (intended for human + install output). Audit/drift output must stay on stderr so that + machine-parseable JSON/SARIF on stdout is never polluted. + """ + + def __init__(self, verbose: bool = False) -> None: + super().__init__("audit-drift", verbose=verbose) + + def _emit(self, symbol_key: str, msg: str) -> None: + click.echo(f"{STATUS_SYMBOLS[symbol_key]} {msg}", err=True) + + def replay_start(self) -> None: + self._emit("running", "Replaying install (cache-only)...") + + def scratch_root(self, path: Path) -> None: + """Verbose-only: announce the scratch tmpdir to stderr. + + Stays on stderr so JSON/SARIF stdout payloads remain + machine-parseable. Self-gates on ``self.verbose`` so the + normal-mode user never sees it. 
+ """ + if not self.verbose: + return + click.echo( + f"{STATUS_SYMBOLS['info']} drift scratch root: {path}", + err=True, + ) + + def diff_start(self) -> None: + self._emit("running", "Diffing scratch vs working tree...") + + def replay_complete(self, n: int) -> None: + self._emit("check", f"Replayed {n} package(s)") + + def clean(self) -> None: + self._emit("check", "No drift detected") + + def findings(self, n: int) -> None: + self._emit("warning", f"Drift detected: {n} file(s)") + + +# --------------------------------------------------------------------------- +# Package materialization (cache-only) +# --------------------------------------------------------------------------- + + +def _materialize_install_path( + lock_dep: LockedDependency, + project_root: Path, + apm_modules_dir: Path, + cache_only: bool, +) -> Path: + """Resolve the on-disk path for a locked dep's package contents. + + For local deps -- contents live at ``project_root / lock_dep.local_path``. + For remote deps -- contents live at the canonical apm_modules subpath. + + Raises + ------ + CacheMissError + If ``cache_only`` is True and the path does not exist. + NotImplementedError + If ``cache_only`` is False (network-enabled replay is a follow-up). 
+ """ + if not cache_only: + raise NotImplementedError("--no-cache replay requires auth wiring; tracked in follow-up") + + if lock_dep.source == "local": + if not lock_dep.local_path: + raise CacheMissError(f"local dep {lock_dep.repo_url!r} has no local_path in lockfile") + candidate = (project_root / lock_dep.local_path).resolve() + if not candidate.exists(): + raise CacheMissError( + f"local source missing for {lock_dep.local_path!r}: expected {candidate}" + ) + return candidate + + dep_ref = lock_dep.to_dependency_ref() + candidate = dep_ref.get_install_path(apm_modules_dir) + # Supply-chain fail-closed: a remote dep without a resolved_commit is + # unverifiable -- there is no marker we can write at install time and + # no commit we can compare at audit time. Refuse to replay it rather + # than silently trust whatever happens to live in the cache. + if getattr(lock_dep, "source", None) != "local" and not lock_dep.resolved_commit: + raise CacheMissError( + f"cannot replay {lock_dep.repo_url}: lockfile entry has no resolved_commit " + "(cache freshness unverifiable). Re-run 'apm install' with a pinned ref " + "(commit, tag, or specific branch HEAD) before audit." + ) + if not candidate.exists(): + raise CacheMissError( + f"cache miss for {lock_dep.repo_url}@{lock_dep.resolved_commit}: " + f"expected {candidate}; run 'apm install' to populate the cache" + ) + # Stale-cache detection: verify the cache pin marker matches the + # lockfile's resolved_commit. Catches the "teammate bumped the + # lockfile, didn't reinstall" + "shared CI runner reused stale + # apm_modules" scenarios. Not defense against active tampering. 
+ if lock_dep.resolved_commit: + from apm_cli.install.cache_pin import CachePinError, verify_marker + + try: + verify_marker(candidate, lock_dep.resolved_commit) + except CachePinError as exc: + raise CacheMissError(f"{exc}; run 'apm install' to refresh apm_modules cache") from exc + return candidate + + +def _build_package_info( + lock_dep: LockedDependency, + install_path: Path, +): + """Construct a real ``PackageInfo`` for the integrators. + + Loads ``apm.yml`` when present so integrators that read + ``package_info.package.name`` see the right package identity. + """ + from apm_cli.models.apm_package import ( + APMPackage, + GitReferenceType, + PackageInfo, + ResolvedReference, + ) + from apm_cli.models.validation import detect_package_type + + apm_yml = install_path / "apm.yml" + if apm_yml.exists(): + try: + pkg = APMPackage.from_apm_yml(apm_yml, source_path=install_path) + except Exception: + pkg = APMPackage( + name=install_path.name, + version=lock_dep.version or "unknown", + package_path=install_path, + source=lock_dep.repo_url, + ) + if not pkg.source: + pkg.source = lock_dep.repo_url + else: + pkg = APMPackage( + name=install_path.name, + version=lock_dep.version or "unknown", + package_path=install_path, + source=lock_dep.repo_url, + ) + + resolved_ref = ResolvedReference( + original_ref=lock_dep.resolved_ref or "locked", + ref_type=GitReferenceType.BRANCH, + resolved_commit=lock_dep.resolved_commit or "locked", + ref_name=lock_dep.resolved_ref or "locked", + ) + + info = PackageInfo( + package=pkg, + install_path=install_path, + resolved_reference=resolved_ref, + dependency_ref=lock_dep.to_dependency_ref(), + ) + try: + pkg_type, _ = detect_package_type(install_path) + info.package_type = pkg_type + except Exception: + info.package_type = None + return info + + +# --------------------------------------------------------------------------- +# Replay orchestrator +# --------------------------------------------------------------------------- + + +def 
_make_integrators(): + """Build a fresh integrator set for one replay run. + + Mirrors ``apm_cli.install.phases.targets:208-215`` so the replay + behaves identically to a real ``apm install --integrate``. + """ + from apm_cli.integration.agent_integrator import AgentIntegrator + from apm_cli.integration.command_integrator import CommandIntegrator + from apm_cli.integration.hook_integrator import HookIntegrator + from apm_cli.integration.instruction_integrator import InstructionIntegrator + from apm_cli.integration.prompt_integrator import PromptIntegrator + from apm_cli.integration.skill_integrator import SkillIntegrator + + return { + "prompt": PromptIntegrator(), + "agent": AgentIntegrator(), + "skill": SkillIntegrator(), + "command": CommandIntegrator(), + "hook": HookIntegrator(), + "instruction": InstructionIntegrator(), + } + + +def _filter_targets(all_targets, names: frozenset[str] | None): + """Restrict resolved targets to the explicit allowlist when provided.""" + if not names: + return all_targets + return [t for t in all_targets if t.name in names] + + +def _read_apm_yml_target(project_root: Path): + """Return the ``target:`` field from ``apm.yml`` if present, else ``None``. + + This lets ``run_replay`` reproduce the SAME target set the install + pipeline used, instead of falling back to directory auto-detection + that misses targets whose deployment directories are still empty. + """ + apm_yml = project_root / "apm.yml" + if not apm_yml.exists(): + return None + try: + import yaml as _yaml # local import: drift module avoids top-level yaml dep + + data = _yaml.safe_load(apm_yml.read_text(encoding="utf-8")) or {} + except Exception: + # Manifest unreadable / corrupt: fall back to auto-detect rather + # than crashing the replay; the caller still surfaces a useful + # error elsewhere if the project is truly broken. 
+ return None + raw = data.get("target") + if raw is None: + return None + try: + from apm_cli.core.target_detection import parse_target_field + + return parse_target_field(raw, source_path=apm_yml) + except Exception: + return None + + +def run_replay(config: ReplayConfig, logger: CheckLogger) -> Path: + """Execute the cache-only replay and return the populated scratch dir. + + The scratch directory is registered for atexit cleanup so callers do + not need to manage its lifetime. + + Raises + ------ + CacheMissError + Surfaced verbatim when a locked dep is not in the cache. + """ + from apm_cli.deps.lockfile import _SELF_KEY, LockFile + from apm_cli.install.services import integrate_package_primitives + from apm_cli.integration.targets import resolve_targets + from apm_cli.utils.diagnostics import DiagnosticCollector + + if not config.lockfile_path.exists(): + raise CacheMissError( + f"lockfile not found at {config.lockfile_path}; run 'apm install' to generate it" + ) + + lock = LockFile.read(config.lockfile_path) + if lock is None: + raise CacheMissError(f"lockfile at {config.lockfile_path} is empty or unreadable") + + project_root = config.project_root.resolve() + scratch_root = _make_scratch_root(project_root) + logger.scratch_root(scratch_root) + apm_modules_dir = project_root / "apm_modules" + + # Honor apm.yml's ``target:`` field so multi-target projects replay + # into all governed roots (not just whichever directory happens to + # already exist via auto-detection). Without this, a project that + # targets ``copilot,claude,cursor`` would replay only the primary + # auto-detected target and report the others as ``orphaned``. 
+ explicit_target = _read_apm_yml_target(project_root) + all_targets = resolve_targets(project_root, explicit_target=explicit_target) + targets = _filter_targets(all_targets, config.targets) + + diagnostics = DiagnosticCollector(verbose=logger.verbose) + integrators = _make_integrators() + + # Defense-in-depth: snapshot every file under a governed root and + # under apm.lock.yaml, then assert no mutation on exit. The primary + # write-redirect is ``scratch_root=scratch_root`` threaded into every + # integrator; this guard catches accidental direct-path writes that + # bypass the redirect (e.g. an integrator that hard-codes + # ``project_root / target.root_dir``). See guards.py for semantics. + governed = _governed_root_dirs(targets) + protected_subpaths = [*sorted(governed), "apm.lock.yaml", "AGENTS.md"] + + snapshot_started = False + if logger.verbose: + try: + tracemalloc.start() + snapshot_started = True + except RuntimeError: + snapshot_started = False + + logger.replay_start() + replayed_count = 0 + try: + with _ReadOnlyProjectGuard(project_root, protected_subpaths): + for lock_dep in lock.get_all_dependencies(): + if lock_dep.local_path == _SELF_KEY: + # Synthesized self-entry: project's own local content. + # Re-integrate from project_root itself. 
+ install_path = project_root + else: + install_path = _materialize_install_path( + lock_dep, + project_root, + apm_modules_dir, + cache_only=config.cache_only, + ) + + package_info = _build_package_info(lock_dep, install_path) + dep_key = lock_dep.get_unique_key() + + integrate_package_primitives( + package_info, + scratch_root, + targets=targets, + prompt_integrator=integrators["prompt"], + agent_integrator=integrators["agent"], + skill_integrator=integrators["skill"], + instruction_integrator=integrators["instruction"], + command_integrator=integrators["command"], + hook_integrator=integrators["hook"], + force=True, + managed_files=set(), + diagnostics=diagnostics, + package_name=dep_key, + logger=None, + scope=None, + skill_subset=None, + ctx=None, + scratch_root=scratch_root, + ) + replayed_count += 1 + finally: + if snapshot_started: + try: + _, peak = tracemalloc.get_traced_memory() + tracemalloc.stop() + click.echo( + f"{STATUS_SYMBOLS['info']} drift replay peak memory: " + f"{peak / (1024 * 1024):.2f} MB", + err=True, + ) + except RuntimeError: + pass + + logger.replay_complete(replayed_count) + return scratch_root + + +# --------------------------------------------------------------------------- +# Diff engine +# --------------------------------------------------------------------------- + +_INLINE_DIFF_BYTE_CAP = 100 * 1024 # 100 KB + + +def _governed_root_dirs(targets) -> set[str]: + """Return the set of top-level managed directory names to walk.""" + roots: set[str] = {".apm"} + for t in targets or []: + root = getattr(t, "root_dir", None) + if root: + roots.add(str(root).split("/", 1)[0]) + return roots + + +def _walk_managed(root: Path, governed_roots: set[str]) -> dict[str, Path]: + """Return a mapping of project-relative posix paths to absolute paths.""" + out: dict[str, Path] = {} + if not root.exists(): + return out + for top in governed_roots: + base = root / top + if not base.exists(): + continue + if base.is_file(): + out[top] = base + 
continue + for p in base.rglob("*"): + if p.is_file(): + rel = p.relative_to(root).as_posix() + out[rel] = p + # AGENTS.md is a flat top-level file in some target layouts. + agents_md = root / "AGENTS.md" + if agents_md.is_file(): + out["AGENTS.md"] = agents_md + return out + + +def _collect_tracked_files(lockfile: LockFile) -> dict[str, str]: + """Return ``{deployed_path: package_name}`` aggregating all sources.""" + tracked: dict[str, str] = {} + for key, dep in lockfile.dependencies.items(): + for path in dep.deployed_files or []: + tracked.setdefault(path, key) + for path in lockfile.local_deployed_files or []: + tracked.setdefault(path, ".") + return tracked + + +def _inline_diff_for(scratch_path: Path, project_path: Path) -> str: + """Build an inline diff hint, capped to keep findings compact.""" + try: + s_size = scratch_path.stat().st_size + p_size = project_path.stat().st_size + except OSError: + return "" + if s_size > _INLINE_DIFF_BYTE_CAP or p_size > _INLINE_DIFF_BYTE_CAP: + return "(file too large for inline diff; use 'git diff --no-index' to compare)" + return "" + + +def diff_scratch_against_project( + scratch_root: Path, + project_root: Path, + lockfile: LockFile, + targets, +) -> list[DriftFinding]: + """Compare the replay scratch tree against the project tree. + + Three kinds of findings are emitted: + + * ``modified`` -- file exists in both, normalized content differs. + * ``unintegrated`` -- file exists in scratch but not in project. + * ``orphaned`` -- file exists in project + tracked in lockfile + ``deployed_files`` but no longer in scratch. + + Untracked extra files in governed directories are intentionally + ignored to avoid false positives from user-authored content. 
+ """ + scratch_root = scratch_root.resolve() + project_root = project_root.resolve() + governed = _governed_root_dirs(targets) + scratch_files = _walk_managed(scratch_root, governed) + project_files = _walk_managed(project_root, governed) + tracked = _collect_tracked_files(lockfile) + + findings: list[DriftFinding] = [] + + for rel, scratch_path in sorted(scratch_files.items()): + project_path = project_files.get(rel) + if project_path is None: + findings.append( + DriftFinding( + path=rel, + kind="unintegrated", + package=tracked.get(rel, ""), + ) + ) + continue + try: + s_bytes = _normalize(scratch_path.read_bytes()) + p_bytes = _normalize(project_path.read_bytes()) + except OSError as exc: + findings.append( + DriftFinding( + path=rel, + kind="modified", + package=tracked.get(rel, ""), + inline_diff=f"(read error: {exc})", + ) + ) + continue + if s_bytes != p_bytes: + findings.append( + DriftFinding( + path=rel, + kind="modified", + package=tracked.get(rel, ""), + inline_diff=_inline_diff_for(scratch_path, project_path), + ) + ) + + for rel in sorted(project_files.keys()): + if rel in scratch_files: + continue + if rel in tracked: + findings.append( + DriftFinding( + path=rel, + kind="orphaned", + package=tracked.get(rel, ""), + ) + ) + # else: untracked governed file -- ignore (user authored). 
+ + return findings + + +# --------------------------------------------------------------------------- +# Renderers +# --------------------------------------------------------------------------- + + +def render_drift_text(findings: list[DriftFinding], verbose: bool = False) -> str: + """Human-readable text rendering grouped by kind.""" + if not findings: + return f"{STATUS_SYMBOLS['check']} No drift detected" + + lines: list[str] = [ + f"{STATUS_SYMBOLS['warning']} Drift detected: {len(findings)} file(s)", + "", + ] + by_kind: dict[str, list[DriftFinding]] = {} + for f in findings: + by_kind.setdefault(f.kind, []).append(f) + + for kind in ("modified", "unintegrated", "orphaned"): + items = by_kind.get(kind, []) + if not items: + continue + lines.append(f" {kind} ({len(items)}):") + for item in items: + suffix = f" [{item.package}]" if item.package else "" + lines.append(f" - {item.path}{suffix}") + if verbose and item.inline_diff: + lines.append(f" {item.inline_diff}") + lines.append("") + + lines.append( + f" {STATUS_SYMBOLS['info']} Run 'apm install' to re-sync deployed files with the lockfile." 
+ ) + + return "\n".join(lines).rstrip() + "\n" + + +def render_drift_json(findings: list[DriftFinding]) -> dict: + """Machine-readable JSON shape: ``{\"drift\": [...]}``.""" + return { + "drift": [ + { + "path": f.path, + "kind": f.kind, + "package": f.package, + "inline_diff": f.inline_diff, + } + for f in findings + ] + } + + +def render_drift_sarif(findings: list[DriftFinding]) -> list[dict]: + """SARIF ``results`` array; rule IDs use ``apm/drift/``.""" + results: list[dict] = [] + for f in findings: + results.append( + { + "ruleId": f"apm/drift/{f.kind}", + "level": "warning" if f.kind != "modified" else "error", + "message": {"text": f"drift ({f.kind}): {f.path}"}, + "locations": [ + { + "physicalLocation": { + "artifactLocation": {"uri": f.path}, + } + } + ], + "properties": {"package": f.package}, + } + ) + return results + + +# --------------------------------------------------------------------------- +# CLI helper -- intentionally minimal so commands/audit.py can re-use it. +# --------------------------------------------------------------------------- + + +def render_drift( + findings: list[DriftFinding], + fmt: str = "text", + verbose: bool = False, +) -> str: + """Single rendering entrypoint for callers that pick a format string.""" + if fmt == "json": + return json.dumps(render_drift_json(findings), indent=2) + if fmt == "sarif": + return json.dumps({"results": render_drift_sarif(findings)}, indent=2) + return render_drift_text(findings, verbose=verbose) diff --git a/src/apm_cli/install/phases/lockfile.py b/src/apm_cli/install/phases/lockfile.py index 1ff650176..c58f9d295 100644 --- a/src/apm_cli/install/phases/lockfile.py +++ b/src/apm_cli/install/phases/lockfile.py @@ -67,6 +67,12 @@ def __init__(self, ctx: InstallContext) -> None: def build_and_save(self) -> None: """Assemble lockfile from ctx state and write it (no-op when nothing was installed).""" if not self.ctx.installed_packages: + # Even with nothing newly installed, a pre-existing + # 
lockfile may need its cache pin markers refreshed -- + # e.g. user upgraded APM and their cache pre-dates the + # marker contract. Sync best-effort against the on-disk + # lockfile. + self._sync_cache_pin_markers_from_disk() return try: from apm_cli.deps.lockfile import LockFile as _LF @@ -105,6 +111,13 @@ def build_and_save(self) -> None: # Only write when the semantic content has actually changed # (avoids generated_at churn in version control). self._write_if_changed(lockfile, lockfile_path, _LF) + # Self-heal cache pin markers EVERY install, regardless of + # whether the lockfile YAML changed. This unblocks users + # whose caches pre-date the supply-chain hardening (PR + # #1137 follow-up): if their lockfile is already current, + # _write_if_changed is a no-op, but markers must still be + # written so the next `apm audit` drift replay succeeds. + self._sync_cache_pin_markers(lockfile) except Exception as e: self._handle_failure(e) @@ -198,6 +211,50 @@ def _handle_failure(self, e: Exception) -> None: if self.ctx.logger: self.ctx.logger.error(_lock_msg) + def _sync_cache_pin_markers(self, lockfile: LockFile) -> None: + """Write ``.apm-pin`` markers for every cached remote dep. + + Idempotent and best-effort: a missing or unwritable cache + directory is silently skipped at the marker-helper level and + will surface during the next ``apm audit`` drift replay. + Wrapped in a broad except because lockfile assembly success + must not be undone by a marker write failure. 
+ """ + try: + from apm_cli.install.cache_pin import sync_markers_for_lockfile + + apm_modules_dir = self.ctx.apm_modules_dir + if apm_modules_dir is None: + return + written = sync_markers_for_lockfile(lockfile, self.ctx.project_root, apm_modules_dir) + if self.ctx.logger and written: + self.ctx.logger.verbose_detail( + f"Wrote {written} cache pin marker(s) for drift replay" + ) + except Exception as exc: + if self.ctx.logger: + self.ctx.logger.verbose_detail(f"Cache pin marker sync skipped: {exc}") + + def _sync_cache_pin_markers_from_disk(self) -> None: + """Self-heal markers from the on-disk lockfile when no install ran. + + This handles the upgrade path: user installed an older APM, + runs the new APM with no manifest changes, expects the next + ``apm audit`` to find every remote dep correctly marked. + """ + try: + from apm_cli.deps.lockfile import LockFile as _LF + from apm_cli.deps.lockfile import get_lockfile_path + + lockfile_path = get_lockfile_path(self.ctx.apm_dir) + if not lockfile_path.exists(): + return + lockfile = _LF.load_or_create(lockfile_path) + self._sync_cache_pin_markers(lockfile) + except Exception as exc: + if self.ctx.logger: + self.ctx.logger.verbose_detail(f"Cache pin marker self-heal skipped: {exc}") + def compute_deployed_hashes(self, rel_paths) -> dict[str, str]: """Delegate to the module-level canonical implementation.""" return compute_deployed_hashes(rel_paths, self.ctx.project_root) diff --git a/src/apm_cli/install/services.py b/src/apm_cli/install/services.py index 8192ca0fa..d6b8ebb11 100644 --- a/src/apm_cli/install/services.py +++ b/src/apm_cli/install/services.py @@ -93,6 +93,7 @@ def integrate_package_primitives( scope: InstallScope | None = None, skill_subset: tuple | None = None, ctx: InstallContext | None = None, + scratch_root: Path | None = None, ) -> dict: """Run the full integration pipeline for a single package. 
@@ -130,6 +131,23 @@ def integrate_package_primitives(
     if not targets:
         return result
 
+    # ------------------------------------------------------------------
+    # Drift-replay safety guard (#drift): when ``scratch_root`` is set,
+    # the caller is replaying integration into an isolated directory.
+    # We assert the effective ``project_root`` equals or sits inside it,
+    # keeping the read-only contract of ``apm audit --check drift`` intact.
+    # The ``project_root`` passed in will already point at ``scratch_root``
+    # (so all writes redirect via target.deploy_path), so this check is
+    # purely defense-in-depth against accidental misuse.
+    # ------------------------------------------------------------------
+    if scratch_root is not None:
+        from apm_cli.utils.path_security import ensure_path_within
+
+        scratch_root = Path(scratch_root).resolve()
+        # ``project_root`` is the redirect target; it must equal scratch_root
+        # OR sit inside it. ensure_path_within(child, parent) raises if not.
+        ensure_path_within(Path(project_root).resolve(), scratch_root)
+
     # --- Amendment 6: cowork non-skill primitive warning (once per run) ---
     _cowork_active = any(t.name == "copilot-cowork" for t in targets)
     if _cowork_active and ctx is not None and not ctx.cowork_nonsupported_warned:
diff --git a/src/apm_cli/policy/ci_checks.py b/src/apm_cli/policy/ci_checks.py
index 98086a033..7b2ab1222 100644
--- a/src/apm_cli/policy/ci_checks.py
+++ b/src/apm_cli/policy/ci_checks.py
@@ -14,11 +14,15 @@
 import logging
 from pathlib import Path
-from typing import List, Optional  # noqa: F401, UP035
+from typing import TYPE_CHECKING, List, Optional, Sequence  # noqa: F401, UP035
 
 from ..deps.lockfile import _SELF_KEY
 from .models import CheckResult, CIAuditResult
 
+if TYPE_CHECKING:
+    from ..deps.lockfile import LockFile
+    from ..install.drift import DriftFinding
+
 _logger = logging.getLogger(__name__)
 
 
@@ -403,6 +407,95 @@
 
 
+def _check_drift(
+    project_root: Path,
+    lockfile: LockFile,
+    
targets: Sequence[str] | None = None, + cache_only: bool = True, + verbose: bool = False, +) -> tuple[CheckResult, list[DriftFinding]]: + """Replay the install in a scratch dir and diff against the project. + + Returns the standard :class:`CheckResult` PLUS the list of + :class:`DriftFinding` instances so callers can render them in the + output format of their choice (text/json/sarif) without re-running + the replay. + + Cache-only by default: a missing cache entry produces a check + failure rather than a network fetch (audit must be deterministic). + """ + from ..deps.lockfile import get_lockfile_path + from ..install.drift import ( + CacheMissError, + CheckLogger, + ReplayConfig, + diff_scratch_against_project, + run_replay, + ) + from ..integration.targets import resolve_targets + + logger = CheckLogger(verbose=verbose) + config = ReplayConfig( + project_root=project_root, + lockfile_path=get_lockfile_path(project_root), + targets=frozenset(targets) if targets else None, + cache_only=cache_only, + ) + + try: + scratch = run_replay(config, logger) + except CacheMissError as exc: + return ( + CheckResult( + name="drift", + passed=False, + message=( + f"drift replay aborted: {exc}; run 'apm install' to refresh apm_modules cache" + ), + ), + [], + ) + except NotImplementedError as exc: + return ( + CheckResult( + name="drift", + passed=False, + message=f"drift replay unsupported: {exc}", + ), + [], + ) + + logger.diff_start() + resolved_targets = resolve_targets(project_root) + if targets: + resolved_targets = [t for t in resolved_targets if t.name in set(targets)] + findings = diff_scratch_against_project(scratch, project_root, lockfile, resolved_targets) + + if not findings: + logger.clean() + return ( + CheckResult( + name="drift", + passed=True, + message="no drift detected against lockfile", + ), + [], + ) + + logger.findings(len(findings)) + preview = ", ".join(f.path for f in findings[:3]) + suffix = "" if len(findings) <= 3 else f" (+{len(findings) - 3} 
more)" + return ( + CheckResult( + name="drift", + passed=False, + message=f"drift detected: {len(findings)} file(s): {preview}{suffix}", + details=[f"{f.kind}: {f.path}" for f in findings], + ), + findings, + ) + + # -- Aggregate runner ---------------------------------------------- diff --git a/src/apm_cli/utils/diagnostics.py b/src/apm_cli/utils/diagnostics.py index bc6cfe161..7e8274a5a 100644 --- a/src/apm_cli/utils/diagnostics.py +++ b/src/apm_cli/utils/diagnostics.py @@ -26,12 +26,19 @@ CATEGORY_SECURITY = "security" CATEGORY_POLICY = "policy" CATEGORY_AUTH = "auth" +CATEGORY_DRIFT = "drift" CATEGORY_INFO = "info" +# Drift severities: kinds of divergence from the lockfile-defined state. +DRIFT_MODIFIED = "modified" # tracked file content changed +DRIFT_UNINTEGRATED = "unintegrated" # tracked file missing from project +DRIFT_ORPHANED = "orphaned" # tracked in lockfile but not produced by replay + _CATEGORY_ORDER = [ CATEGORY_SECURITY, CATEGORY_POLICY, CATEGORY_AUTH, + CATEGORY_DRIFT, CATEGORY_COLLISION, CATEGORY_OVERWRITE, CATEGORY_WARNING, @@ -177,6 +184,37 @@ def auth(self, message: str, package: str = "", detail: str = "") -> None: ) ) + def drift( + self, + path: str, + kind: str, + package: str = "", + detail: str = "", + ) -> None: + """Record a drift finding from ``apm audit`` replay. + + Parameters + ---------- + path : str + Project-relative path of the divergent file. + kind : str + One of ``DRIFT_MODIFIED``, ``DRIFT_UNINTEGRATED``, ``DRIFT_ORPHANED``. + package : str + Package name owning the file (best-effort; may be empty for orphans). + detail : str + Optional inline diff or extra context (rendered only in verbose). 
+ """ + with self._lock: + self._diagnostics.append( + Diagnostic( + message=path, + category=CATEGORY_DRIFT, + package=package, + detail=detail, + severity=kind, + ) + ) + # ------------------------------------------------------------------ # Query helpers # ------------------------------------------------------------------ @@ -205,6 +243,11 @@ def policy_count(self) -> int: """Return number of policy diagnostics.""" return sum(1 for d in self._diagnostics if d.category == CATEGORY_POLICY) + @property + def drift_count(self) -> int: + """Return number of drift findings.""" + return sum(1 for d in self._diagnostics if d.category == CATEGORY_DRIFT) + @property def has_critical_security(self) -> bool: """Return True if any critical-severity security finding exists.""" @@ -259,6 +302,8 @@ def render_summary(self) -> None: self._render_policy_group(items) elif cat == CATEGORY_AUTH: self._render_auth_group(items) + elif cat == CATEGORY_DRIFT: + self._render_drift_group(items) elif cat == CATEGORY_COLLISION: self._render_collision_group(items) elif cat == CATEGORY_OVERWRITE: @@ -401,6 +446,34 @@ def _render_info_group(self, items: list[Diagnostic]) -> None: if d.detail and self.verbose: _rich_echo(f" +- {d.detail}", color="dim") + def _render_drift_group(self, items: list[Diagnostic]) -> None: + """Render drift findings: modified / unintegrated / orphaned files. + + Stable section header so machine consumers can grep for it. + Counts shown by kind, then per-file lines with severity-coded markers. + """ + modified = [d for d in items if d.severity == "modified"] + unintegrated = [d for d in items if d.severity == "unintegrated"] + orphaned = [d for d in items if d.severity == "orphaned"] + + total = len(items) + _rich_warning(f" [!] 
Drift detected: {total} file(s) diverge from lockfile") + + for label, group, marker in ( + ("modified", modified, "M"), + ("unintegrated", unintegrated, "U"), + ("orphaned", orphaned, "O"), + ): + if not group: + continue + _rich_echo(f" {len(group)} {label}:", color="yellow") + for d in group: + pkg_prefix = f"[{d.package}] " if d.package else "" + _rich_echo(f" {marker} {pkg_prefix}{d.message}", color="yellow") + if d.detail and self.verbose: + for line in d.detail.splitlines(): + _rich_echo(f" {line}", color="dim") + def _group_by_package(items: list[Diagnostic]) -> dict[str, list[Diagnostic]]: """Group diagnostics by package, preserving insertion order. diff --git a/src/apm_cli/utils/guards.py b/src/apm_cli/utils/guards.py new file mode 100644 index 000000000..6da1e8b5d --- /dev/null +++ b/src/apm_cli/utils/guards.py @@ -0,0 +1,123 @@ +"""Read-only project-tree guard for drift detection. + +When ``apm audit`` runs the install pipeline against a scratch directory +to compute drift, the working tree must remain untouched. ``_ReadOnlyProjectGuard`` +takes a stat snapshot of every protected path on entry and asserts no +mutation occurred on exit. Any divergence raises ``ProtectedPathMutationError``. + +This is a defense-in-depth check: the primary mechanism is redirecting all +writes via ``project_root=scratch_root``. The guard catches accidental +direct-path writes that bypass the redirection. +""" + +from __future__ import annotations + +import os +from pathlib import Path + + +class ProtectedPathMutationError(RuntimeError): + """Raised when a path under guard was mutated during drift replay.""" + + +def _snapshot(paths: list[Path]) -> dict[Path, tuple[int, int] | None]: + """Capture (mtime_ns, size) for each path, or ``None`` if missing. + + Symlinks are followed; missing paths record ``None`` so they may + legitimately remain absent without triggering the guard. 
+ """ + snap: dict[Path, tuple[int, int] | None] = {} + for p in paths: + try: + st = p.stat() + snap[p] = (st.st_mtime_ns, st.st_size) + except FileNotFoundError: + snap[p] = None + except OSError: + snap[p] = None + return snap + + +def _walk_protected(roots: list[Path]) -> list[Path]: + """Enumerate every regular file under each root (recursive). + + Missing roots are silently dropped. Symlinks are NOT recursed into + to avoid infinite loops. + """ + files: list[Path] = [] + for root in roots: + if not root.exists(): + continue + if root.is_file(): + files.append(root) + continue + for dirpath, _dirnames, filenames in os.walk(root, followlinks=False): + for fn in filenames: + files.append(Path(dirpath) / fn) + return files + + +class _ReadOnlyProjectGuard: + """Context manager that snapshots protected paths and asserts no mutation. + + Usage:: + + with _ReadOnlyProjectGuard(project_root, [".apm", "apm.lock.yaml", ".github"]): + run_replay(...) # writes only to scratch_root, never to project_root + + On exit, every snapshotted file is re-stat'd. Any (mtime, size) divergence + OR a previously-present file going missing OR a previously-missing file + appearing raises ``ProtectedPathMutationError`` listing the offending paths. + + The guard tolerates files that were missing on entry AND missing on exit + (no-op) -- this is important for fresh-repo scenarios where ``.apm/`` may + not exist yet. + """ + + def __init__(self, project_root: Path, protected_subpaths: list[str]) -> None: + self.project_root = project_root.resolve() + self.protected_roots = [self.project_root / sp for sp in protected_subpaths] + self._snapshot: dict[Path, tuple[int, int] | None] = {} + + def __enter__(self) -> _ReadOnlyProjectGuard: + files = _walk_protected(self.protected_roots) + self._snapshot = _snapshot(files) + return self + + def __exit__(self, exc_type, exc, tb) -> None: + # Re-walk in case new files appeared (or files vanished). 
+ current_files = set(_walk_protected(self.protected_roots)) + snapshotted_files = set(self._snapshot.keys()) + + violations: list[str] = [] + + # Newly-appeared files under protected roots are violations. + for new_path in current_files - snapshotted_files: + violations.append(f"created: {new_path}") + + # Snapshotted files that vanished or changed. + for path, prev in self._snapshot.items(): + try: + st = path.stat() + cur = (st.st_mtime_ns, st.st_size) + except FileNotFoundError: + cur = None + except OSError: + cur = None + + if prev is None and cur is None: + continue # missing -> still missing: fine + if prev is None and cur is not None: + violations.append(f"created: {path}") + elif prev is not None and cur is None: + violations.append(f"deleted: {path}") + elif prev != cur: + violations.append(f"modified: {path}") + + if violations and exc_type is None: + # Only raise if no other exception is propagating -- don't mask + # the original error. + raise ProtectedPathMutationError( + "Drift replay mutated protected project paths:\n - " + + "\n - ".join(sorted(violations)) + ) diff --git a/src/apm_cli/utils/normalization.py b/src/apm_cli/utils/normalization.py new file mode 100644 index 000000000..d1c78807c --- /dev/null +++ b/src/apm_cli/utils/normalization.py @@ -0,0 +1,57 @@ +"""Bytes-in / bytes-out content normalization helpers. + +Used by drift-detection (``apm audit``) to compare a deployed file's +on-disk bytes against the replay scratch tree without flagging +legitimate, deterministic differences: + +* Line-ending differences (CRLF vs LF) introduced by editors / VCS. +* UTF-8 BOMs at the start of the file (Windows tool output). +* APM ```` headers that are intentionally + re-stamped on every recompile. + +Kept in ``utils/`` (not ``install/drift.py``) so future callers -- +e.g. policy linters or content-scan helpers -- can reuse the same +normalization without importing the drift module. 
+"""
+
+from __future__ import annotations
+
+import re
+
+_BUILD_ID_PATTERN = re.compile(
+    rb"<!--\s*build id:.*?-->\s*\n?",
+    re.IGNORECASE,
+)
+_BOM = b"\xef\xbb\xbf"
+
+
+def _strip_build_id(content: bytes) -> bytes:
+    """Remove APM ``<!-- Build ID: ... -->`` headers wherever they appear."""
+    return _BUILD_ID_PATTERN.sub(b"", content)
+
+
+def _normalize_line_endings(content: bytes) -> bytes:
+    """Convert CRLF to LF; leaves bare CR alone (rare, intentional)."""
+    return content.replace(b"\r\n", b"\n")
+
+
+def _strip_bom(content: bytes) -> bytes:
+    """Drop a UTF-8 BOM at the start of the file (only at offset 0)."""
+    if content.startswith(_BOM):
+        return content[len(_BOM) :]
+    return content
+
+
+def _normalize(content: bytes) -> bytes:
+    """Apply all drift-tolerant normalizations to a file's bytes."""
+    return _strip_build_id(_normalize_line_endings(_strip_bom(content)))
+
+
+__all__ = [
+    "_BOM",
+    "_BUILD_ID_PATTERN",
+    "_normalize",
+    "_normalize_line_endings",
+    "_strip_bom",
+    "_strip_build_id",
+]
diff --git a/tests/integration/test_drift_check.py b/tests/integration/test_drift_check.py
new file mode 100644
index 000000000..592ba721c
--- /dev/null
+++ b/tests/integration/test_drift_check.py
@@ -0,0 +1,751 @@
+"""Integration tests for ``apm audit`` drift detection (Phase D).
+
+Covers:
+  * Section A -- 9 drift cases (modified / unintegrated / orphaned, plus
+    normalization-driven false-positive guards for CRLF, BOM, and the
+    injected ``Build ID`` marker).
+  * Section B -- regression traps tied to specific past PRs.
+  * Section C -- edge cases (no lockfile, corrupt lockfile, empty
+    primitives, untracked governed files).
+  * Section D -- multi-target projects (copilot + claude).
+  * Section E -- ``--no-drift`` flag, mutex with ``--strip``/``--file``,
+    warning routing, and CI JSON / text output shapes.
+
+Anti-patterns enforced (per matrix Section 10):
+  * ``write_bytes`` only -- never ``write_text`` (avoids platform line-ending
+    surprises).
+ * No URL substring matching (we do not assert on URLs at all here). + * ``catch_exceptions=False`` so Click's exception chaining surfaces. + * ASCII-only assertions (no Unicode box characters or emoji). + * ``CliRunner()`` without ``mix_stderr=`` -- Click 8.x removed that kwarg + and always provides ``result.stdout`` / ``result.stderr`` separately. +""" + +from __future__ import annotations + +import json +from collections.abc import Mapping +from pathlib import Path +from typing import Any + +import pytest +import yaml +from click.testing import CliRunner + +from apm_cli.cli import cli + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +_DEFAULT_INSTRUCTION = ( + b"---\n" + b'applyTo: "**"\n' + b"---\n" + b"# Rules\n" + b"\n" + b"Follow these conventions strictly.\n" + b"Never commit secrets.\n" +) + + +def _make_apm_project( + tmp_path: Path, + *, + name: str = "drift-fixture", + version: str = "1.0.0", + target: str | None = None, + files: Mapping[str, bytes] | None = None, +) -> Path: + """Create a minimal APM project rooted under ``tmp_path``. + + The project has an ``apm.yml`` plus ``.apm/`` source content. After + ``apm install`` runs against it the lockfile records a synthetic + self-entry (``_SELF_KEY``) under ``local_deployed_files`` -- which is + what the drift replay engine walks. + + ``files`` keys are paths *relative to ``.apm/``* (e.g. + ``"instructions/rules.instructions.md"``). 
+ """ + project = tmp_path / name + project.mkdir(parents=True, exist_ok=False) + + manifest: dict[str, Any] = {"name": name, "version": version} + if target is not None: + manifest["target"] = target + (project / "apm.yml").write_bytes(yaml.safe_dump(manifest).encode("utf-8")) + + payload = ( + files + if files is not None + else { + "instructions/rules.instructions.md": _DEFAULT_INSTRUCTION, + } + ) + for rel, content in payload.items(): + dest = project / ".apm" / rel + dest.parent.mkdir(parents=True, exist_ok=True) + dest.write_bytes(content) + + return project + + +def _install(project: Path, monkeypatch: pytest.MonkeyPatch) -> None: + """Run ``apm install`` in ``project``; assert success.""" + monkeypatch.chdir(project) + runner = CliRunner() + result = runner.invoke(cli, ["install"], catch_exceptions=False) + assert result.exit_code == 0, ( + f"apm install failed: exit={result.exit_code}\n" + f"stdout:\n{result.stdout}\nstderr:\n{result.stderr}" + ) + + +def _audit( + project: Path, + monkeypatch: pytest.MonkeyPatch, + *args: str, +) -> Any: + """Run ``apm audit`` and return the Click result.""" + monkeypatch.chdir(project) + runner = CliRunner() + return runner.invoke(cli, ["audit", *args], catch_exceptions=False) + + +def _snapshot_tree(root: Path) -> dict[str, tuple[int, bytes]]: + """Snapshot every file under ``root`` as ``{relpath: (size, sha256-bytes)}``. + + Used to prove the no-write contract -- ``apm audit`` MUST NOT mutate + the working tree. 
+ """ + import hashlib + + snap: dict[str, tuple[int, bytes]] = {} + for path in sorted(root.rglob("*")): + if not path.is_file(): + continue + rel = path.relative_to(root).as_posix() + data = path.read_bytes() + snap[rel] = (len(data), hashlib.sha256(data).digest()) + return snap + + +def _drift_paths(result_stdout: str) -> list[str]: + """Extract drift entry paths from a ``--ci -f json`` payload.""" + payload = json.loads(result_stdout) + drift_section = payload.get("drift") or {} + entries = drift_section.get("drift") or [] + return [entry.get("path", "") for entry in entries] + + +def _drift_kinds(result_stdout: str) -> list[tuple[str, str]]: + """Extract ``(path, kind)`` tuples from a ``--ci -f json`` payload.""" + payload = json.loads(result_stdout) + entries = (payload.get("drift") or {}).get("drift") or [] + return [(e.get("path", ""), e.get("kind", "")) for e in entries] + + +def _checks_by_name(result_stdout: str) -> dict[str, dict[str, Any]]: + payload = json.loads(result_stdout) + return {c["name"]: c for c in payload.get("checks", [])} + + +def _assert_text_safe(result: Any, *needles: str) -> None: + """Assert each needle appears in stdout-or-stderr (text mode).""" + blob = (result.stdout or "") + "\n" + (result.stderr or "") + for needle in needles: + assert needle in blob, ( + f"missing {needle!r}\nstdout:\n{result.stdout}\nstderr:\n{result.stderr}" + ) + + +# --------------------------------------------------------------------------- +# Section A -- 9 drift cases +# --------------------------------------------------------------------------- + + +class TestSectionADriftCases: + """Each test exercises one drift kind or one false-positive guard.""" + + def test_a1_modified_simple_edit_to_deployed_file( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + + deployed = project / ".github" / "instructions" / "rules.instructions.md" + assert deployed.exists(), 
"fixture pre-condition: install deploys to .github/" + deployed.write_bytes(b"# tampered\n") + + result = _audit(project, monkeypatch, "--ci", "-f", "json") + assert result.exit_code == 1 + kinds = dict(_drift_kinds(result.stdout)) + assert kinds.get(".github/instructions/rules.instructions.md") == "modified" + + def test_a2_unintegrated_new_source_added_no_install( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + + extra = project / ".apm" / "instructions" / "extra.instructions.md" + extra.write_bytes(b'---\napplyTo: "**"\n---\n# Extra\nNew rule, never integrated.\n') + + result = _audit(project, monkeypatch, "--ci", "-f", "json") + assert result.exit_code == 1 + kinds = dict(_drift_kinds(result.stdout)) + # The diff engine reports the *output* path that would have been + # produced, not the source path. + assert kinds.get(".github/instructions/extra.instructions.md") == "unintegrated" + + def test_a3_unintegrated_when_deployed_file_deleted( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Deployed file deleted from disk -> scratch still produces it. + + The diff engine reports this as ``unintegrated`` (file present in + scratch, absent from project) rather than ``orphaned`` -- the + orphaned kind is reserved for files in project + lockfile but + absent from the scratch reproduction. 
+ """ + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + + deployed = project / ".github" / "instructions" / "rules.instructions.md" + deployed.unlink() + + result = _audit(project, monkeypatch, "--ci", "-f", "json") + assert result.exit_code == 1 + kinds = dict(_drift_kinds(result.stdout)) + assert kinds.get(".github/instructions/rules.instructions.md") in { + "unintegrated", + "orphaned", + } + + def test_a4_orphaned_source_removed_but_output_remains( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Source file removed from ``.apm/`` after install. + + Scratch no longer reproduces the integrated output, but the file + still lives on disk and is recorded in the lockfile -- this is + the canonical ``orphaned`` case. + """ + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + + source = project / ".apm" / "instructions" / "rules.instructions.md" + source.unlink() + + result = _audit(project, monkeypatch, "--ci", "-f", "json") + assert result.exit_code == 1 + kinds = dict(_drift_kinds(result.stdout)) + assert kinds.get(".github/instructions/rules.instructions.md") == "orphaned" + + def test_a5_clean_state_no_drift_after_install( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + + result = _audit(project, monkeypatch, "--ci", "-f", "json") + assert result.exit_code == 0 + checks = _checks_by_name(result.stdout) + assert checks["drift"]["passed"] is True + assert _drift_paths(result.stdout) == [] + + def test_a6_modified_multiple_files_all_reported( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + project = _make_apm_project( + tmp_path, + files={ + "instructions/a.instructions.md": (b'---\napplyTo: "**"\n---\n# A\nfirst.\n'), + "instructions/b.instructions.md": (b'---\napplyTo: "**"\n---\n# B\nsecond.\n'), + }, + ) + _install(project, monkeypatch) + + for name in ("a", "b"): + (project 
/ ".github" / "instructions" / f"{name}.instructions.md").write_bytes( + f"# tampered-{name}\n".encode() + ) + + result = _audit(project, monkeypatch, "--ci", "-f", "json") + assert result.exit_code == 1 + paths = sorted(_drift_paths(result.stdout)) + assert paths == [ + ".github/instructions/a.instructions.md", + ".github/instructions/b.instructions.md", + ] + + def test_a7_crlf_only_change_is_not_drift( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Normalization strips line endings before diff.""" + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + + deployed = project / ".github" / "instructions" / "rules.instructions.md" + original = deployed.read_bytes() + assert b"\r\n" not in original, "fixture must start LF-only" + deployed.write_bytes(original.replace(b"\n", b"\r\n")) + + # Bare audit (no --ci) -- avoids the unrelated content-integrity + # baseline check which compares raw byte hashes. + result = _audit(project, monkeypatch) + assert result.exit_code == 0, ( + f"CRLF normalization regressed: stdout={result.stdout}\nstderr={result.stderr}" + ) + _assert_text_safe(result, "no issues found") + + def test_a8_bom_only_change_is_not_drift( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + + deployed = project / ".github" / "instructions" / "rules.instructions.md" + original = deployed.read_bytes() + deployed.write_bytes(b"\xef\xbb\xbf" + original) + + result = _audit(project, monkeypatch) + assert result.exit_code == 0 + # Drift specifically must report no findings; the content-scan may + # surface an info-level "unusual characters" finding which we + # tolerate explicitly. 
+        assert "Drift detected" not in (result.stdout + result.stderr)
+
+    def test_a9_build_id_only_change_is_not_drift(
+        self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
+    ) -> None:
+        """Injected ``<!-- Build ID: ... -->`` lines are stripped pre-diff."""
+        project = _make_apm_project(tmp_path)
+        _install(project, monkeypatch)
+
+        deployed = project / ".github" / "instructions" / "rules.instructions.md"
+        original = deployed.read_bytes()
+        deployed.write_bytes(original + b"<!-- Build ID: cafebabe-audit -->\n")
+
+        result = _audit(project, monkeypatch)
+        assert result.exit_code == 0
+        assert "Drift detected" not in (result.stdout + result.stderr)
+
+
+# ---------------------------------------------------------------------------
+# Section B -- regression traps for past PRs
+# ---------------------------------------------------------------------------
+
+
+class TestSectionBRegressions:
+    """Behaviours we have explicitly broken before."""
+
+    def test_b1_self_entry_replays_local_apm_directory(
+        self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
+    ) -> None:
+        """Regression: drift must walk the synthetic ``_SELF_KEY`` dep.
+
+        Without it the project's own ``.apm/`` content would be
+        invisible to replay and any drift would silently pass.
+        """
+        project = _make_apm_project(tmp_path)
+        _install(project, monkeypatch)
+
+        deployed = project / ".github" / "instructions" / "rules.instructions.md"
+        deployed.write_bytes(b"# tampered\n")
+
+        result = _audit(project, monkeypatch, "--ci", "-f", "json")
+        assert result.exit_code == 1
+        # If the self-entry walk regressed, the drift list would be empty.
+ assert _drift_paths(result.stdout), ( + "self-entry replay regressed -- drift returned no findings" + ) + + def test_b2_audit_ci_exit_code_propagates_drift_failure( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Regression: ``--ci`` must exit non-zero when drift is found.""" + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + (project / ".github" / "instructions" / "rules.instructions.md").write_bytes( + b"# tampered\n" + ) + + result = _audit(project, monkeypatch, "--ci") + assert result.exit_code == 1 + + def test_b3_text_renderer_uses_ascii_status_symbols( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Regression: drift output must stay within printable ASCII.""" + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + (project / ".github" / "instructions" / "rules.instructions.md").write_bytes( + b"# tampered\n" + ) + + result = _audit(project, monkeypatch) + # Scope ASCII assertion to drift-specific output lines (the + # baseline audit table uses Rich box-drawing chars that are + # tolerated for that table but forbidden in drift output). + combined = (result.stdout or "") + (result.stderr or "") + for line in combined.splitlines(): + if "Drift detected" not in line and not ("modified" in line and ".github" in line): + continue + for ch in line: + assert ord(ch) < 128, f"non-ASCII char {ch!r} in drift line: {line!r}" + # Strict: drift-specific markers must appear and be ASCII. 
+ for needle in ("[!]", "modified"): + assert needle in combined + + def test_b4_clean_install_does_not_emit_false_drift_warning( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Regression: a clean tree must not log ``Drift detected``.""" + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + + result = _audit(project, monkeypatch) + assert result.exit_code == 0 + combined = (result.stdout or "") + (result.stderr or "") + assert "Drift detected" not in combined + + +# --------------------------------------------------------------------------- +# Section C -- edge cases +# --------------------------------------------------------------------------- + + +class TestSectionCEdgeCases: + def test_c1_no_lockfile_bare_audit_skips_silently( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Fresh repo with no lockfile -- bare ``apm audit`` exits 0. + + The current implementation treats ``apm.lock.yaml`` absence as + "nothing to scan" rather than a usage error. This test pins that + behaviour so a future change is intentional. + """ + project = tmp_path / "fresh" + project.mkdir() + (project / "apm.yml").write_bytes(b"name: fresh\nversion: 1.0.0\n") + + result = _audit(project, monkeypatch) + assert result.exit_code == 0 + assert "nothing to scan" in result.stdout.lower() or "no apm.lock" in result.stdout + + def test_c2_no_lockfile_ci_audit_passes_baseline( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """No declared dependencies -> ``--ci`` baseline passes.""" + project = tmp_path / "fresh-ci" + project.mkdir() + (project / "apm.yml").write_bytes(b"name: fresh\nversion: 1.0.0\n") + + result = _audit(project, monkeypatch, "--ci") + assert result.exit_code == 0 + + def test_c3_corrupt_lockfile_yaml_skips_drift( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Bad YAML in lockfile -> ``LockFile.read`` returns ``None``. 
+ + The audit command then skips drift silently. Pinning this so a + future ``raise`` doesn't surprise users. + """ + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + (project / "apm.lock.yaml").write_bytes(b"!!!{not valid yaml: [\n") + + result = _audit(project, monkeypatch) + # Either passes silently (no parseable lockfile -> nothing to + # scan) or emits a warning -- both acceptable, but no traceback. + assert result.exit_code in {0, 1} + assert "Traceback" not in result.stdout + assert "Traceback" not in result.stderr + + def test_c4_empty_apm_directory_no_drift( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Project with no governed primitives -> no drift possible.""" + project = tmp_path / "empty" + project.mkdir() + (project / "apm.yml").write_bytes(b"name: empty\nversion: 1.0.0\n") + # No .apm/ content at all. + + result = _audit(project, monkeypatch, "--ci") + assert result.exit_code == 0 + + def test_c5_untracked_governed_file_silently_ignored( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """A hand-authored file in ``.github/`` that the lockfile does not + track must NOT be reported as drift. + + This is the contract that lets users keep CODEOWNERS, manual + instruction files, and other ungoverned content alongside APM + deploys. 
+ """ + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + + manual = project / ".github" / "instructions" / "manual.instructions.md" + manual.write_bytes(b"# hand authored, not from APM\n") + + result = _audit(project, monkeypatch, "--ci", "-f", "json") + assert result.exit_code == 0 + paths = _drift_paths(result.stdout) + assert ".github/instructions/manual.instructions.md" not in paths + + def test_c6_apm_audit_makes_no_writes_to_working_tree( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """No-write contract -- audit is read-only.""" + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + # Introduce drift so the replay engine actually does work. + (project / ".github" / "instructions" / "rules.instructions.md").write_bytes( + b"# tampered\n" + ) + + before = _snapshot_tree(project) + result = _audit(project, monkeypatch, "--ci") + after = _snapshot_tree(project) + assert before == after, ( + "apm audit mutated the working tree (no-write contract violated)\n" + f"exit={result.exit_code}\nstdout={result.stdout[:400]}" + ) + + def test_c7_repeated_audits_are_idempotent( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + + first = _audit(project, monkeypatch, "--ci", "-f", "json") + second = _audit(project, monkeypatch, "--ci", "-f", "json") + assert first.exit_code == second.exit_code == 0 + assert _drift_paths(first.stdout) == _drift_paths(second.stdout) == [] + + +# --------------------------------------------------------------------------- +# Section D -- multi-target projects +# --------------------------------------------------------------------------- + + +class TestSectionDMultiTarget: + """Multi-target apm.yml support. + + Pin: ``apm install`` honours apm.yml's ``target: a,b`` and writes to + every named target's root_dir. 
The drift replay engine, however, + currently re-integrates per-package using only the first/auto-detected + target. Secondary targets (``.claude`` etc.) therefore appear as + ``orphaned`` -- their on-disk files exist but the scratch reproduction + does not produce them. These tests pin that observed behaviour so a + future fix surfaces as an intentional change rather than silent drift. + """ + + def test_d1_multi_target_install_creates_all_target_outputs( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + project = _make_apm_project(tmp_path, target="copilot,claude") + _install(project, monkeypatch) + # Sanity: both targets received the integrated file. + assert (project / ".github" / "instructions" / "rules.instructions.md").exists() + assert (project / ".claude" / "rules" / "rules.md").exists() + + def test_d2_multi_target_drift_reports_secondary_target_files( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Secondary target files are visible to the drift engine. + + Whether reported as ``orphaned`` (current replay limitation) or + ``modified`` (after a future fix), the secondary target's + deployed file MUST appear in the drift findings if it is + tampered with -- it must never be silently ignored. + """ + project = _make_apm_project(tmp_path, target="copilot,claude") + _install(project, monkeypatch) + (project / ".claude" / "rules" / "rules.md").write_bytes(b"# tampered\n") + + result = _audit(project, monkeypatch, "--ci", "-f", "json") + assert result.exit_code == 1 + kinds = dict(_drift_kinds(result.stdout)) + assert ".claude/rules/rules.md" in kinds, ( + f"secondary target was silently ignored: kinds={kinds}" + ) + # Acceptable kinds today: orphaned (replay missed it) or modified + # (replay reproduced it and content differs). Both prove the + # secondary target is not invisible. 
+ assert kinds[".claude/rules/rules.md"] in {"modified", "orphaned"} + + def test_d3_multi_target_primary_target_drift_detected( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Primary (auto-detected) target drift is always reported.""" + project = _make_apm_project(tmp_path, target="copilot,claude") + _install(project, monkeypatch) + (project / ".github" / "instructions" / "rules.instructions.md").write_bytes( + b"# tampered-copilot\n" + ) + + result = _audit(project, monkeypatch, "--ci", "-f", "json") + assert result.exit_code == 1 + kinds = dict(_drift_kinds(result.stdout)) + assert kinds.get(".github/instructions/rules.instructions.md") == "modified" + + +# --------------------------------------------------------------------------- +# Section E -- --no-drift opt-out and CLI surface +# --------------------------------------------------------------------------- + + +class TestSectionENoDriftFlag: + def test_e1_no_drift_with_strip_is_usage_error( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + + result = _audit(project, monkeypatch, "--no-drift", "--strip") + assert result.exit_code == 2 + assert "no-drift" in result.stderr.lower() + assert "strip" in result.stderr.lower() + + def test_e2_no_drift_with_file_is_usage_error( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + + result = _audit( + project, + monkeypatch, + "--no-drift", + "--file", + ".github/instructions/rules.instructions.md", + ) + assert result.exit_code == 2 + assert "no-drift" in result.stderr.lower() + + def test_e3_no_drift_warning_routed_to_stderr_text_mode( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """The warning must appear on stderr (not stdout) in text mode.""" + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + + result = 
_audit(project, monkeypatch, "--no-drift") + assert result.exit_code == 0 + assert "drift detection skipped" in result.stderr.lower() + assert "drift detection skipped" not in result.stdout.lower() + + def test_e4_no_drift_suppresses_warning_in_json_mode( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """JSON consumers must not see the human warning.""" + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + + result = _audit(project, monkeypatch, "--ci", "--no-drift", "-f", "json") + # JSON must be parseable and must NOT contain the drift section. + data = json.loads(result.stdout) + assert "drift" not in data, ( + f"expected no drift section under --no-drift, got: {list(data.keys())}" + ) + assert "drift detection skipped" not in result.stderr.lower() + + def test_e5_no_drift_with_modified_file_skips_drift_check( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """--no-drift removes the ``drift`` check from the CI gate. + + Note: ``content-integrity`` (a separate baseline check, PR #889) + ALSO catches modified files via hashes, so the overall exit code + may still be 1. The point of this test is that the *drift* check + is absent, not that exit becomes 0. 
+ """ + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + (project / ".github" / "instructions" / "rules.instructions.md").write_bytes( + b"# tampered\n" + ) + + result = _audit(project, monkeypatch, "--ci", "--no-drift", "-f", "json") + checks = _checks_by_name(result.stdout) + assert "drift" not in checks, ( + f"--no-drift should remove the drift check, got: {list(checks)}" + ) + + def test_e6_default_audit_runs_drift_check( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Drift must be on by default in ``--ci`` mode.""" + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + + result = _audit(project, monkeypatch, "--ci", "-f", "json") + checks = _checks_by_name(result.stdout) + assert "drift" in checks, "drift check missing from default --ci payload" + assert checks["drift"]["passed"] is True + + def test_e7_default_audit_emits_no_skip_warning( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + + result = _audit(project, monkeypatch) + combined = (result.stdout or "") + (result.stderr or "") + assert "drift detection skipped" not in combined.lower() + + def test_e8_no_drift_help_text_documents_flag( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + runner = CliRunner() + result = runner.invoke(cli, ["audit", "--help"], catch_exceptions=False) + assert result.exit_code == 0 + assert "--no-drift" in result.stdout + + def test_e9_text_mode_drift_summary_includes_kind_and_path( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + (project / ".github" / "instructions" / "rules.instructions.md").write_bytes( + b"# tampered\n" + ) + + result = _audit(project, monkeypatch) + combined = result.stdout + result.stderr + assert "modified" in combined + assert ".github/instructions/rules.instructions.md" in 
combined + + def test_e10_bare_audit_surfaces_cache_miss_on_stderr( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """When drift fails (CacheMissError, CachePinError) and produces no + findings, bare ``apm audit`` MUST tell the user on stderr -- silence + is a UX trap (user cannot tell "drift was clean" from "drift never + ran"). Per dev-ux + cli-logging-ux panel feedback (PR #1137).""" + from apm_cli.install.drift import CacheMissError + + project = _make_apm_project(tmp_path) + _install(project, monkeypatch) + + def _boom(*_args, **_kwargs): + raise CacheMissError( + "cache miss for org/foo@deadbeef: expected /tmp/x; " + "run 'apm install' to populate the cache" + ) + + # Patch the symbol where ci_checks._check_drift looks it up. + monkeypatch.setattr("apm_cli.install.drift.run_replay", _boom) + + result = _audit(project, monkeypatch) + # Bare audit is advisory: exit code is not gated on drift failure. + assert result.exit_code in {0, 2} + # The stderr-warning contract. + assert "drift check could not run" in result.stderr.lower() + assert "cache miss" in result.stderr.lower() diff --git a/tests/integration/test_drift_check_e2e.py b/tests/integration/test_drift_check_e2e.py new file mode 100644 index 000000000..4d8a6f303 --- /dev/null +++ b/tests/integration/test_drift_check_e2e.py @@ -0,0 +1,394 @@ +"""End-to-end tests for ``apm audit`` drift detection. + +These tests prove the user-observable contracts of the drift system: + + * No-write contract -- ``apm audit`` never mutates the working tree + (snapshot/diff every file under the project root). + * Air-gap proof -- a populated cache lets the entire flow + (install + audit) run with subprocess sentinels installed against + ``git``/``gh``/``curl``/``wget``. + * Performance smoke -- a small project audits in well under the 30s + budget the matrix calls out. + * SARIF and JSON shapes are stable enough for CI consumers. 
+ * The full ``install -> tamper -> audit -> reinstall -> audit`` loop + converges back to a clean state. +""" + +from __future__ import annotations + +import json +import os +import subprocess +import time +from pathlib import Path +from typing import Any + +import pytest +import yaml +from click.testing import CliRunner + +from apm_cli.cli import cli + +# --------------------------------------------------------------------------- +# Helpers (kept local so this file is self-contained) +# --------------------------------------------------------------------------- + + +_INSTRUCTION_BYTES = b'---\napplyTo: "**"\n---\n# Rules\n\nE2E fixture content.\n' + + +def _make_project(tmp_path: Path, name: str = "drift-e2e") -> Path: + project = tmp_path / name + project.mkdir() + (project / "apm.yml").write_bytes(yaml.safe_dump({"name": name, "version": "1.0.0"}).encode()) + inst_dir = project / ".apm" / "instructions" + inst_dir.mkdir(parents=True) + (inst_dir / "rules.instructions.md").write_bytes(_INSTRUCTION_BYTES) + return project + + +def _run(args: list[str]) -> Any: + return CliRunner().invoke(cli, args, catch_exceptions=False) + + +def _snapshot_tree(root: Path) -> dict[str, tuple[int, bytes]]: + import hashlib + + snap: dict[str, tuple[int, bytes]] = {} + for path in sorted(root.rglob("*")): + if not path.is_file(): + continue + rel = path.relative_to(root).as_posix() + data = path.read_bytes() + snap[rel] = (len(data), hashlib.sha256(data).digest()) + return snap + + +_NETWORK_BINARIES = {"gh", "curl", "wget"} +_orig_subprocess_run = subprocess.run +_orig_subprocess_popen = subprocess.Popen + + +def _network_sentinel_run(*args, **kwargs): + cmd = args[0] if args else kwargs.get("args", []) + if isinstance(cmd, (list, tuple)) and cmd: + if os.path.basename(str(cmd[0])) in _NETWORK_BINARIES: + raise AssertionError(f"Unexpected network subprocess: {os.path.basename(str(cmd[0]))}") + elif isinstance(cmd, str) and cmd: + first = cmd.split(maxsplit=1)[0] + if 
os.path.basename(first) in _NETWORK_BINARIES: + raise AssertionError(f"Unexpected network subprocess: {first}") + return _orig_subprocess_run(*args, **kwargs) + + +def _network_sentinel_popen(*args, **kwargs): + cmd = args[0] if args else kwargs.get("args", []) + if isinstance(cmd, (list, tuple)) and cmd: + if os.path.basename(str(cmd[0])) in _NETWORK_BINARIES: + raise AssertionError(f"Unexpected network Popen: {os.path.basename(str(cmd[0]))}") + return _orig_subprocess_popen(*args, **kwargs) + + +# --------------------------------------------------------------------------- +# E2E tests +# --------------------------------------------------------------------------- + + +class TestDriftE2E: + def test_apm_audit_makes_no_writes_to_working_tree( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """No-write contract -- audit must never mutate the project tree. + + Snapshot every file (size + SHA-256) before and after a + clean-state ``apm audit --ci`` run; trees must be byte-equal. 
+ """ + project = _make_project(tmp_path) + monkeypatch.chdir(project) + assert _run(["install"]).exit_code == 0 + + before = _snapshot_tree(project) + result = _run(["audit", "--ci"]) + after = _snapshot_tree(project) + + assert result.exit_code == 0 + assert before == after, "apm audit mutated the working tree (no-write contract violated)" + + def test_apm_audit_makes_no_writes_when_drift_present( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Even with drift, audit only reports -- never auto-heals.""" + project = _make_project(tmp_path) + monkeypatch.chdir(project) + assert _run(["install"]).exit_code == 0 + + deployed = project / ".github" / "instructions" / "rules.instructions.md" + deployed.write_bytes(b"# tampered\n") + + before = _snapshot_tree(project) + result = _run(["audit", "--ci"]) + after = _snapshot_tree(project) + + assert result.exit_code == 1 + assert before == after, "apm audit auto-healed drift -- contract violated" + # Tampered bytes still on disk afterwards. + assert deployed.read_bytes() == b"# tampered\n" + + def test_audit_runs_without_network_subprocesses( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Air-gap proof -- block ``gh``/``curl``/``wget``; audit must succeed. + + ``git`` is intentionally NOT blocked: ``audit --ci`` runs + ``git remote get-url origin`` for local policy auto-discovery + (no network involved). Real network calls would route through + ``gh``, ``curl``, or ``wget``. 
+ """ + project = _make_project(tmp_path) + monkeypatch.chdir(project) + assert _run(["install"]).exit_code == 0 + + monkeypatch.setattr("subprocess.run", _network_sentinel_run) + monkeypatch.setattr("subprocess.Popen", _network_sentinel_popen) + + result = _run(["audit", "--ci", "-f", "json"]) + assert result.exit_code == 0, ( + f"audit failed under network sentinel: {result.stdout}\n{result.stderr}" + ) + + def test_audit_completes_within_smoke_budget( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Performance smoke: a small project must audit in well under 30s. + + Budget is generous -- a real regression would balloon to + minutes via repeated cache lookups or accidental re-downloads. + """ + project = _make_project(tmp_path) + monkeypatch.chdir(project) + assert _run(["install"]).exit_code == 0 + + start = time.perf_counter() + result = _run(["audit", "--ci"]) + elapsed = time.perf_counter() - start + + assert result.exit_code == 0 + assert elapsed < 30.0, f"audit took {elapsed:.2f}s, budget is 30s" + + def test_install_audit_tamper_audit_reinstall_audit_loop( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Round-trip: clean -> tampered (drift) -> reinstall -> clean.""" + project = _make_project(tmp_path) + monkeypatch.chdir(project) + assert _run(["install"]).exit_code == 0 + + # Phase 1: clean. + assert _run(["audit", "--ci"]).exit_code == 0 + + # Phase 2: tamper -> drift detected. + deployed = project / ".github" / "instructions" / "rules.instructions.md" + deployed.write_bytes(b"# tampered\n") + assert _run(["audit", "--ci"]).exit_code == 1 + + # Phase 3: reinstall heals -> drift cleared. 
+ assert _run(["install"]).exit_code == 0 + assert _run(["audit", "--ci"]).exit_code == 0 + + def test_audit_ci_json_payload_has_stable_top_level_keys( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """JSON consumers depend on the top-level shape.""" + project = _make_project(tmp_path) + monkeypatch.chdir(project) + assert _run(["install"]).exit_code == 0 + + result = _run(["audit", "--ci", "-f", "json"]) + payload = json.loads(result.stdout) + + # Required keys for the CI schema. + for key in ("passed", "checks", "summary"): + assert key in payload, f"missing top-level key {key!r}" + + # ``drift`` section present in default mode (no --no-drift). + assert "drift" in payload + assert isinstance(payload["drift"].get("drift"), list) + + def test_audit_ci_sarif_payload_is_valid_sarif( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """SARIF output remains parseable and has the SARIF skeleton.""" + project = _make_project(tmp_path) + monkeypatch.chdir(project) + assert _run(["install"]).exit_code == 0 + # Introduce drift so SARIF carries a result. + (project / ".github" / "instructions" / "rules.instructions.md").write_bytes( + b"# tampered\n" + ) + + result = _run(["audit", "--ci", "-f", "sarif"]) + # SARIF must parse as JSON. + payload = json.loads(result.stdout) + assert payload.get("$schema", "").endswith(".json") or "schema" in payload + assert payload.get("version") == "2.1.0" + runs = payload.get("runs") or [] + assert runs and isinstance(runs, list) + # At least one drift result with the expected rule id prefix. + rule_ids = {r.get("ruleId", "") for r in (runs[0].get("results") or [])} + assert any(rid.startswith("apm/drift/") for rid in rule_ids), ( + f"no drift rule ids in SARIF: {rule_ids}" + ) + + def test_audit_text_mode_drift_section_uses_ascii_only( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Cross-platform encoding -- output must stay within ASCII. 
+ + Per ``encoding.instructions.md``: Windows cp1252 raises on + non-ASCII, so drift output must avoid emoji and box characters. + We tolerate the few Rich-table box chars that the *baseline* + audit table uses, but the drift-specific summary section must + be pure ASCII. + """ + project = _make_project(tmp_path) + monkeypatch.chdir(project) + assert _run(["install"]).exit_code == 0 + (project / ".github" / "instructions" / "rules.instructions.md").write_bytes( + b"# tampered\n" + ) + + result = _run(["audit"]) + # Bare `apm audit` is advisory: drift is rendered but does not + # fail the run (closes contract bug from PR #1137 review). Use + # `apm audit --ci` for gated drift. + assert result.exit_code == 0 + + combined = (result.stdout or "") + (result.stderr or "") + # Filter to lines that contain drift-specific markers. + for line in combined.splitlines(): + if "Drift detected" in line or ("modified" in line and ".github" in line): + for ch in line: + assert ord(ch) < 128, f"non-ASCII char {ch!r} in drift output line: {line!r}" + + def test_audit_with_force_install_cache_only_reuses_lockfile( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Audit-after-install must succeed without consulting the network. + + We do not patch subprocess here; instead, this is a sanity check + that successive audits use only the lockfile + scratch area. 
+ """ + project = _make_project(tmp_path) + monkeypatch.chdir(project) + assert _run(["install"]).exit_code == 0 + + for _ in range(3): + r = _run(["audit", "--ci"]) + assert r.exit_code == 0 + + def test_audit_cli_help_documents_drift_flag(self) -> None: + """User-discoverable: ``apm audit --help`` mentions ``--no-drift``.""" + result = _run(["audit", "--help"]) + assert result.exit_code == 0 + assert "--no-drift" in result.stdout + + def test_bare_audit_with_drift_exits_zero_but_ci_audit_exits_one( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Drift exit-code contract (regression for PR #1137 review). + + * Bare ``apm audit`` is advisory: drift is rendered but exit + stays 0 so users can run audit locally without their shell + treating drift as a fatal error. + * ``apm audit --ci`` is the explicit gate: the same drift state + must produce exit code 1 so CI pipelines fail closed. + """ + project = _make_project(tmp_path) + monkeypatch.chdir(project) + assert _run(["install"]).exit_code == 0 + + # Tamper with a deployed file so drift is non-empty. + (project / ".github" / "instructions" / "rules.instructions.md").write_bytes( + b"# tampered locally\n" + ) + + bare = _run(["audit"]) + assert bare.exit_code == 0, ( + f"bare audit must be advisory on drift; got exit={bare.exit_code} " + f"stdout={bare.stdout!r} stderr={bare.stderr!r}" + ) + bare_combined = (bare.stdout or "") + (bare.stderr or "") + assert "Drift detected" in bare_combined or "drift" in bare_combined.lower(), ( + "bare audit must still RENDER drift even though exit is 0" + ) + + ci = _run(["audit", "--ci"]) + assert ci.exit_code == 1, ( + f"--ci audit must gate on drift; got exit={ci.exit_code} " + f"stdout={ci.stdout!r} stderr={ci.stderr!r}" + ) + + def test_apm_install_writes_cache_pin_marker_for_each_remote_dep( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """End-to-end proof of the cache-pin contract (PR #1137 follow-up). 
+ + After ``apm install`` finishes, every cached non-local dep + whose lockfile entry has a ``resolved_commit`` MUST carry a + ``.apm-pin`` JSON marker recording that exact commit. + + This drives the supply-chain hardening: a future ``apm audit`` + on the same workspace will refuse to compare a stale cache + against an updated lockfile because the marker will not + match. We exercise the WRITE side here with a synthetic + remote-style locked dep and the corresponding cached payload, + because spinning up a real remote install is out of scope for + a hermetic test. + """ + from apm_cli.deps.lockfile import LockedDependency, LockFile, get_lockfile_path + from apm_cli.install.cache_pin import ( + MARKER_FILENAME, + SCHEMA_VERSION, + verify_marker, + ) + + project = _make_project(tmp_path) + monkeypatch.chdir(project) + + # Pre-populate apm_modules to simulate a previously-cached + # remote dep whose marker is missing (e.g. cache pre-dates + # this release of APM). + apm_modules = project / "apm_modules" + cached_pkg = apm_modules / "owner" / "repo" + cached_pkg.mkdir(parents=True) + (cached_pkg / "apm.yml").write_bytes( + yaml.safe_dump({"name": "repo", "version": "1.0.0"}).encode() + ) + assert not (cached_pkg / MARKER_FILENAME).exists() + + # Pre-seed apm.lock.yaml with a remote dep so LockfileBuilder + # has something to mark (an `apm install` rebuild from manifest + # alone would not include this dep). 
+ commit_sha = "deadc0de" * 5 + seed_lock = LockFile() + seed_lock.add_dependency( + LockedDependency( + repo_url="owner/repo", + host="github.com", + resolved_commit=commit_sha, + package_type="apm_package", + ) + ) + seed_lock.save(get_lockfile_path(project)) + + result = _run(["install"]) + assert result.exit_code == 0, ( + f"install failed: stdout={result.stdout!r} stderr={result.stderr!r}" + ) + + marker = cached_pkg / MARKER_FILENAME + assert marker.exists(), "install must self-heal pre-existing caches by writing the marker" + verify_marker(cached_pkg, commit_sha) + payload = json.loads(marker.read_text(encoding="utf-8")) + assert payload["schema_version"] == SCHEMA_VERSION + assert payload["resolved_commit"] == commit_sha diff --git a/tests/unit/install/test_cache_pin.py b/tests/unit/install/test_cache_pin.py new file mode 100644 index 000000000..ec338496d --- /dev/null +++ b/tests/unit/install/test_cache_pin.py @@ -0,0 +1,262 @@ +"""Unit tests for the cache-pin marker module. + +These tests pin the supply-chain hardening contract (PR #1137 follow-up): +``apm install`` writes a ``.apm-pin`` marker into each cached package +root, and ``apm audit`` verifies the marker matches the lockfile's +``resolved_commit`` before running drift replay. + +Coverage: + * Positive: matching marker passes verification. + * Missing: absent marker -> CachePinError. + * Malformed JSON: garbled marker -> CachePinError. + * Wrong schema_version: future-incompatible marker -> CachePinError. + * Missing resolved_commit field -> CachePinError. + * Mismatched commit -> CachePinError. + * sync_markers_for_lockfile skips local deps and deps without commits. + * sync_markers_for_lockfile is idempotent (re-runs overwrite cleanly). 
+""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from apm_cli.deps.lockfile import LockedDependency, LockFile +from apm_cli.install.cache_pin import ( + MARKER_FILENAME, + SCHEMA_VERSION, + CachePinError, + sync_markers_for_lockfile, + verify_marker, + write_marker, +) + + +def _make_install_dir(tmp_path: Path, name: str = "pkg") -> Path: + install = tmp_path / name + install.mkdir(parents=True) + return install + + +def test_write_then_verify_matching_commit_passes(tmp_path: Path) -> None: + install = _make_install_dir(tmp_path) + write_marker(install, "deadbeef" * 5) + # Should NOT raise. + verify_marker(install, "deadbeef" * 5) + + +def test_verify_missing_marker_raises_with_actionable_message(tmp_path: Path) -> None: + install = _make_install_dir(tmp_path) # no marker written + with pytest.raises(CachePinError) as exc_info: + verify_marker(install, "deadbeef" * 5) + msg = str(exc_info.value) + assert "missing" in msg.lower() + assert MARKER_FILENAME in msg + + +def test_verify_malformed_json_raises(tmp_path: Path) -> None: + install = _make_install_dir(tmp_path) + (install / MARKER_FILENAME).write_text("{not json", encoding="utf-8") + with pytest.raises(CachePinError) as exc_info: + verify_marker(install, "deadbeef" * 5) + assert "JSON" in str(exc_info.value) or "json" in str(exc_info.value) + + +def test_verify_non_object_payload_raises(tmp_path: Path) -> None: + install = _make_install_dir(tmp_path) + (install / MARKER_FILENAME).write_text('"a string"', encoding="utf-8") + with pytest.raises(CachePinError) as exc_info: + verify_marker(install, "deadbeef" * 5) + assert "object" in str(exc_info.value).lower() + + +def test_verify_unsupported_schema_version_raises(tmp_path: Path) -> None: + install = _make_install_dir(tmp_path) + payload = {"schema_version": 999, "resolved_commit": "deadbeef" * 5} + (install / MARKER_FILENAME).write_text(json.dumps(payload), encoding="utf-8") + with 
pytest.raises(CachePinError) as exc_info: + verify_marker(install, "deadbeef" * 5) + assert "schema_version" in str(exc_info.value) + + +def test_verify_missing_resolved_commit_field_raises(tmp_path: Path) -> None: + install = _make_install_dir(tmp_path) + payload = {"schema_version": SCHEMA_VERSION} + (install / MARKER_FILENAME).write_text(json.dumps(payload), encoding="utf-8") + with pytest.raises(CachePinError) as exc_info: + verify_marker(install, "deadbeef" * 5) + assert "resolved_commit" in str(exc_info.value) + + +def test_verify_commit_mismatch_raises_with_both_values(tmp_path: Path) -> None: + install = _make_install_dir(tmp_path) + write_marker(install, "aaaa" * 10) + with pytest.raises(CachePinError) as exc_info: + verify_marker(install, "bbbb" * 10) + msg = str(exc_info.value) + assert "mismatch" in msg.lower() + assert "aaaa" * 10 in msg + assert "bbbb" * 10 in msg + + +def test_write_marker_is_idempotent(tmp_path: Path) -> None: + install = _make_install_dir(tmp_path) + write_marker(install, "first" * 8) + write_marker(install, "second" * 7) + verify_marker(install, "second" * 7) + + +def test_write_marker_silently_skips_missing_dir(tmp_path: Path) -> None: + # Should not raise even if the install dir does not exist (cache wiped). 
+ write_marker(tmp_path / "does-not-exist", "deadbeef" * 5) + + +def test_sync_markers_writes_for_remote_deps_with_commits(tmp_path: Path) -> None: + apm_modules = tmp_path / "apm_modules" + pkg_dir = apm_modules / "owner" / "repo" + pkg_dir.mkdir(parents=True) + + lockfile = LockFile() + dep = LockedDependency( + repo_url="owner/repo", + host="github.com", + resolved_commit="cafebabe" * 5, + package_type="apm_package", + ) + lockfile.add_dependency(dep) + + written = sync_markers_for_lockfile(lockfile, tmp_path, apm_modules) + assert written == 1 + verify_marker(pkg_dir, "cafebabe" * 5) + + +def test_sync_markers_skips_local_deps(tmp_path: Path) -> None: + apm_modules = tmp_path / "apm_modules" + apm_modules.mkdir() + + lockfile = LockFile() + local_dep = LockedDependency( + repo_url="./local-pkg", + source="local", + local_path="./local-pkg", + resolved_commit=None, + ) + lockfile.add_dependency(local_dep) + + written = sync_markers_for_lockfile(lockfile, tmp_path, apm_modules) + assert written == 0 + + +def test_sync_markers_warns_on_remote_unpinned_dep( + tmp_path: Path, capsys: pytest.CaptureFixture[str] +) -> None: + """Remote dep without ``resolved_commit`` MUST emit a stderr warning. + + Silent no-op was a supply-chain fail-open: an unpinned remote dep + cannot get a marker, so drift cannot verify its cache freshness. + Per supply-chain panel feedback (PR #1137), surface this loudly so + operators notice and re-pin. Marker count is still zero (nothing + to write), but the warning is mandatory. 
+ """ + apm_modules = tmp_path / "apm_modules" + pkg_dir = apm_modules / "owner" / "repo" + pkg_dir.mkdir(parents=True) + + lockfile = LockFile() + dep = LockedDependency( + repo_url="owner/repo", + host="github.com", + resolved_commit=None, # branch-tracked dep without pin + ) + lockfile.add_dependency(dep) + + written = sync_markers_for_lockfile(lockfile, tmp_path, apm_modules) + assert written == 0 + assert not (pkg_dir / MARKER_FILENAME).exists() + + captured = capsys.readouterr() + combined = captured.out + captured.err + assert "owner/repo" in combined + assert "no resolved_commit" in combined + assert "apm install" in combined + + +def test_find_unpinned_remote_deps_excludes_local_and_pinned(tmp_path: Path) -> None: + """find_unpinned_remote_deps returns only remote deps with no commit.""" + lockfile = LockFile() + lockfile.add_dependency( + LockedDependency( + repo_url="./local-pkg", + source="local", + local_path="./local-pkg", + resolved_commit=None, + ) + ) + lockfile.add_dependency( + LockedDependency( + repo_url="org/pinned", + host="github.com", + resolved_commit="cafebabe" * 5, + ) + ) + lockfile.add_dependency( + LockedDependency( + repo_url="org/unpinned", + host="github.com", + resolved_commit=None, + ) + ) + + from apm_cli.install.cache_pin import find_unpinned_remote_deps + + unpinned = find_unpinned_remote_deps(lockfile) + assert unpinned == ["org/unpinned"] + + +def test_sync_markers_skips_deps_with_no_cached_install(tmp_path: Path) -> None: + """A lockfile entry whose cache was wiped MUST NOT crash sync.""" + apm_modules = tmp_path / "apm_modules" + apm_modules.mkdir() + # Note: NO pkg_dir created -- cache is empty. 
+ + lockfile = LockFile() + dep = LockedDependency( + repo_url="owner/repo", + host="github.com", + resolved_commit="cafebabe" * 5, + ) + lockfile.add_dependency(dep) + + written = sync_markers_for_lockfile(lockfile, tmp_path, apm_modules) + assert written == 0 + + +def test_sync_markers_self_heals_caches_missing_marker(tmp_path: Path) -> None: + """Regression: an existing cache without a marker MUST get marked. + + This is the unchanged-lockfile self-heal scenario: a user upgraded + APM, ran drift, hit a CachePinError, then ran ``apm install``. + The lockfile YAML did not change (already current), but + ``LockfileBuilder`` must still call sync_markers so the cache is + marked and the next drift run succeeds. + """ + apm_modules = tmp_path / "apm_modules" + pkg_dir = apm_modules / "owner" / "repo" + pkg_dir.mkdir(parents=True) + # Simulate a pre-existing cache: contents present, but NO marker. + (pkg_dir / "some-file.md").write_text("cached", encoding="utf-8") + assert not (pkg_dir / MARKER_FILENAME).exists() + + lockfile = LockFile() + dep = LockedDependency( + repo_url="owner/repo", + host="github.com", + resolved_commit="cafebabe" * 5, + ) + lockfile.add_dependency(dep) + + written = sync_markers_for_lockfile(lockfile, tmp_path, apm_modules) + assert written == 1 + verify_marker(pkg_dir, "cafebabe" * 5) diff --git a/tests/unit/install/test_drift.py b/tests/unit/install/test_drift.py new file mode 100644 index 000000000..30f826c90 --- /dev/null +++ b/tests/unit/install/test_drift.py @@ -0,0 +1,409 @@ +"""Unit tests for the drift-detection replay engine. + +Covers: +* Normalization helpers (build-id strip, line endings, BOM). +* Public dataclass immutability contracts. +* Diff engine kinds (modified, unintegrated, orphaned, ignored). +* Inline-diff size cap. +* SARIF rule ID prefix. +* CheckLogger phase markers go to stderr. 
+""" + +from __future__ import annotations + +import dataclasses + +import pytest + +from apm_cli.deps.lockfile import LockedDependency, LockFile +from apm_cli.install.drift import ( + CheckLogger, + DriftFinding, + ReplayConfig, + _normalize_line_endings, + _strip_bom, + _strip_build_id, + diff_scratch_against_project, + render_drift_sarif, +) + +# --------------------------------------------------------------------------- +# Normalization helpers +# --------------------------------------------------------------------------- + + +def test_strip_build_id_removes_header_preserves_rest(): + src = b"# Title\n\nbody line\ntrailing\n" + out = _strip_build_id(src) + assert b"Build ID" not in out + assert b"# Title\n" in out + assert b"body line\n" in out + assert b"trailing\n" in out + + +def test_normalize_line_endings_crlf_to_lf(): + assert _normalize_line_endings(b"a\r\nb\r\nc") == b"a\nb\nc" + assert _normalize_line_endings(b"no-crlf") == b"no-crlf" + + +def test_strip_bom_at_start_only(): + assert _strip_bom(b"\xef\xbb\xbfhello") == b"hello" + # BOM mid-stream must not be removed (not a real BOM there). 
+ mid = b"x\xef\xbb\xbfy" + assert _strip_bom(mid) == mid + + +# --------------------------------------------------------------------------- +# Dataclass contracts +# --------------------------------------------------------------------------- + + +def test_replay_config_is_frozen(tmp_path): + cfg = ReplayConfig( + project_root=tmp_path, + lockfile_path=tmp_path / "apm.lock.yaml", + ) + with pytest.raises(dataclasses.FrozenInstanceError): + cfg.cache_only = False # type: ignore[misc] + + +def test_drift_finding_is_frozen(): + f = DriftFinding(path=".apm/x.md", kind="modified") + with pytest.raises(dataclasses.FrozenInstanceError): + f.kind = "orphaned" # type: ignore[misc] + + +# --------------------------------------------------------------------------- +# Diff engine +# --------------------------------------------------------------------------- + + +def _empty_lockfile() -> LockFile: + return LockFile() + + +def _lockfile_with_tracked(paths: list[str]) -> LockFile: + lock = LockFile() + dep = LockedDependency(repo_url="example/pkg", deployed_files=list(paths)) + lock.add_dependency(dep) + return lock + + +def _write(path, content: bytes = b"hello\n"): + path.parent.mkdir(parents=True, exist_ok=True) + path.write_bytes(content) + + +def test_diff_engine_modified_kind(tmp_path): + scratch = tmp_path / "scratch" + project = tmp_path / "project" + _write(scratch / ".apm" / "skills" / "x.md", b"new content\n") + _write(project / ".apm" / "skills" / "x.md", b"old content\n") + + findings = diff_scratch_against_project(scratch, project, _empty_lockfile(), targets=[]) + assert len(findings) == 1 + assert findings[0].kind == "modified" + assert findings[0].path == ".apm/skills/x.md" + + +def test_diff_engine_modified_ignored_after_normalization(tmp_path): + scratch = tmp_path / "scratch" + project = tmp_path / "project" + _write(scratch / ".apm" / "skills" / "x.md", b"line1\nline2\n") + # Same logical content but CRLF + BOM + spurious build id header. 
+ _write( + project / ".apm" / "skills" / "x.md", + b"\xef\xbb\xbf\r\nline1\r\nline2\r\n", + ) + + findings = diff_scratch_against_project(scratch, project, _empty_lockfile(), targets=[]) + assert findings == [] + + +# Inverse-normalization regression suite: each guard MUST NOT mask a +# real content change that happens to coexist with a +# legitimately-ignorable difference. Without these tests, a future +# refactor that over-broadens normalization (e.g. stripping all HTML +# comments instead of just Build-ID, or normalizing all whitespace) +# would silently hide real drift. + + +def test_normalization_does_not_mask_real_drift_under_build_id(tmp_path): + """A real text change MUST be reported even when one side has a Build-ID header.""" + scratch = tmp_path / "scratch" + project = tmp_path / "project" + _write( + scratch / ".apm" / "skills" / "x.md", + b"\nallowlisted line\n", + ) + # Project: different Build-ID (ignorable) AND a real content change + # ("BLOCKED line" instead of "allowlisted line"). + _write( + project / ".apm" / "skills" / "x.md", + b"\nBLOCKED line\n", + ) + + findings = diff_scratch_against_project(scratch, project, _empty_lockfile(), targets=[]) + assert len(findings) == 1 + assert findings[0].kind == "modified" + assert findings[0].path == ".apm/skills/x.md" + + +def test_normalization_does_not_mask_real_drift_under_bom(tmp_path): + """BOM stripping MUST NOT mask a real change in the rest of the file.""" + scratch = tmp_path / "scratch" + project = tmp_path / "project" + _write(scratch / ".apm" / "skills" / "x.md", b"hello world\n") + # Project starts with a BOM (ignorable) but has different body content. 
+ _write(project / ".apm" / "skills" / "x.md", b"\xef\xbb\xbfhello WORLD\n") + + findings = diff_scratch_against_project(scratch, project, _empty_lockfile(), targets=[]) + assert len(findings) == 1 + assert findings[0].kind == "modified" + + +def test_normalization_does_not_mask_real_drift_under_crlf(tmp_path): + """CRLF normalization MUST NOT mask a real character change.""" + scratch = tmp_path / "scratch" + project = tmp_path / "project" + _write(scratch / ".apm" / "skills" / "x.md", b"alpha\nbeta\ngamma\n") + # Project uses CRLF (ignorable) AND has a different middle line. + _write( + project / ".apm" / "skills" / "x.md", + b"alpha\r\nBETA-changed\r\ngamma\r\n", + ) + + findings = diff_scratch_against_project(scratch, project, _empty_lockfile(), targets=[]) + assert len(findings) == 1 + assert findings[0].kind == "modified" + + +def test_diff_engine_unintegrated_kind(tmp_path): + scratch = tmp_path / "scratch" + project = tmp_path / "project" + _write(scratch / ".apm" / "skills" / "missing.md", b"x\n") + project.mkdir() + + findings = diff_scratch_against_project(scratch, project, _empty_lockfile(), targets=[]) + assert len(findings) == 1 + assert findings[0].kind == "unintegrated" + assert findings[0].path == ".apm/skills/missing.md" + + +def test_diff_engine_orphaned_kind(tmp_path): + scratch = tmp_path / "scratch" + project = tmp_path / "project" + scratch.mkdir() + _write(project / ".apm" / "skills" / "old.md", b"stale\n") + + lock = _lockfile_with_tracked([".apm/skills/old.md"]) + + findings = diff_scratch_against_project(scratch, project, lock, targets=[]) + assert len(findings) == 1 + assert findings[0].kind == "orphaned" + assert findings[0].path == ".apm/skills/old.md" + + +def test_diff_engine_ignores_untracked_governed_file(tmp_path): + scratch = tmp_path / "scratch" + project = tmp_path / "project" + scratch.mkdir() + # User-authored extra file in a governed dir, NOT tracked in lockfile. 
+ _write(project / ".apm" / "skills" / "user-wrote-this.md", b"hand-edited\n") + + findings = diff_scratch_against_project(scratch, project, _empty_lockfile(), targets=[]) + assert findings == [] + + +def test_diff_engine_100kb_inline_cap(tmp_path): + scratch = tmp_path / "scratch" + project = tmp_path / "project" + big_a = b"a" * (200 * 1024) + big_b = b"b" * (200 * 1024) + _write(scratch / ".apm" / "skills" / "huge.md", big_a) + _write(project / ".apm" / "skills" / "huge.md", big_b) + + findings = diff_scratch_against_project(scratch, project, _empty_lockfile(), targets=[]) + assert len(findings) == 1 + assert findings[0].kind == "modified" + assert "too large for inline diff" in findings[0].inline_diff + + +# --------------------------------------------------------------------------- +# Renderers +# --------------------------------------------------------------------------- + + +def test_render_sarif_rule_id_prefix(): + findings = [ + DriftFinding(path="a.md", kind="modified", package="pkg-a"), + DriftFinding(path="b.md", kind="orphaned", package="pkg-b"), + ] + results = render_drift_sarif(findings) + assert results[0]["ruleId"] == "apm/drift/modified" + assert results[1]["ruleId"] == "apm/drift/orphaned" + assert results[0]["locations"][0]["physicalLocation"]["artifactLocation"]["uri"] == "a.md" + assert results[1]["properties"]["package"] == "pkg-b" + + +# --------------------------------------------------------------------------- +# CheckLogger -- stderr only +# --------------------------------------------------------------------------- + + +def test_check_logger_phases_to_stderr(capsys): + logger = CheckLogger(verbose=False) + logger.replay_start() + logger.replay_complete(3) + logger.diff_start() + logger.findings(2) + logger.clean() + + captured = capsys.readouterr() + # Everything must be on stderr to keep stdout JSON-clean. 
+ assert captured.out == "" + assert "Replaying install" in captured.err + assert "Replayed 3 package(s)" in captured.err + assert "Diffing scratch" in captured.err + assert "Drift detected: 2 file(s)" in captured.err + assert "No drift detected" in captured.err + # ASCII-only status symbols. + assert "[>]" in captured.err + assert "[+]" in captured.err + assert "[!]" in captured.err + + +def test_check_logger_scratch_root_emits_to_stderr_when_verbose(capsys, tmp_path): + """Verbose-mode scratch root announcement stays on stderr (B4 follow-up). + + Stdout must remain a clean JSON/SARIF channel. The scratch tmpdir + is diagnostic noise, not report payload, so it goes to stderr and + is gated on ``verbose`` so the normal-mode user never sees it. + """ + logger = CheckLogger(verbose=True) + logger.scratch_root(tmp_path / "drift-scratch-xyz") + captured = capsys.readouterr() + assert captured.out == "" + assert "drift scratch root:" in captured.err + assert "drift-scratch-xyz" in captured.err + assert "[i]" in captured.err + + +def test_check_logger_scratch_root_silent_when_not_verbose(capsys, tmp_path): + """Non-verbose mode must NOT leak the scratch tmpdir.""" + logger = CheckLogger(verbose=False) + logger.scratch_root(tmp_path / "drift-scratch-secret") + captured = capsys.readouterr() + assert captured.out == "" + assert "drift-scratch-secret" not in captured.err + assert captured.err == "" + + +# --------------------------------------------------------------------------- +# Supply-chain fail-closed: unpinned remote dep +# --------------------------------------------------------------------------- + + +def test_materialize_unpinned_remote_dep_raises_cache_miss(tmp_path): + """Remote dep with empty resolved_commit MUST fail-closed at audit. + + The cache could contain content from any commit on the referenced + branch. Drift cannot prove freshness without a marker, and a marker + cannot be written without a commit. 
Per supply-chain panel feedback + (PR #1137), refuse to replay rather than silently trust the cache. + """ + from apm_cli.install.drift import CacheMissError, _materialize_install_path + + apm_modules = tmp_path / "apm_modules" + apm_modules.mkdir() + dep = LockedDependency( + repo_url="org/unpinned", + host="github.com", + resolved_commit=None, + ) + + with pytest.raises(CacheMissError) as exc_info: + _materialize_install_path(dep, tmp_path, apm_modules, cache_only=True) + + msg = str(exc_info.value) + assert "org/unpinned" in msg + assert "no resolved_commit" in msg + assert "apm install" in msg + + +def test_materialize_local_dep_without_commit_does_not_raise(tmp_path): + """Local deps legitimately have no resolved_commit -- must not fail-closed.""" + from apm_cli.install.drift import _materialize_install_path + + apm_modules = tmp_path / "apm_modules" + apm_modules.mkdir() + local_pkg = tmp_path / "local-pkg" + local_pkg.mkdir() + dep = LockedDependency( + repo_url="./local-pkg", + source="local", + local_path="./local-pkg", + resolved_commit=None, + ) + # Should not raise the unpinned-remote guard. (May raise a different + # error depending on local-path resolution; we only assert the + # specific supply-chain message is NOT what surfaced.) + try: + _materialize_install_path(dep, tmp_path, apm_modules, cache_only=True) + except Exception as exc: # pragma: no cover -- only inspecting message + assert "no resolved_commit" not in str(exc), ( + "local deps must not trip the unpinned-remote guard" + ) + + +# --------------------------------------------------------------------------- +# Defense-in-depth: _ReadOnlyProjectGuard wired into run_replay +# --------------------------------------------------------------------------- + + +def test_run_replay_wraps_loop_with_readonly_guard(monkeypatch, tmp_path): + """A monkeypatched integrator that writes to project_root MUST trip the guard. 
+ + Defense-in-depth: even though every integrator should be redirected + via ``scratch_root=scratch_root``, an accidental direct-path write + (or a future regression) would silently corrupt the working tree. + The guard catches it and raises ProtectedPathMutationError. + """ + from apm_cli.deps.lockfile import LockFile + from apm_cli.install.drift import ReplayConfig, run_replay + from apm_cli.utils.guards import ProtectedPathMutationError + + project_root = tmp_path / "proj" + project_root.mkdir() + (project_root / ".apm").mkdir() + (project_root / ".apm" / "tracked.md").write_text("baseline\n", encoding="utf-8") + + lockfile_path = project_root / "apm.lock.yaml" + # Seed local_deployed_files so LockFile.read() synthesizes the self-entry + # and the replay loop iterates at least once. + lock = LockFile() + lock.local_deployed_files = [".apm/tracked.md"] + lock.write(lockfile_path) + + # Monkey-patch integrate_package_primitives to write into project_root. + def _bad_integrator(*args, **kwargs): + # Simulate a leaky integrator that writes to project tree, not scratch. + (project_root / ".apm" / "leaked.md").write_text("oops\n", encoding="utf-8") + + monkeypatch.setattr( + "apm_cli.install.services.integrate_package_primitives", + _bad_integrator, + ) + + config = ReplayConfig( + project_root=project_root, + lockfile_path=lockfile_path, + targets=None, + cache_only=True, + ) + logger = CheckLogger(verbose=False) + + with pytest.raises(ProtectedPathMutationError) as exc_info: + run_replay(config, logger) + + assert "leaked.md" in str(exc_info.value) or "created:" in str(exc_info.value) diff --git a/tests/unit/install/test_drift_perf.py b/tests/unit/install/test_drift_perf.py new file mode 100644 index 000000000..29c68878b --- /dev/null +++ b/tests/unit/install/test_drift_perf.py @@ -0,0 +1,81 @@ +"""Performance smoke test for the drift replay engine. + +The matrix calls out a 30-second budget for a small project end-to-end. 
+This unit-level test exercises ``run_replay`` directly with 100 +synthetic ``.apm/instructions/*.instructions.md`` primitives and +asserts the replay completes in under 5 seconds on a stock dev box. + +If this regresses you have likely: + * Introduced an O(n^2) lookup in the diff engine, or + * Re-enabled a network or disk-cache fallback inside the replay. +""" + +from __future__ import annotations + +import time +from pathlib import Path + +import pytest +import yaml +from click.testing import CliRunner + +from apm_cli.cli import cli +from apm_cli.deps.lockfile import LockFile +from apm_cli.install.drift import ( + CheckLogger, + ReplayConfig, + diff_scratch_against_project, + run_replay, +) +from apm_cli.integration.targets import resolve_targets + +_INSTR_TEMPLATE = ( + b"---\n" + b'applyTo: "**"\n' + b"---\n" + b"# Rule {idx}\n" + b"\n" + b"Body line for primitive {idx}.\n" + b"Another line for primitive {idx}.\n" +) + + +def _make_large_project(tmp_path: Path, n_primitives: int) -> Path: + project = tmp_path / "perf-fixture" + project.mkdir() + (project / "apm.yml").write_bytes( + yaml.safe_dump({"name": "perf-fixture", "version": "1.0.0"}).encode() + ) + inst_dir = project / ".apm" / "instructions" + inst_dir.mkdir(parents=True) + for idx in range(n_primitives): + body = _INSTR_TEMPLATE.replace(b"{idx}", str(idx).encode()) + (inst_dir / f"rule-{idx:03d}.instructions.md").write_bytes(body) + return project + + +def test_drift_replay_under_5s_for_100_primitives( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + project = _make_large_project(tmp_path, n_primitives=100) + monkeypatch.chdir(project) + + install_result = CliRunner().invoke(cli, ["install"], catch_exceptions=False) + assert install_result.exit_code == 0, install_result.output + + targets = resolve_targets(project.resolve()) + cfg = ReplayConfig( + project_root=project.resolve(), + lockfile_path=project / "apm.lock.yaml", + ) + logger = CheckLogger(verbose=False) + lockfile = 
LockFile.read(cfg.lockfile_path) + assert lockfile is not None + + start = time.perf_counter() + scratch = run_replay(cfg, logger) + findings = diff_scratch_against_project(scratch, project.resolve(), lockfile, targets) + elapsed = time.perf_counter() - start + + assert findings == [], f"clean fixture must produce zero drift, got: {findings}" + assert elapsed < 5.0, f"drift replay+diff took {elapsed:.2f}s for 100 primitives (budget: 5s)" diff --git a/tests/unit/test_audit_ci_auto_discovery.py b/tests/unit/test_audit_ci_auto_discovery.py index 846cfe3cc..2c30300a5 100644 --- a/tests/unit/test_audit_ci_auto_discovery.py +++ b/tests/unit/test_audit_ci_auto_discovery.py @@ -113,7 +113,7 @@ def test_auto_discovery_finds_policy_runs_unmanaged_check( mock_discover.return_value = _make_policy_fetch_with_unmanaged_deny() with patch("apm_cli.commands.audit.Path.cwd", return_value=tmp_path): - result = runner.invoke(audit, ["--ci"]) + result = runner.invoke(audit, ["--ci", "--no-drift"]) # Auto-discovery should have been invoked. mock_discover.assert_called_once() @@ -126,7 +126,7 @@ def test_auto_discovery_no_policy_baseline_only_passes(self, mock_discover, runn mock_discover.return_value = _make_no_policy_fetch() with patch("apm_cli.commands.audit.Path.cwd", return_value=tmp_path): - result = runner.invoke(audit, ["--ci"]) + result = runner.invoke(audit, ["--ci", "--no-drift"]) mock_discover.assert_called_once() # Baseline-only (no unmanaged-file enforcement) -> exit 0. 
@@ -144,7 +144,7 @@ def test_no_policy_skips_auto_discovery(self, mock_discover, runner, tmp_path): mock_discover.return_value = _make_policy_fetch_with_unmanaged_deny() with patch("apm_cli.commands.audit.Path.cwd", return_value=tmp_path): - result = runner.invoke(audit, ["--ci", "--no-policy"]) + result = runner.invoke(audit, ["--ci", "--no-drift", "--no-policy"]) mock_discover.assert_not_called() assert result.exit_code == 0, result.output @@ -164,7 +164,7 @@ def test_fetch_failure_warn_proceeds(self, mock_discover, runner, tmp_path): ) with patch("apm_cli.commands.audit.Path.cwd", return_value=tmp_path): - result = runner.invoke(audit, ["--ci"]) + result = runner.invoke(audit, ["--ci", "--no-drift"]) # Default warn -> proceed with baseline only. assert result.exit_code == 0, result.output @@ -183,6 +183,6 @@ def test_fetch_failure_block_exits_one(self, mock_discover, runner, tmp_path): ) with patch("apm_cli.commands.audit.Path.cwd", return_value=tmp_path): - result = runner.invoke(audit, ["--ci"]) + result = runner.invoke(audit, ["--ci", "--no-drift"]) assert result.exit_code == 1, result.output diff --git a/tests/unit/test_audit_ci_command.py b/tests/unit/test_audit_ci_command.py index 8609d2b3d..d176c7e34 100644 --- a/tests/unit/test_audit_ci_command.py +++ b/tests/unit/test_audit_ci_command.py @@ -93,26 +93,26 @@ class TestCIIncompatibleFlags: def test_ci_with_strip(self, runner, tmp_path): _setup_clean_project(tmp_path) with patch("apm_cli.commands.audit.Path.cwd", return_value=tmp_path): - result = runner.invoke(audit, ["--ci", "--strip"]) + result = runner.invoke(audit, ["--ci", "--no-drift", "--strip"]) assert result.exit_code != 0 def test_ci_with_dry_run(self, runner, tmp_path): _setup_clean_project(tmp_path) with patch("apm_cli.commands.audit.Path.cwd", return_value=tmp_path): - result = runner.invoke(audit, ["--ci", "--dry-run"]) + result = runner.invoke(audit, ["--ci", "--no-drift", "--dry-run"]) assert result.exit_code != 0 def 
test_ci_with_file(self, runner, tmp_path): test_file = tmp_path / "dummy.md" test_file.write_text("hello", encoding="utf-8") with patch("apm_cli.commands.audit.Path.cwd", return_value=tmp_path): - result = runner.invoke(audit, ["--ci", "--file", str(test_file)]) + result = runner.invoke(audit, ["--ci", "--no-drift", "--file", str(test_file)]) assert result.exit_code != 0 def test_ci_with_package(self, runner, tmp_path): _setup_clean_project(tmp_path) with patch("apm_cli.commands.audit.Path.cwd", return_value=tmp_path): - result = runner.invoke(audit, ["--ci", "some-package"]) + result = runner.invoke(audit, ["--ci", "--no-drift", "some-package"]) assert result.exit_code != 0 @@ -120,13 +120,13 @@ class TestCIExitCodes: def test_exit_0_all_pass(self, runner, tmp_path): _setup_clean_project(tmp_path) with patch("apm_cli.commands.audit.Path.cwd", return_value=tmp_path): - result = runner.invoke(audit, ["--ci"]) + result = runner.invoke(audit, ["--ci", "--no-drift"]) assert result.exit_code == 0 def test_exit_1_on_failure(self, runner, tmp_path): _setup_failing_project(tmp_path) with patch("apm_cli.commands.audit.Path.cwd", return_value=tmp_path): - result = runner.invoke(audit, ["--ci"]) + result = runner.invoke(audit, ["--ci", "--no-drift"]) assert result.exit_code == 1 @@ -134,7 +134,7 @@ class TestCIOutputFormats: def test_json_output(self, runner, tmp_path): _setup_clean_project(tmp_path) with patch("apm_cli.commands.audit.Path.cwd", return_value=tmp_path): - result = runner.invoke(audit, ["--ci", "-f", "json"]) + result = runner.invoke(audit, ["--ci", "--no-drift", "-f", "json"]) assert result.exit_code == 0 data = json.loads(result.output) assert "passed" in data @@ -145,7 +145,7 @@ def test_json_output(self, runner, tmp_path): def test_sarif_output(self, runner, tmp_path): _setup_clean_project(tmp_path) with patch("apm_cli.commands.audit.Path.cwd", return_value=tmp_path): - result = runner.invoke(audit, ["--ci", "-f", "sarif"]) + result = runner.invoke(audit, 
["--ci", "--no-drift", "-f", "sarif"]) assert result.exit_code == 0 data = json.loads(result.output) assert data["version"] == "2.1.0" @@ -154,7 +154,7 @@ def test_sarif_output(self, runner, tmp_path): def test_json_output_with_failures(self, runner, tmp_path): _setup_failing_project(tmp_path) with patch("apm_cli.commands.audit.Path.cwd", return_value=tmp_path): - result = runner.invoke(audit, ["--ci", "-f", "json"]) + result = runner.invoke(audit, ["--ci", "--no-drift", "-f", "json"]) assert result.exit_code == 1 data = json.loads(result.output) assert data["passed"] is False @@ -163,7 +163,7 @@ def test_json_output_with_failures(self, runner, tmp_path): def test_text_output_shows_checks(self, runner, tmp_path): _setup_clean_project(tmp_path) with patch("apm_cli.commands.audit.Path.cwd", return_value=tmp_path): - result = runner.invoke(audit, ["--ci"]) + result = runner.invoke(audit, ["--ci", "--no-drift"]) assert result.exit_code == 0 assert "passed" in result.output.lower() or "check" in result.output.lower() @@ -171,7 +171,7 @@ def test_output_to_file(self, runner, tmp_path): _setup_clean_project(tmp_path) outfile = tmp_path / "report.json" with patch("apm_cli.commands.audit.Path.cwd", return_value=tmp_path): - result = runner.invoke(audit, ["--ci", "-f", "json", "-o", str(outfile)]) + result = runner.invoke(audit, ["--ci", "--no-drift", "-f", "json", "-o", str(outfile)]) assert result.exit_code == 0 assert outfile.exists() data = json.loads(outfile.read_text(encoding="utf-8")) diff --git a/tests/unit/test_audit_policy_command.py b/tests/unit/test_audit_policy_command.py index 14c388f53..8e8f174aa 100644 --- a/tests/unit/test_audit_policy_command.py +++ b/tests/unit/test_audit_policy_command.py @@ -82,7 +82,7 @@ def test_ci_with_policy_file(self, runner, tmp_path, monkeypatch): result = runner.invoke( audit, - ["--ci", "--policy", str(policy_path)], + ["--ci", "--no-drift", "--policy", str(policy_path)], catch_exceptions=False, ) assert result.exit_code == 0 @@ 
-95,7 +95,7 @@ def test_ci_with_policy_json_output(self, runner, tmp_path, monkeypatch): result = runner.invoke( audit, - ["--ci", "--policy", str(policy_path), "-f", "json"], + ["--ci", "--no-drift", "--policy", str(policy_path), "-f", "json"], catch_exceptions=False, ) assert result.exit_code == 0 @@ -123,7 +123,7 @@ def test_ci_with_policy_deny_fails(self, runner, tmp_path, monkeypatch): result = runner.invoke( audit, - ["--ci", "--policy", str(policy_path)], + ["--ci", "--no-drift", "--policy", str(policy_path)], catch_exceptions=False, ) assert result.exit_code == 1 @@ -142,7 +142,7 @@ def test_ci_policy_org_discovery(self, runner, tmp_path, monkeypatch): ) as mock_disc: result = runner.invoke( audit, - ["--ci", "--policy", "org"], + ["--ci", "--no-drift", "--policy", "org"], catch_exceptions=False, ) mock_disc.assert_called_once() @@ -162,7 +162,7 @@ def test_policy_not_found_still_runs_baseline(self, runner, tmp_path, monkeypatc with patch("apm_cli.policy.discovery.discover_policy", return_value=mock_result): result = runner.invoke( audit, - ["--ci", "--policy", "org"], + ["--ci", "--no-drift", "--policy", "org"], catch_exceptions=False, ) assert result.exit_code == 0 @@ -188,7 +188,7 @@ def test_fetch_error_exits_1(self, runner, tmp_path, monkeypatch): with patch("apm_cli.policy.discovery.discover_policy", return_value=mock_result): result = runner.invoke( audit, - ["--ci", "--policy", "org"], + ["--ci", "--no-drift", "--policy", "org"], catch_exceptions=False, ) assert result.exit_code == 1 @@ -208,7 +208,7 @@ def test_no_cache_flag_accepted(self, runner, tmp_path, monkeypatch): ) as mock_disc: result = runner.invoke( audit, - ["--ci", "--policy", "org", "--no-cache"], + ["--ci", "--no-drift", "--policy", "org", "--no-cache"], catch_exceptions=False, ) mock_disc.assert_called_once() @@ -222,7 +222,7 @@ def test_no_cache_without_policy(self, runner, tmp_path, monkeypatch): result = runner.invoke( audit, - ["--ci", "--no-cache"], + ["--ci", "--no-drift", 
"--no-cache"], catch_exceptions=False, ) assert result.exit_code == 0 @@ -236,7 +236,7 @@ def test_baseline_only(self, runner, tmp_path, monkeypatch): result = runner.invoke( audit, - ["--ci", "-f", "json"], + ["--ci", "--no-drift", "-f", "json"], catch_exceptions=False, ) assert result.exit_code == 0 diff --git a/tests/unit/utils/__init__.py b/tests/unit/utils/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/utils/test_guards.py b/tests/unit/utils/test_guards.py new file mode 100644 index 000000000..06e84a18c --- /dev/null +++ b/tests/unit/utils/test_guards.py @@ -0,0 +1,84 @@ +"""Tests for ``utils.guards._ReadOnlyProjectGuard``.""" + +from __future__ import annotations + +import os +import time + +import pytest + +from apm_cli.utils.guards import ( + ProtectedPathMutationError, + _ReadOnlyProjectGuard, +) + + +def _bump_mtime(p): + now_ns = time.time_ns() + os.utime(p, ns=(now_ns, now_ns)) + + +def test_no_mutation_passes(tmp_path): + apm_dir = tmp_path / ".apm" + apm_dir.mkdir() + (apm_dir / "x.md").write_text("x") + with _ReadOnlyProjectGuard(tmp_path, [".apm"]): + pass + + +def test_modification_raises(tmp_path): + apm_dir = tmp_path / ".apm" + apm_dir.mkdir() + target = apm_dir / "x.md" + target.write_bytes(b"original") + + with pytest.raises(ProtectedPathMutationError, match=r"modified"): + with _ReadOnlyProjectGuard(tmp_path, [".apm"]): + time.sleep(0.01) + target.write_bytes(b"changed-content-different-size") + _bump_mtime(target) + + +def test_creation_under_protected_root_raises(tmp_path): + apm_dir = tmp_path / ".apm" + apm_dir.mkdir() + with pytest.raises(ProtectedPathMutationError, match=r"created"): + with _ReadOnlyProjectGuard(tmp_path, [".apm"]): + (apm_dir / "new.md").write_text("new") + + +def test_deletion_raises(tmp_path): + apm_dir = tmp_path / ".apm" + apm_dir.mkdir() + target = apm_dir / "x.md" + target.write_text("x") + with pytest.raises(ProtectedPathMutationError, match=r"deleted"): + with 
_ReadOnlyProjectGuard(tmp_path, [".apm"]): + target.unlink() + + +def test_missing_path_tolerated(tmp_path): + with _ReadOnlyProjectGuard(tmp_path, [".apm", "apm.lock.yaml"]): + pass + + +def test_existing_exception_not_masked(tmp_path): + apm_dir = tmp_path / ".apm" + apm_dir.mkdir() + (apm_dir / "x.md").write_text("x") + with pytest.raises(ValueError, match="boom"): + with _ReadOnlyProjectGuard(tmp_path, [".apm"]): + (apm_dir / "x.md").write_text("changed") + raise ValueError("boom") + + +def test_single_file_protected_path(tmp_path): + lock = tmp_path / "apm.lock.yaml" + lock.write_text("locked: true\n") + with _ReadOnlyProjectGuard(tmp_path, ["apm.lock.yaml"]): + pass + with pytest.raises(ProtectedPathMutationError, match=r"modified"): + with _ReadOnlyProjectGuard(tmp_path, ["apm.lock.yaml"]): + time.sleep(0.01) + lock.write_bytes(b"locked: false-and-longer\n") + _bump_mtime(lock) diff --git a/uv.lock b/uv.lock index c18c37419..c19639af6 100644 --- a/uv.lock +++ b/uv.lock @@ -179,7 +179,7 @@ wheels = [ [[package]] name = "apm-cli" -version = "0.12.0" +version = "0.12.1" source = { editable = "." } dependencies = [ { name = "click" },