diff --git a/CHANGELOG.md b/CHANGELOG.md index fab3c81..e449744 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,33 @@ Versioning follows [Semantic Versioning](https://semver.org/spec/v2.0.0.html). --- +## [1.3.0] - 2026-03-09 + +CLI verifier and epoch-based key rotation system. + +### Added + +- **`capsule` CLI** -- command-line tool for verification, inspection, and key management. Installed as a console script via `pip install qp-capsule`. Zero new dependencies (stdlib `argparse` + existing `pynacl`). + - `capsule verify ` -- verify chain integrity from JSON files (`chain.json`) or SQLite databases (`--db`). Three verification levels: `--structural` (default, sequence + previous_hash linkage), `--full` (+ SHA3-256 recomputation), `--signatures` (+ Ed25519 verification via keyring). Output modes: colored terminal (default), `--json` (machine-readable for policy engines and CI), `--quiet` (exit code only: 0=pass, 1=fail, 2=error). + - `capsule inspect` -- show a capsule's full 6-section content with seal metadata. Lookup by `--seq` or `--id` from JSON files or SQLite databases. + - `capsule keys info` -- display keyring metadata, epoch history, and capsule counts per epoch. + - `capsule keys rotate` -- generate new Ed25519 key pair, retire current key, update keyring. No downtime. + - `capsule keys export-public` -- export current public key for third-party verification. + - `capsule hash ` -- compute SHA3-256 of any file. +- **Epoch-based key rotation** (`keyring.py`) -- automated key lifecycle management aligned with NIST SP 800-57. Keyring stored at `~/.quantumpipes/keyring.json` with atomic writes for crash safety. Supports backward-compatible verification across key rotations via fingerprint lookup. Seamless migration from existing single-key installations (auto-creates epoch 0 on first encounter). +- **Epoch-aware signature verification** -- `Seal(keyring=kr)` uses the capsule's `signed_by` fingerprint to look up the correct epoch's public key from the keyring. Capsules signed with old keys continue to verify after rotation. +- **`KeyringError` exception** -- dedicated exception for keyring operations (load, save, rotate, lookup). +- **100+ new tests** -- keyring creation, migration, rotation, lookup, registration, export, atomic writes, edge cases. CLI verification (structural/full/signatures), inspection, key management, hash utility, ANSI color support, cross-rotation verification, seal+keyring integration. 100% code coverage maintained. + +### Security + +- Key rotation follows NIST SP 800-57 lifecycle: Generation, Active, Retired, Destroyed. Private keys are securely replaced on rotation (old key overwritten with new). Public keys are retained in the keyring for backward-compatible verification. +- Keyring writes are atomic (temp file + `os.replace`) to prevent corruption on crash. +- New `qp_key_XXXX` fingerprint format with backward-compatible lookup of legacy 16-char hex fingerprints. +- `Seal.verify()` resolves the verification key from the keyring when available, falling back to the local key. No manual key management during verification. + +--- + ## [1.2.0] - 2026-03-08 Protocol-first restructure, TypeScript implementation, finalized URI scheme, full CapsuleType conformance, Security Considerations in spec, cryptographic chain verification, and 11-framework compliance directory. @@ -111,6 +138,7 @@ Initial public release of the Capsule Protocol Specification (CPS) v1.0 referenc --- +[1.3.0]: https://github.com/quantumpipes/capsule/releases/tag/v1.3.0 [1.2.0]: https://github.com/quantumpipes/capsule/releases/tag/v1.2.0 [1.1.0]: https://github.com/quantumpipes/capsule/releases/tag/v1.1.0 [1.0.0]: https://github.com/quantumpipes/capsule/releases/tag/v1.0.0 diff --git a/README.md b/README.md index d585d24..08a0959 100644 --- a/README.md +++ b/README.md @@ -168,7 +168,7 @@ See more examples in [`examples/`](./examples/). | Language | Status | Install | Source | |---|---|---|---| -| **Python** | v1.1.0 (stable) | `pip install qp-capsule` | [`reference/python/`](./reference/python/) | +| **Python** | v1.3.0 (stable) | `pip install qp-capsule` | [`reference/python/`](./reference/python/) | | **TypeScript** | v0.0.1 (conformant, 16/16 fixtures) | `npm install @quantumpipes/capsule` | [`reference/typescript/`](./reference/typescript/) | | Go | Separate repo (planned) | — | [quantumpipes/capsule-go](https://github.com/quantumpipes/capsule-go) | | Rust | Separate repo (planned) | — | [quantumpipes/capsule-rust](https://github.com/quantumpipes/capsule-rust) | @@ -197,6 +197,22 @@ seal.seal(capsule) assert seal.verify(capsule) ``` +### CLI + +The Python package includes a CLI for verification, inspection, and key management: + +```bash +capsule verify chain.json # Structural verification +capsule verify --full --db capsules.db # + SHA3-256 recomputation +capsule verify --signatures --json chain.json # + Ed25519, JSON output +capsule inspect --db capsules.db --seq 47 # Full 6-section display +capsule keys info # Epoch history +capsule keys rotate # Rotate to new key (no downtime) +capsule hash document.pdf # SHA3-256 of any file +``` + +Exit codes: `0` = pass, `1` = fail, `2` = error. Designed for CI/CD gates: `capsule verify --quiet && deploy`. + See the [Python reference documentation](./reference/python/) for the full guide. ### Quick Start (TypeScript) @@ -260,6 +276,7 @@ See the [compliance overview](./docs/compliance/) for FIPS algorithm details and | [Compliance Mapping](./docs/compliance/) | Regulators, GRC | | [Why Capsules](./docs/why-capsules.md) | Decision-Makers, Architects | | [Implementor's Guide](./docs/implementors-guide.md) | SDK Authors | +| [CLI Reference](./docs/architecture.md#cli) | DevOps, Auditors | --- diff --git a/docs/architecture.md b/docs/architecture.md index 7ec0209..1743436 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -1,7 +1,7 @@ --- title: "Capsule Architecture" description: "Complete technical architecture of Capsule: the 6-section record model, cryptographic sealing, hash chain integrity, and storage backends." -date_modified: "2026-03-07" +date_modified: "2026-03-09" ai_context: | Full architecture of the Capsule system. Covers the 6-section Capsule model (Trigger, Context, Reasoning, Authority, Execution, Outcome), two-tier @@ -207,18 +207,58 @@ If any step fails, `seal.verify()` returns `False`. ### Key Management - + + -| Key | Location | Permissions | Generated | +| File | Location | Permissions | Purpose | |---|---|---|---| -| Ed25519 private key | `~/.quantumpipes/key` | `0600` (owner only) | On first `seal()` call | -| ML-DSA-65 secret key | `~/.quantumpipes/key.ml` | `0600` (owner only) | On first PQ `seal()` call | -| ML-DSA-65 public key | `~/.quantumpipes/key.ml.pub` | `0644` (world-readable) | On first PQ `seal()` call | +| Ed25519 private key | `~/.quantumpipes/key` | `0600` | Signing (active epoch only) | +| Keyring | `~/.quantumpipes/keyring.json` | `0600` | Epoch history and public keys | +| ML-DSA-65 secret key | `~/.quantumpipes/key.ml` | `0600` | Post-quantum signing | +| ML-DSA-65 public key | `~/.quantumpipes/key.ml.pub` | `0644` | Post-quantum verification | Override the key directory with the `QUANTUMPIPES_DATA_DIR` environment variable or by passing `key_path` to the `Seal` constructor. Keys are generated using cryptographically secure random sources. File creation uses `umask(0o077)` to prevent race conditions between creation and permission setting. +### Key Rotation and Epochs + + + +Keys are managed through *epochs*. Each epoch tracks a single Ed25519 key pair with a lifecycle aligned to NIST SP 800-57: + +| Lifecycle Phase | Implementation | +|---|---| +| Generation | `capsule keys rotate` or auto-generate on first `seal()` | +| Active | Current epoch, used for new capsules | +| Retired | Old epoch, public key retained for verification only | +| Destroyed | Private key securely overwritten on rotation | + +The keyring (`keyring.json`) stores the epoch history: + +```json +{ + "version": 1, + "active_epoch": 1, + "epochs": [ + { "epoch": 0, "algorithm": "ed25519", "fingerprint": "qp_key_a7f3", "status": "retired" }, + { "epoch": 1, "algorithm": "ed25519", "fingerprint": "qp_key_8b1d", "status": "active" } + ] +} +``` + +**Rotation protocol:** + +1. Generate new Ed25519 key pair +2. Set current epoch's status to `retired` with `rotated_at` timestamp +3. Add new epoch with status `active` +4. Write new private key (securely replaces old) +5. Save keyring atomically (temp file + `os.replace`) + +**Backward-compatible verification:** `Seal.verify()` uses the capsule's `signed_by` fingerprint to look up the correct epoch's public key from the keyring. Capsules signed with old keys verify after rotation without manual key management. + +**Migration:** The first time the `Seal` or CLI encounters an existing key file without a keyring, it creates `keyring.json` with epoch 0 for the existing key. No manual migration required. + --- ## Hash Chain @@ -374,6 +414,52 @@ Capsules can form parent-child hierarchies: `WORKFLOW` (parent) -> `AGENT` (chil --- +## CLI + + + +The `capsule` CLI provides command-line verification, inspection, and key management. It is installed automatically with the Python package and has zero additional dependencies. + +```bash +pip install qp-capsule +``` + +### Verification + +```bash +capsule verify chain.json # Structural (sequence + previous_hash) +capsule verify --full chain.json # + recompute SHA3-256 from content +capsule verify --signatures --db capsules.db # + Ed25519 via keyring +capsule verify --json chain.json # Machine-readable for policy engines +capsule verify --quiet chain.json && deploy # CI/CD gate (exit code only) +``` + +Exit codes: 0 = pass, 1 = fail, 2 = error. + +### Inspection + +```bash +capsule inspect --db capsules.db --seq 47 # Full 6-section display for capsule #47 +capsule inspect --db capsules.db --id # Look up by ID +capsule inspect capsule.json # From exported JSON +``` + +### Key Management + +```bash +capsule keys info # Epoch history and active key +capsule keys rotate # Rotate to new epoch (no downtime) +capsule keys export-public # Export for third-party verification +``` + +### Utility + +```bash +capsule hash document.pdf # SHA3-256 of any file +``` + +--- + ## Related Documentation - [Why Capsules](./why-capsules.md) — The case for cryptographic AI memory diff --git a/docs/implementors-guide.md b/docs/implementors-guide.md index c2b86a1..95cc206 100644 --- a/docs/implementors-guide.md +++ b/docs/implementors-guide.md @@ -187,6 +187,58 @@ Validate against [`conformance/uri-fixtures.json`](../conformance/uri-fixtures.j --- +## Step 9: Key Management (Recommended) + +Implementations SHOULD support epoch-based key rotation per [CPS Section 8](../spec/README.md#8-key-management-recommendations). + +### Keyring + +Maintain a keyring that maps fingerprints to public keys across rotation epochs. The Python reference uses `~/.quantumpipes/keyring.json`: + +```json +{ + "version": 1, + "active_epoch": 1, + "epochs": [ + { "epoch": 0, "algorithm": "ed25519", "public_key_hex": "...", "fingerprint": "qp_key_a7f3", "status": "retired" }, + { "epoch": 1, "algorithm": "ed25519", "public_key_hex": "...", "fingerprint": "qp_key_8b1d", "status": "active" } + ] +} +``` + +### Epoch-Aware Verification + +When verifying a sealed Capsule: + +``` +1. Read capsule.signed_by fingerprint +2. Look up fingerprint in keyring → find epoch → get public_key_hex +3. Verify Ed25519 signature with the resolved key +4. If fingerprint not found, fall back to the local active key +``` + +This enables verification of Capsules signed across key rotations without manual key management. + +### Rotation Protocol + +``` +1. Generate new Ed25519 key pair +2. Set current epoch status to "retired" with rotated_at timestamp +3. Add new epoch with status "active" +4. Securely overwrite old private key with new +5. Save keyring atomically (temp file + rename) +``` + +### Migration + +On first use, if a key file exists but no keyring file, create the keyring with epoch 0 for the existing key. No user intervention required. + +### Fingerprints + +The Python reference uses `qp_key_XXXX` (first 4 hex characters of the public key). Implementations MAY use different formats but MUST ensure fingerprints are unique within a keyring. Legacy capsules may use a 16-character hex prefix as `signed_by`; the lookup function should match both formats. + +--- + ## Registering Your Implementation Once conformant, submit a PR to add your implementation to `reference//` and update the implementation matrix in [`reference/README.md`](../reference/README.md). See [CONTRIBUTING.md](../CONTRIBUTING.md) for details. @@ -197,6 +249,6 @@ Once conformant, submit a PR to add your implementation to `reference/ - [CPS v1.0 Specification](../spec/) — The normative protocol spec - [Conformance Suite](../conformance/) — Golden test vectors -- [Python Reference](../reference/python/) — Python implementation (conformant, 506 tests, 100% coverage) +- [Python Reference](../reference/python/) — Python implementation (conformant, 668 tests, 100% coverage) - [TypeScript Reference](../reference/typescript/) — TypeScript implementation (conformant, 101 tests, 100% coverage) - [URI Scheme](../spec/uri-scheme.md) — Content-addressable references diff --git a/docs/security.md b/docs/security.md index 3950e44..0aaf292 100644 --- a/docs/security.md +++ b/docs/security.md @@ -1,7 +1,7 @@ --- title: "Security Evaluation Guide" description: "Security evaluation guide for CISOs and security teams assessing Capsule for organizational adoption. Covers cryptographic architecture, key management, tamper evidence, attack surface, and deployment." -date_modified: "2026-03-07" +date_modified: "2026-03-09" classification: "Public" ai_context: | CISO-targeted security evaluation of the Capsule package. Covers two-tier @@ -15,7 +15,7 @@ ai_context: | **For CISOs and Security Teams Evaluating Capsule** -*Capsule v1.0.0 — March 2026* +*Capsule v1.3.0 — March 2026* *Classification: Public* --- @@ -93,12 +93,27 @@ Ed25519 is always required. ML-DSA-65 is additive, never a replacement. If one a ### Key Rotation -Key rotation is currently a manual process: + -1. Stop the application -2. Move existing keys to backup -3. Restart the application (new keys auto-generated) -4. Previous Capsules remain verifiable using `seal.verify_with_key(capsule, old_public_key_hex)` +Key rotation is automated through the epoch-based keyring system, aligned with NIST SP 800-57: + +```bash +capsule keys rotate # Generate new key, retire current, update keyring. No downtime. +``` + +Rotation protocol: + +1. Generate new Ed25519 key pair using cryptographically secure random +2. Set current epoch's status to `retired` with timestamp +3. Add new epoch as `active` +4. Write new private key to disk (securely replaces old) +5. Save keyring atomically (temp file + `os.replace`) + +**Backward compatibility:** After rotation, Capsules signed with previous keys continue to verify. The keyring retains all retired epochs' public keys. `Seal.verify()` uses the capsule's `signed_by` fingerprint to look up the correct epoch's public key automatically. + +**Migration:** Existing installations without a keyring file are migrated seamlessly. On first use, the Seal or CLI creates `keyring.json` with epoch 0 for the existing key. No manual intervention required. + +**Automated rotation:** Schedule `capsule keys rotate` via cron for periodic rotation (e.g., every 90 days). > **Note:** HSM integration is planned for a future release. File-based key storage is appropriate for development and single-tenant deployments. For multi-tenant production deployments, ensure key directories have appropriate OS-level access controls. @@ -203,7 +218,7 @@ Use this checklist when evaluating Capsule for your organization: | Multi-tenant isolation | Yes | PostgreSQL backend with `tenant_id` scoping | | Test coverage | Yes | 100% line coverage enforced in CI (`fail_under = 100`) | | Warning-free | Yes | `filterwarnings = ["error"]` with zero exemptions | -| Key rotation support | Partial | Manual rotation; `verify_with_key()` for old keys; HSM planned | +| Key rotation support | Yes | Automated via `capsule keys rotate`; epoch-based keyring with backward-compatible verification; HSM planned | | FIPS 140-2/3 validated module | No | Uses FIPS-approved algorithms via PyNaCl/libsodium; module itself is not FIPS-validated | --- @@ -217,4 +232,4 @@ Use this checklist when evaluating Capsule for your organization: --- -*Capsule v1.0.0 — Quantum Pipes Technologies, LLC* +*Capsule v1.3.0 — Quantum Pipes Technologies, LLC* diff --git a/reference/python/docs/api.md b/reference/python/docs/api.md index 9029454..ac50d9b 100644 --- a/reference/python/docs/api.md +++ b/reference/python/docs/api.md @@ -1,14 +1,15 @@ --- title: "API Reference" description: "Complete API reference for Capsule: every class, method, parameter, and type." -date_modified: "2026-03-07" +date_modified: "2026-03-09" ai_context: | - Complete Python API reference for the qp-capsule package v1.1.0. Covers Capsule model - (6 sections, 8 CapsuleTypes), Seal (seal, verify, verify_with_key, compute_hash), + Complete Python API reference for the qp-capsule package v1.3.0. Covers Capsule model + (6 sections, 8 CapsuleTypes), Seal (seal, verify, verify_with_key, compute_hash, + keyring integration), Keyring (epoch-based key rotation, NIST SP 800-57), CapsuleChain (add, verify, seal_and_store), CapsuleStorageProtocol (7 methods), CapsuleStorage (SQLite), PostgresCapsuleStorage (multi-tenant), exception hierarchy, - and the v1.1.0 high-level API: Capsules class, @audit() decorator, current() context - variable, and mount_capsules() FastAPI integration. All signatures verified against source. + CLI (verify, inspect, keys, hash), and the high-level API: Capsules class, @audit() + decorator, current() context variable, and mount_capsules() FastAPI integration. --- # API Reference @@ -34,6 +35,9 @@ from qp_capsule import ( # Cryptographic Seal Seal, compute_hash, + # Key Management (v1.3.0+) + Keyring, Epoch, + # Storage Protocol CapsuleStorageProtocol, @@ -45,7 +49,7 @@ from qp_capsule import ( PostgresCapsuleStorage, # PostgreSQL # Exceptions - CapsuleError, SealError, ChainError, StorageError, + CapsuleError, SealError, ChainError, StorageError, KeyringError, ) # FastAPI Integration (optional, requires fastapi) @@ -88,7 +92,7 @@ class Capsule: signature: str # Ed25519 signature (hex) signature_pq: str # ML-DSA-65 signature (hex, optional) signed_at: datetime | None # When sealed (UTC) - signed_by: str # Key fingerprint (first 16 hex chars) + signed_by: str # Key fingerprint (qp_key_XXXX or legacy 16 hex chars) ``` ### Methods @@ -248,7 +252,7 @@ class CapsuleType(StrEnum): Cryptographic sealing with two-tier architecture. - + ```python class Seal: @@ -256,6 +260,8 @@ class Seal: self, key_path: Path | None = None, enable_pq: bool | None = None, + *, + keyring: Keyring | None = None, ) ``` @@ -263,6 +269,7 @@ class Seal: |---|---|---|---| | `key_path` | `Path \| None` | `~/.quantumpipes/key` | Ed25519 private key file path | | `enable_pq` | `bool \| None` | `None` (auto-detect) | `None` = use ML-DSA-65 if available; `True` = require; `False` = disable | +| `keyring` | `Keyring \| None` | `None` | Keyring for epoch-aware verification. When provided, `verify()` uses the capsule's `signed_by` fingerprint to resolve the correct epoch's public key. | ### Properties @@ -276,7 +283,7 @@ class Seal: Seal a Capsule. Fills `hash`, `signature`, `signature_pq` (if PQ enabled), `signed_at`, `signed_by`. Raises `SealError` on failure. **`verify(capsule: Capsule, verify_pq: bool = False) -> bool`** -Verify a sealed Capsule. Returns `True` if hash and Ed25519 signature are valid. Set `verify_pq=True` to also verify the ML-DSA-65 signature. +Verify a sealed Capsule. Returns `True` if hash and Ed25519 signature are valid. When a `keyring` was provided at construction, the capsule's `signed_by` fingerprint is used to look up the correct epoch's public key, enabling verification across key rotations. Set `verify_pq=True` to also verify the ML-DSA-65 signature. **`verify_with_key(capsule: Capsule, public_key_hex: str) -> bool`** Verify a Capsule using a specific Ed25519 public key (hex-encoded). Useful for verifying Capsules sealed by other instances. @@ -285,7 +292,7 @@ Verify a Capsule using a specific Ed25519 public key (hex-encoded). Useful for v Returns the Ed25519 public key as a 64-character hex string. **`get_key_fingerprint() -> str`** -Returns the first 16 characters of the hex-encoded public key. +Returns the keyring's `qp_key_XXXX` format when a keyring is available with an active epoch, otherwise falls back to the first 16 characters of the hex-encoded public key (legacy format). ### compute_hash @@ -299,6 +306,85 @@ Standalone SHA3-256 hash of a dictionary. Uses canonical JSON (sorted keys, comp --- +## Keyring + +> **Added in v1.3.0.** + +Epoch-based key lifecycle manager aligned with NIST SP 800-57. + + + +```python +class Keyring: + def __init__( + self, + keyring_path: Path | None = None, + key_path: Path | None = None, + ) +``` + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `keyring_path` | `Path \| None` | `~/.quantumpipes/keyring.json` | Path to keyring file | +| `key_path` | `Path \| None` | `~/.quantumpipes/key` | Path to Ed25519 private key | + +On first access, loads from disk. If a key file exists but no keyring file, migrates automatically by creating epoch 0 for the existing key. + +### Properties + +| Property | Type | Description | +|---|---|---| +| `path` | `Path` | Path to the keyring file | +| `key_path` | `Path` | Path to the Ed25519 private key file | +| `active_epoch` | `int` | Current active epoch number | +| `epochs` | `list[Epoch]` | All epochs (returns a copy) | + +### Methods + + + +**`load() -> None`** +Load keyring from disk, migrating from existing key files if needed. + +**`get_active() -> Epoch | None`** +Get the active epoch, or `None` if no epochs exist. + +**`lookup(fingerprint: str) -> Epoch | None`** +Look up an epoch by fingerprint. Matches on the `qp_key_XXXX` format and on the legacy 16-char hex prefix. + +**`lookup_public_key(fingerprint: str) -> str | None`** +Look up a public key hex string by fingerprint. + +**`rotate() -> Epoch`** +Rotate to a new key pair. Retires the current epoch, generates a new Ed25519 key, writes keyring atomically. + +**`register_key(signing_key: SigningKey) -> Epoch`** +Register an existing key in the keyring. Idempotent. Called by `Seal` when generating a key for a keyring that does not yet track it. + +**`export_public_key() -> str | None`** +Export the active epoch's public key as a hex string. + +**`to_dict() -> dict[str, Any]`** +Serialize keyring to dict. + +### Epoch + + + +```python +@dataclass +class Epoch: + epoch: int # Epoch number (0-indexed) + algorithm: str # "ed25519" + public_key_hex: str # 64-char hex public key + fingerprint: str # "qp_key_XXXX" + created_at: str # ISO 8601 timestamp + rotated_at: str | None # ISO 8601 timestamp (None if active) + status: str # "active" or "retired" +``` + +--- + ## CapsuleChain Hash chain management. Accepts any `CapsuleStorageProtocol` backend. @@ -423,13 +509,14 @@ The `list()` and `count()` methods accept extra parameters beyond the Protocol: ## Exceptions - + ``` CapsuleError Base exception for all Capsule operations ├── SealError Sealing or verification failed ├── ChainError Hash chain integrity error -└── StorageError Storage operation failed (e.g., storing unsealed Capsule) +├── StorageError Storage operation failed (e.g., storing unsealed Capsule) +└── KeyringError Keyring operation failed (load, save, rotate, lookup) ``` All inherit from `CapsuleError`. Catch `CapsuleError` for unified error handling. @@ -609,6 +696,36 @@ FastAPI is not a hard dependency. Raises `CapsuleError` if not installed. --- +## CLI + +> **Added in v1.3.0.** + +The `capsule` command is installed automatically with the package. + + + +```bash +capsule verify chain.json # Structural (sequence + hash linkage) +capsule verify --full --db capsules.db # + SHA3-256 recomputation +capsule verify --signatures chain.json # + Ed25519 via keyring +capsule verify --json chain.json # Machine-readable JSON +capsule verify --quiet chain.json # Exit code only (CI/CD gates) + +capsule inspect --db capsules.db --seq 47 # Full 6-section display +capsule inspect --db capsules.db --id # Lookup by capsule ID +capsule inspect capsule.json # From exported JSON + +capsule keys info # Epoch history and active key +capsule keys rotate # Rotate to new epoch (no downtime) +capsule keys export-public # Export active public key (hex) + +capsule hash document.pdf # SHA3-256 of any file +``` + +Exit codes: `0` = pass, `1` = fail, `2` = error. + +--- + ## Related Documentation - [High-Level API Guide](./high-level-api.md) — Full walkthrough with examples diff --git a/reference/python/docs/getting-started.md b/reference/python/docs/getting-started.md index 03f3f36..74aba80 100644 --- a/reference/python/docs/getting-started.md +++ b/reference/python/docs/getting-started.md @@ -1,7 +1,7 @@ --- title: "Getting Started with Capsule" description: "Create, seal, verify, and chain your first Capsule in under 60 seconds." -date_modified: "2026-03-07" +date_modified: "2026-03-09" ai_context: | Developer quickstart for the qp-capsule Python package. Covers install, creating a Capsule with 6 sections, sealing with Ed25519, verification, @@ -243,6 +243,23 @@ See [High-Level API](./high-level-api.md) for the full guide. --- +## CLI (v1.3.0+) + +The package installs a `capsule` command for verification, inspection, and key management: + +```bash +capsule verify chain.json # Structural verification +capsule verify --full --db ~/.quantumpipes/capsules.db # + SHA3-256 +capsule inspect --db capsules.db --seq 42 # Show capsule #42 +capsule keys info # Key epoch history +capsule keys rotate # Rotate to new key +capsule hash document.pdf # SHA3-256 of any file +``` + +Use `capsule verify --quiet && deploy` as a CI/CD gate. Exit code `0` means the chain is intact. + +--- + ## What's Next - [High-Level API](./high-level-api.md) — `Capsules`, `@audit`, `current()`, FastAPI integration diff --git a/reference/python/pyproject.toml b/reference/python/pyproject.toml index 184830a..9faa45c 100644 --- a/reference/python/pyproject.toml +++ b/reference/python/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "qp-capsule" -version = "1.2.0" +version = "1.3.0" description = "Capsule Protocol Specification (CPS) — tamper-evident audit records for AI operations. Create, seal, verify, and chain Capsules in Python." readme = "README.md" license = "Apache-2.0" @@ -66,6 +66,9 @@ dev = [ "httpx>=0.28.1", ] +[project.scripts] +capsule = "qp_capsule.cli:_entry" + [project.urls] Homepage = "https://github.com/quantumpipes/capsule" Documentation = "https://github.com/quantumpipes/capsule/tree/main/reference/python/docs" diff --git a/reference/python/src/qp_capsule/__init__.py b/reference/python/src/qp_capsule/__init__.py index ed87b57..cf97a91 100644 --- a/reference/python/src/qp_capsule/__init__.py +++ b/reference/python/src/qp_capsule/__init__.py @@ -36,7 +36,7 @@ Spec: https://github.com/quantumpipes/capsule """ -__version__ = "1.2.0" +__version__ = "1.3.0" __author__ = "Quantum Pipes Technologies, LLC" __license__ = "Apache-2.0" @@ -57,9 +57,11 @@ from qp_capsule.exceptions import ( CapsuleError, ChainError, + KeyringError, SealError, StorageError, ) +from qp_capsule.keyring import Epoch, Keyring from qp_capsule.protocol import CapsuleStorageProtocol from qp_capsule.seal import Seal, compute_hash @@ -91,6 +93,9 @@ # Seal "Seal", "compute_hash", + # Keyring + "Keyring", + "Epoch", # Protocol "CapsuleStorageProtocol", # Chain @@ -105,6 +110,7 @@ "SealError", "ChainError", "StorageError", + "KeyringError", # High-Level API "Capsules", ] diff --git a/reference/python/src/qp_capsule/cli.py b/reference/python/src/qp_capsule/cli.py new file mode 100644 index 0000000..d809b85 --- /dev/null +++ b/reference/python/src/qp_capsule/cli.py @@ -0,0 +1,646 @@ +# Copyright 2026 Quantum Pipes Technologies, LLC +# SPDX-License-Identifier: Apache-2.0 +# +# Patent Pending — See PATENTS.md for details. +# Licensed under the Apache License, Version 2.0 with patent grant (Section 3). + +""" +CLI: Command-line interface for Capsule verification, inspection, and key management. + +Usage:: + + capsule verify chain.json # Structural check + capsule verify --full chain.json # + content hashes + capsule verify --signatures --db capsules.db # + Ed25519 sigs + capsule inspect --db capsules.db --seq 2 # Show capsule #2 + capsule keys info # Key metadata + capsule keys rotate # Rotate to new epoch + capsule keys export-public # Export public key + capsule hash document.pdf # SHA3-256 + +Exit codes: 0 = pass, 1 = fail, 2 = error +""" + +from __future__ import annotations + +import argparse +import asyncio +import hashlib +import json +import os +import sys +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Any, TextIO + +from qp_capsule.capsule import Capsule +from qp_capsule.keyring import Keyring +from qp_capsule.seal import Seal, compute_hash + +_NO_COLOR = False + + +# --------------------------------------------------------------------------- +# ANSI helpers (stdlib only -- no rich, no click) +# --------------------------------------------------------------------------- + + +def _supports_color(stream: TextIO) -> bool: + """Check if the stream supports ANSI escape codes.""" + if os.environ.get("NO_COLOR"): + return False + if os.environ.get("FORCE_COLOR"): + return True + return hasattr(stream, "isatty") and stream.isatty() + + +def _c(code: str, text: str) -> str: + if _NO_COLOR: + return text + return f"\033[{code}m{text}\033[0m" + + +def _green(t: str) -> str: + return _c("32", t) + + +def _red(t: str) -> str: + return _c("31", t) + + +def _bold(t: str) -> str: + return _c("1", t) + + +def _dim(t: str) -> str: + return _c("2", t) + + +def _yellow(t: str) -> str: + return _c("33", t) + + +# --------------------------------------------------------------------------- +# Verification result types +# --------------------------------------------------------------------------- + + +@dataclass +class VerifyError: + """A single verification error.""" + + sequence: int + capsule_id: str + error: str + + def to_dict(self) -> dict[str, Any]: + return { + "sequence": self.sequence, + "capsule_id": self.capsule_id, + "error": self.error, + } + + +@dataclass +class VerifyResult: + """Complete chain verification result.""" + + valid: bool + level: str + capsules_verified: int + total_capsules: int + errors: list[VerifyError] = field(default_factory=list) + + def to_dict(self) -> dict[str, Any]: + return { + "valid": self.valid, + "level": self.level, + "capsules_verified": self.capsules_verified, + "total_capsules": self.total_capsules, + "errors": [e.to_dict() for e in self.errors], + } + + +# --------------------------------------------------------------------------- +# Capsule loading helpers +# --------------------------------------------------------------------------- + + +def _capsule_from_full_dict(data: dict[str, Any]) -> Capsule: + """Reconstruct a Capsule from a dict that includes seal metadata.""" + capsule = Capsule.from_dict(data) + capsule.hash = data.get("hash", "") + capsule.signature = data.get("signature", "") + capsule.signature_pq = data.get("signature_pq", "") + signed_at = data.get("signed_at") + if signed_at: + capsule.signed_at = datetime.fromisoformat(signed_at) + capsule.signed_by = data.get("signed_by", "") + return capsule + + +def _capsule_to_full_dict(capsule: Capsule) -> dict[str, Any]: + """Serialize a Capsule to a dict including seal metadata.""" + d = capsule.to_dict() + d["hash"] = capsule.hash + d["signature"] = capsule.signature + d["signature_pq"] = capsule.signature_pq + d["signed_at"] = capsule.signed_at.isoformat() if capsule.signed_at else None + d["signed_by"] = capsule.signed_by + return d + + +def _load_capsules_from_json(path: Path) -> list[Capsule]: + """Load capsules from a JSON file (array of dicts or single dict).""" + data = json.loads(path.read_text("utf-8")) + if isinstance(data, dict): + data = [data] + if not isinstance(data, list): + raise ValueError(f"Expected JSON array or object, got {type(data).__name__}") + return [_capsule_from_full_dict(d) for d in data] + + +async def _load_capsules_from_db(db_path: str) -> list[Capsule]: + """Load all capsules from SQLite, ordered by sequence.""" + from qp_capsule.storage import CapsuleStorage + + storage = CapsuleStorage(db_path=Path(db_path)) + try: + return list(await storage.get_all_ordered()) + finally: + await storage.close() + + +async def _load_capsules_from_pg(pg_url: str) -> list[Capsule]: # pragma: no cover + """Load all capsules from PostgreSQL, ordered by sequence.""" + from qp_capsule.storage_pg import PostgresCapsuleStorage + + storage = PostgresCapsuleStorage(pg_url) + try: + return list(await storage.get_all_ordered()) + finally: + await storage.close() + + +# --------------------------------------------------------------------------- +# Core verification logic (pure, no I/O) +# --------------------------------------------------------------------------- + + +def verify_chain( + capsules: list[Capsule], + *, + level: str = "structural", + seal: Seal | None = None, +) -> VerifyResult: + """ + Verify a chain of capsules. + + Levels: + structural: sequence + previous_hash linkage + full: structural + recompute SHA3-256 + signatures: full + Ed25519 verification + """ + total = len(capsules) + errors: list[VerifyError] = [] + + if not capsules: + return VerifyResult(valid=True, level=level, capsules_verified=0, total_capsules=0) + + do_content = level in ("full", "signatures") + do_sigs = level == "signatures" + + for i, capsule in enumerate(capsules): + if capsule.sequence != i: + msg = f"Sequence gap: expected {i}, got {capsule.sequence}" + errors.append(VerifyError(i, str(capsule.id), msg)) + break + + if i == 0: + if capsule.previous_hash is not None: + msg = "Genesis capsule has previous_hash (should be null)" + errors.append(VerifyError(0, str(capsule.id), msg)) + break + else: + if capsule.previous_hash != capsules[i - 1].hash: + msg = f"Chain broken: previous_hash mismatch at sequence {i}" + errors.append(VerifyError(i, str(capsule.id), msg)) + break + + if do_content: + computed = compute_hash(capsule.to_dict()) + if computed != capsule.hash: + errors.append( + VerifyError(i, str(capsule.id), f"Content hash mismatch at sequence {i}") + ) + break + + if do_sigs and seal is not None: + if not seal.verify(capsule): + msg = f"Signature verification failed at sequence {i}" + errors.append(VerifyError(i, str(capsule.id), msg)) + break + + return VerifyResult( + valid=len(errors) == 0, + level=level, + capsules_verified=total if not errors else errors[0].sequence, + total_capsules=total, + errors=errors, + ) + + +# --------------------------------------------------------------------------- +# Output formatting +# --------------------------------------------------------------------------- + + +_LEVEL_DESCRIPTIONS: dict[str, str] = { + "structural": "sequence + hash linkage", + "full": "structural + SHA3-256 recomputation", + "signatures": "full + Ed25519 verification", +} + + +def _print_verify_result(result: VerifyResult, capsules: list[Capsule]) -> None: + """Print colored verification result to terminal.""" + print() + print(_bold("Capsule Chain Verification")) + print("=" * 50) + desc = _LEVEL_DESCRIPTIONS.get(result.level, "") + print(f" Level: {result.level}" + (f" ({desc})" if desc else "")) + print() + + for i, capsule in enumerate(capsules): + h = capsule.hash[:12] + "..." if capsule.hash else "no hash" + if i < result.capsules_verified: + label = "genesis" if i == 0 else f"seq {i}" + print(f" {_green('✓')} Capsule #{i} ({label}) {_dim(h)}") + elif result.errors and result.errors[0].sequence == i: + print(f" {_red('✗')} Capsule #{i}: {result.errors[0].error}") + else: + print(f" {_dim('·')} Capsule #{i} {_dim('(skipped)')}") + + print() + if result.valid: + print(f" {_green('PASS')} -- {result.capsules_verified} capsules verified") + else: + print( + f" {_red('FAIL')} -- {result.capsules_verified} of " + f"{result.total_capsules} capsules verified" + ) + print() + + +def _print_inspect(capsule: Capsule) -> None: + """Print full 6-section capsule inspection.""" + print() + print(_bold("Capsule Inspection")) + print("=" * 50) + + print(f" ID: {capsule.id}") + print(f" Type: {capsule.type.value}") + print(f" Domain: {capsule.domain}") + print(f" Sequence: {capsule.sequence}") + pid = str(capsule.parent_id) if capsule.parent_id else "none" + print(f" Parent: {pid}") + + _section_bar("Seal") + print(f" Hash: {capsule.hash or 'not sealed'}") + print(f" Signed: {capsule.signed_at.isoformat() if capsule.signed_at else 'n/a'}") + print(f" Key: {capsule.signed_by or 'n/a'}") + + t = capsule.trigger + _section_bar("1. Trigger") + print(f" Type: {t.type}") + print(f" Source: {t.source}") + print(f" Request: {t.request}") + print(f" Time: {t.timestamp.isoformat()}") + + c = capsule.context + _section_bar("2. Context") + print(f" Agent: {c.agent_id or 'n/a'}") + print(f" Session: {c.session_id or 'n/a'}") + + r = capsule.reasoning + _section_bar("3. Reasoning") + print(f" Analysis: {r.analysis or 'n/a'}") + n_opts = len(r.options) + n_sel = sum(1 for o in r.options if o.selected) + print(f" Options: {n_opts} considered, {n_sel} selected") + print(f" Selected: {r.selected_option or 'n/a'}") + print(f" Confidence: {r.confidence}") + print(f" Model: {r.model or 'n/a'}") + + a = capsule.authority + _section_bar("4. Authority") + print(f" Type: {a.type}") + print(f" Approver: {a.approver or 'n/a'}") + + e = capsule.execution + _section_bar("5. Execution") + print(f" Tool Calls: {len(e.tool_calls)}") + print(f" Duration: {e.duration_ms}ms") + + o = capsule.outcome + _section_bar("6. Outcome") + print(f" Status: {o.status}") + print(f" Summary: {o.summary or 'n/a'}") + if o.error: + print(f" Error: {o.error}") + print() + + +def _section_bar(title: str) -> None: + bar_width = 50 + label = f" {title} " + dashes = bar_width - len(label) - 2 + print(f" {_dim('──' + label + '─' * max(dashes, 0))}") + + +# --------------------------------------------------------------------------- +# Command implementations +# --------------------------------------------------------------------------- + + +def cmd_verify(args: argparse.Namespace) -> int: + """Execute the ``verify`` command.""" + has_db = bool(getattr(args, "db", None)) + has_pg = bool(getattr(args, "pg", None)) + sources = sum([bool(args.source), has_db, has_pg]) + if sources == 0: + print("Error: specify a source (JSON file, --db, or --pg)", file=sys.stderr) + return 2 + if sources > 1: + print("Error: specify only one source", file=sys.stderr) + return 2 + + try: + if args.source: + capsules = _load_capsules_from_json(Path(args.source)) + elif args.db: + capsules = asyncio.run(_load_capsules_from_db(args.db)) + else: # pragma: no cover + capsules = asyncio.run(_load_capsules_from_pg(args.pg)) + except Exception as e: + print(f"Error loading capsules: {e}", file=sys.stderr) + return 2 + + seal = None + if args.level == "signatures": + keyring = Keyring() + seal = Seal(keyring=keyring) + + result = verify_chain(capsules, level=args.level, seal=seal) + + if getattr(args, "json_output", False): + print(json.dumps(result.to_dict(), indent=2)) + elif not getattr(args, "quiet", False): + _print_verify_result(result, capsules) + + return 0 if result.valid else 1 + + +def cmd_inspect(args: argparse.Namespace) -> int: + """Execute the ``inspect`` command.""" + try: + if args.source: + capsules = _load_capsules_from_json(Path(args.source)) + elif getattr(args, "db", None): + capsules = asyncio.run(_load_capsules_from_db(args.db)) + elif getattr(args, "pg", None): # pragma: no cover + capsules = asyncio.run(_load_capsules_from_pg(args.pg)) + else: + print( + "Error: specify a source (JSON file, --db, or --pg)", + file=sys.stderr, + ) + return 2 + except Exception as e: + print(f"Error loading capsules: {e}", file=sys.stderr) + return 2 + + target: Capsule | None = None + seq = getattr(args, "seq", None) + cid = getattr(args, "id", None) + + if seq is not None: + for c in capsules: + if c.sequence == seq: + target = c + break + if target is None: + print(f"Error: no capsule with sequence {seq}", file=sys.stderr) + return 2 + elif cid is not None: + for c in capsules: + if str(c.id) == cid: + target = c + break + if target is None: + print(f"Error: no capsule with id {cid}", file=sys.stderr) + return 2 + elif len(capsules) == 1: + target = capsules[0] + else: + print("Error: multiple capsules found, use --seq or --id", file=sys.stderr) + return 2 + + _print_inspect(target) + return 0 + + +def cmd_keys(args: argparse.Namespace) -> int: + """Execute the ``keys`` subcommand.""" + sub = getattr(args, "keys_command", None) + if sub is None: + print("Error: specify a keys subcommand (info, rotate, export-public)", file=sys.stderr) + return 2 + + keyring = Keyring() + + if sub == "info": + return _keys_info(keyring) + if sub == "rotate": + return _keys_rotate(keyring) + if sub == "export-public": + return _keys_export(keyring) + return 2 # pragma: no cover + + +def _keys_info(keyring: Keyring) -> int: + """Display keyring metadata and epoch history.""" + print() + print(_bold("Key Management Info")) + print("=" * 50) + print(f" Keyring: {keyring.path}") + + epochs = keyring.epochs + if not epochs: + print(f" Status: {_yellow('No keys')}") + print() + print(" Run 'capsule keys rotate' to generate a key pair.") + print() + return 0 + + active = keyring.get_active() + print(f" Version: {Keyring.KEYRING_VERSION}") + print(f" Active: Epoch {keyring.active_epoch}") + print() + + for ep in epochs: + status_str = _green("active") if ep.status == "active" else _dim("retired") + start = ep.created_at[:19] + end = ep.rotated_at[:19] if ep.rotated_at else "present" + print( + f" Epoch {ep.epoch} {ep.algorithm} " + f"{ep.fingerprint} {status_str} ({start} to {end})" + ) + + if active: + print() + print(f" Public key: {_dim(active.public_key_hex)}") + print() + return 0 + + +def _keys_rotate(keyring: Keyring) -> int: + """Rotate to a new key epoch.""" + old_active = keyring.get_active() + new_epoch = keyring.rotate() + + print() + print(_bold("Key Rotation")) + print("=" * 50) + if old_active: + print( + f" Previous: Epoch {old_active.epoch} " + f"({old_active.fingerprint}) {_dim('→ retired')}" + ) + print( + f" New: Epoch {new_epoch.epoch} " + f"({new_epoch.fingerprint}) {_green('→ active')}" + ) + print() + print(f" Private key: {keyring.key_path}") + print(f" Keyring: {keyring.path}") + print() + return 0 + + +def _keys_export(keyring: Keyring) -> int: + """Export the active public key (hex, pipeable).""" + pub = keyring.export_public_key() + if pub is None: + print("Error: no active key. Run 'capsule keys rotate' first.", file=sys.stderr) + return 2 + print(pub) + return 0 + + +def cmd_hash(args: argparse.Namespace) -> int: + """Execute the ``hash`` command.""" + path = Path(args.file) + if not path.exists(): + print(f"Error: file not found: {path}", file=sys.stderr) + return 2 + + data = path.read_bytes() + print(hashlib.sha3_256(data).hexdigest()) + return 0 + + +# --------------------------------------------------------------------------- +# Parser and entry point +# --------------------------------------------------------------------------- + + +def _build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="capsule", + description="Capsule Protocol -- verify, inspect, and manage cryptographic audit chains.", + ) + parser.add_argument( + "--version", action="version", + version=f"capsule {__import__('qp_capsule').__version__}", + ) + sub = parser.add_subparsers(dest="command") + + # -- verify -- + vp = sub.add_parser("verify", help="Verify chain integrity") + vp.add_argument("source", nargs="?", default=None, help="JSON file path") + vp.add_argument("--db", metavar="PATH", default=None, help="SQLite database path") + vp.add_argument("--pg", metavar="URL", default=None, help="PostgreSQL connection URL") + lvl = vp.add_mutually_exclusive_group() + lvl.add_argument( + "--structural", action="store_const", const="structural", dest="level", + help="Sequence + hash linkage (default)", + ) + lvl.add_argument( + "--full", action="store_const", const="full", dest="level", + help="Structural + recompute SHA3-256 from content", + ) + lvl.add_argument( + "--signatures", action="store_const", const="signatures", dest="level", + help="Full + verify Ed25519 signatures via keyring", + ) + vp.set_defaults(level="structural") + vp.add_argument( + "--json", action="store_true", dest="json_output", + help="Machine-readable JSON output", + ) + vp.add_argument( + "--quiet", action="store_true", + help="Exit code only (0=pass, 1=fail, 2=error)", + ) + + # -- inspect -- + ip = sub.add_parser("inspect", help="Inspect a single capsule") + ip.add_argument("source", nargs="?", default=None, help="JSON file path") + ip.add_argument("--db", metavar="PATH", default=None, help="SQLite database path") + ip.add_argument("--pg", metavar="URL", default=None, help="PostgreSQL connection URL") + ip.add_argument("--seq", type=int, default=None, help="Select by sequence number") + ip.add_argument("--id", default=None, help="Select by capsule UUID") + + # -- keys -- + kp = sub.add_parser("keys", help="Key management") + ks = kp.add_subparsers(dest="keys_command") + ks.add_parser("info", help="Show key metadata and epoch history") + ks.add_parser("rotate", help="Rotate to a new key epoch") + ks.add_parser("export-public", help="Export the active public key (hex)") + + # -- hash -- + hp = sub.add_parser("hash", help="Compute SHA3-256 of a file") + hp.add_argument("file", help="File to hash") + + return parser + + +def main(argv: list[str] | None = None) -> int: + """CLI entry point. Returns exit code (0=pass, 1=fail, 2=error).""" + global _NO_COLOR # noqa: PLW0603 + _NO_COLOR = not _supports_color(sys.stdout) + + parser = _build_parser() + args = parser.parse_args(argv) + + dispatch: dict[str | None, Any] = { + "verify": cmd_verify, + "inspect": cmd_inspect, + "keys": cmd_keys, + "hash": cmd_hash, + } + + handler = dispatch.get(args.command) + if handler is None: + parser.print_help() + return 2 + + result: int = handler(args) + return result + + +def _entry() -> None: # pragma: no cover + """Console script entry point (installed as ``capsule``).""" + sys.exit(main()) diff --git a/reference/python/src/qp_capsule/exceptions.py b/reference/python/src/qp_capsule/exceptions.py index fdb8267..7636f9b 100644 --- a/reference/python/src/qp_capsule/exceptions.py +++ b/reference/python/src/qp_capsule/exceptions.py @@ -23,3 +23,7 @@ class ChainError(CapsuleError): class StorageError(CapsuleError): """Capsule storage operation failed.""" + + +class KeyringError(CapsuleError): + """Keyring operation failed (load, save, rotate, lookup).""" diff --git a/reference/python/src/qp_capsule/keyring.py b/reference/python/src/qp_capsule/keyring.py new file mode 100644 index 0000000..129cdf5 --- /dev/null +++ b/reference/python/src/qp_capsule/keyring.py @@ -0,0 +1,341 @@ +# Copyright 2026 Quantum Pipes Technologies, LLC +# SPDX-License-Identifier: Apache-2.0 +# +# Patent Pending — See PATENTS.md for details. +# Licensed under the Apache License, Version 2.0 with patent grant (Section 3). + +""" +Keyring: Epoch-based key lifecycle management. + +Manages Ed25519 key pairs across rotation epochs, enabling: + - Automated key rotation (NIST SP 800-57 lifecycle) + - Backward-compatible verification (old capsules verify with old keys) + - Seamless migration from existing single-key installations + +NIST SP 800-57 Alignment: + Generation: ``capsule keys rotate`` or auto-generate on first seal() + Active: Current epoch, used for new capsules + Retired: Old epoch, public key retained for verification only + Destroyed: Private key securely deleted on rotation + +Keyring file: ``~/.quantumpipes/keyring.json`` +""" + +from __future__ import annotations + +import contextlib +import json +import os +import tempfile +from dataclasses import dataclass +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +from nacl.signing import SigningKey + +from qp_capsule.exceptions import KeyringError +from qp_capsule.paths import default_key_path, default_keyring_path + + +def _make_fingerprint(public_key_hex: str) -> str: + """Create a short fingerprint from a public key hex string.""" + return f"qp_key_{public_key_hex[:4]}" + + +@dataclass +class Epoch: + """A single key epoch in the keyring.""" + + epoch: int + algorithm: str + public_key_hex: str + fingerprint: str + created_at: str + rotated_at: str | None + status: str + + def to_dict(self) -> dict[str, Any]: + return { + "epoch": self.epoch, + "algorithm": self.algorithm, + "public_key_hex": self.public_key_hex, + "fingerprint": self.fingerprint, + "created_at": self.created_at, + "rotated_at": self.rotated_at, + "status": self.status, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> Epoch: + return cls( + epoch=data["epoch"], + algorithm=data["algorithm"], + public_key_hex=data["public_key_hex"], + fingerprint=data["fingerprint"], + created_at=data["created_at"], + rotated_at=data.get("rotated_at"), + status=data["status"], + ) + + +class Keyring: + """ + Epoch-based key lifecycle manager. + + Manages a keyring of Ed25519 key pairs with: + - Key rotation with epoch tracking + - Backward-compatible verification via fingerprint lookup + - Automatic migration from pre-keyring key files + - Atomic writes for crash safety + """ + + KEYRING_VERSION = 1 + + def __init__( + self, + keyring_path: Path | None = None, + key_path: Path | None = None, + ): + self._keyring_path = keyring_path or default_keyring_path() + self._key_path = key_path or default_key_path() + self._version: int = self.KEYRING_VERSION + self._active_epoch: int = 0 + self._epochs: list[Epoch] = [] + self._loaded = False + + @property + def path(self) -> Path: + """Path to the keyring file.""" + return self._keyring_path + + @property + def key_path(self) -> Path: + """Path to the Ed25519 private key file.""" + return self._key_path + + @property + def active_epoch(self) -> int: + """The current active epoch number.""" + self._ensure_loaded() + return self._active_epoch + + @property + def epochs(self) -> list[Epoch]: + """All epochs (returns a copy).""" + self._ensure_loaded() + return list(self._epochs) + + def _ensure_loaded(self) -> None: + if not self._loaded: + self.load() + + def load(self) -> None: + """ + Load keyring from disk, migrating from existing keys if needed. + + Priority: + 1. Load existing keyring.json + 2. Migrate from existing key file (create epoch 0) + 3. Create empty keyring (keys generated on first seal) + """ + if self._keyring_path.exists(): + self._load_from_file() + elif self._key_path.exists(): + self._migrate_existing_key() + else: + self._version = self.KEYRING_VERSION + self._active_epoch = 0 + self._epochs = [] + self._loaded = True + + def _load_from_file(self) -> None: + try: + data = json.loads(self._keyring_path.read_text("utf-8")) + except (json.JSONDecodeError, OSError) as e: + raise KeyringError(f"Failed to read keyring: {e}") from e + + version = data.get("version") + if version != self.KEYRING_VERSION: + raise KeyringError( + f"Unsupported keyring version: {version} " + f"(expected {self.KEYRING_VERSION})" + ) + + self._version = data["version"] + self._active_epoch = data["active_epoch"] + self._epochs = [Epoch.from_dict(e) for e in data.get("epochs", [])] + + def _migrate_existing_key(self) -> None: + """Create epoch 0 from an existing key file (seamless upgrade).""" + try: + key_bytes = self._key_path.read_bytes() + signing_key = SigningKey(key_bytes) + public_hex = signing_key.verify_key.encode().hex() + now = datetime.now(UTC).isoformat() + + self._version = self.KEYRING_VERSION + self._active_epoch = 0 + self._epochs = [ + Epoch( + epoch=0, + algorithm="ed25519", + public_key_hex=public_hex, + fingerprint=_make_fingerprint(public_hex), + created_at=now, + rotated_at=None, + status="active", + ) + ] + self._save() + except KeyringError: + raise # pragma: no cover + except Exception as e: + raise KeyringError(f"Failed to migrate existing key: {e}") from e + + def _save(self) -> None: + """Atomically write keyring to disk (write temp, then rename).""" + data = { + "version": self._version, + "active_epoch": self._active_epoch, + "epochs": [e.to_dict() for e in self._epochs], + } + + self._keyring_path.parent.mkdir(parents=True, exist_ok=True) + + fd, tmp_path = tempfile.mkstemp( + dir=str(self._keyring_path.parent), + suffix=".tmp", + ) + try: + with os.fdopen(fd, "w") as f: + json.dump(data, f, indent=2) + f.write("\n") + os.replace(tmp_path, str(self._keyring_path)) + except Exception: # pragma: no cover + with contextlib.suppress(OSError): + os.unlink(tmp_path) + raise + + def get_active(self) -> Epoch | None: + """Get the active epoch, or None if no epochs exist.""" + self._ensure_loaded() + for epoch in self._epochs: + if epoch.status == "active": + return epoch + return None + + def lookup(self, fingerprint: str) -> Epoch | None: + """ + Look up an epoch by fingerprint. + + Matches on the ``qp_key_XXXX`` format and on the legacy + 16-char hex prefix used by pre-keyring capsules. + """ + self._ensure_loaded() + for epoch in self._epochs: + if epoch.fingerprint == fingerprint: + return epoch + for epoch in self._epochs: + if epoch.public_key_hex[:16] == fingerprint: + return epoch + return None + + def lookup_public_key(self, fingerprint: str) -> str | None: + """Look up a public key hex string by fingerprint.""" + epoch = self.lookup(fingerprint) + return epoch.public_key_hex if epoch else None + + def rotate(self) -> Epoch: + """ + Rotate to a new key pair. + + 1. Generate new Ed25519 key pair + 2. Retire current active epoch + 3. Add new epoch as active + 4. Write new private key (securely replaces old) + 5. Save keyring atomically + """ + self._ensure_loaded() + + now = datetime.now(UTC).isoformat() + + for epoch in self._epochs: + if epoch.status == "active": + epoch.rotated_at = now + epoch.status = "retired" + + new_signing_key = SigningKey.generate() + public_hex = new_signing_key.verify_key.encode().hex() + new_epoch_num = (max(e.epoch for e in self._epochs) + 1) if self._epochs else 0 + + new_epoch = Epoch( + epoch=new_epoch_num, + algorithm="ed25519", + public_key_hex=public_hex, + fingerprint=_make_fingerprint(public_hex), + created_at=now, + rotated_at=None, + status="active", + ) + + self._epochs.append(new_epoch) + self._active_epoch = new_epoch_num + + self._key_path.parent.mkdir(parents=True, exist_ok=True) + old_umask = os.umask(0o077) + try: + self._key_path.write_bytes(bytes(new_signing_key)) + finally: + os.umask(old_umask) + self._key_path.chmod(0o600) + + self._save() + return new_epoch + + def register_key(self, signing_key: SigningKey) -> Epoch: + """ + Register an existing key in the keyring. Idempotent. + + Called by Seal when generating a key for a keyring that + does not yet track it. + """ + self._ensure_loaded() + + public_hex = signing_key.verify_key.encode().hex() + + for epoch in self._epochs: + if epoch.public_key_hex == public_hex: + return epoch + + new_epoch_num = (max(e.epoch for e in self._epochs) + 1) if self._epochs else 0 + now = datetime.now(UTC).isoformat() + + epoch = Epoch( + epoch=new_epoch_num, + algorithm="ed25519", + public_key_hex=public_hex, + fingerprint=_make_fingerprint(public_hex), + created_at=now, + rotated_at=None, + status="active", + ) + + self._epochs.append(epoch) + self._active_epoch = new_epoch_num + self._save() + return epoch + + def export_public_key(self) -> str | None: + """Export the active epoch's public key as a hex string.""" + active = self.get_active() + return active.public_key_hex if active else None + + def to_dict(self) -> dict[str, Any]: + """Serialize keyring to dict.""" + self._ensure_loaded() + return { + "version": self._version, + "active_epoch": self._active_epoch, + "epochs": [e.to_dict() for e in self._epochs], + } diff --git a/reference/python/src/qp_capsule/paths.py b/reference/python/src/qp_capsule/paths.py index 18ec131..607a0cd 100644 --- a/reference/python/src/qp_capsule/paths.py +++ b/reference/python/src/qp_capsule/paths.py @@ -49,6 +49,14 @@ def default_key_path() -> Path: return Path.home() / ".quantumpipes" / "key" +def default_keyring_path() -> Path: + """Get default keyring path from environment or home directory.""" + data_dir = os.environ.get("QUANTUMPIPES_DATA_DIR") + if data_dir: + return resolve_data_dir(data_dir) / "keyring.json" + return Path.home() / ".quantumpipes" / "keyring.json" + + def default_db_path() -> Path: """Get default database path from environment or home directory.""" data_dir = os.environ.get("QUANTUMPIPES_DATA_DIR") diff --git a/reference/python/src/qp_capsule/seal.py b/reference/python/src/qp_capsule/seal.py index 7da77ae..d306a2b 100644 --- a/reference/python/src/qp_capsule/seal.py +++ b/reference/python/src/qp_capsule/seal.py @@ -47,6 +47,7 @@ if TYPE_CHECKING: from qp_capsule.capsule import Capsule + from qp_capsule.keyring import Keyring # Optional post-quantum cryptography (FIPS 204 ML-DSA-65). # Available when installed with: pip install qp-capsule[pq] @@ -76,7 +77,13 @@ class Seal: - Permissions restricted to owner (0o600) """ - def __init__(self, key_path: Path | None = None, enable_pq: bool | None = None): + def __init__( + self, + key_path: Path | None = None, + enable_pq: bool | None = None, + *, + keyring: Keyring | None = None, + ): """ Initialize the Seal. @@ -87,10 +94,14 @@ def __init__(self, key_path: Path | None = None, enable_pq: bool | None = None): None = auto-detect (use if oqs library is available) True = require PQ (raises if unavailable) False = disable PQ even if available + keyring: Optional Keyring for epoch-aware verification. + When provided, verify() uses the capsule's signed_by + fingerprint to look up the correct epoch's public key. """ self.key_path = key_path or default_key_path() self._signing_key: SigningKey | None = None self._verify_key: VerifyKey | None = None + self._keyring = keyring # PQ state self._pq_secret_key: bytes | None = None @@ -123,6 +134,7 @@ def _ensure_keys(self) -> tuple[SigningKey, VerifyKey]: - Generated with secure random if not exists - Stored with restricted permissions (owner only) - Loaded from disk on subsequent calls + - Registered with keyring on first load (if keyring provided) Security: - Uses umask to ensure file is created with 0o600 permissions @@ -152,6 +164,9 @@ def _ensure_keys(self) -> tuple[SigningKey, VerifyKey]: self._verify_key = self._signing_key.verify_key + if self._keyring is not None: + self._keyring.register_key(self._signing_key) + if self._verify_key is None: raise SealError("Verify key not initialized") return self._signing_key, self._verify_key @@ -230,9 +245,13 @@ def get_key_fingerprint(self) -> str: """ Get a short fingerprint of the public key. - Returns: - First 16 characters of hex-encoded public key + Returns the keyring's ``qp_key_XXXX`` format when a keyring is + available, otherwise falls back to the legacy 16-char hex prefix. """ + if self._keyring is not None: + active = self._keyring.get_active() + if active is not None: + return active.fingerprint return self.get_public_key()[:16] def seal(self, capsule: Capsule) -> Capsule: @@ -338,9 +357,13 @@ def verify(self, capsule: Capsule, verify_pq: bool = False) -> bool: 1. Check Capsule is sealed (Ed25519 required, PQ depends on mode) 2. Recompute hash from content 3. Verify hash matches stored hash - 4. Verify Ed25519 signature (always) + 4. Verify Ed25519 signature (epoch-aware via keyring, or local key) 5. Verify post-quantum signature (if requested and present) + When a keyring is configured, the capsule's ``signed_by`` fingerprint + is used to look up the correct epoch's public key. This enables + verification of capsules signed across key rotations. + Args: capsule: The Capsule to verify verify_pq: If True, also verify post-quantum signature (if present) @@ -348,7 +371,6 @@ def verify(self, capsule: Capsule, verify_pq: bool = False) -> bool: Returns: True if seal is valid, False otherwise """ - # Check Capsule is sealed if not capsule.is_sealed(): return False @@ -363,14 +385,23 @@ def verify(self, capsule: Capsule, verify_pq: bool = False) -> bool: if computed_hash != capsule.hash: return False - # 3. Verify Ed25519 signature (required) - _, verify_key = self._ensure_keys() - verify_key.verify( + # 3. Determine verification key (keyring lookup or local key) + resolve_key: VerifyKey | None = None + if self._keyring is not None and capsule.signed_by: + pub_hex = self._keyring.lookup_public_key(capsule.signed_by) + if pub_hex is not None: + resolve_key = VerifyKey(bytes.fromhex(pub_hex)) + + if resolve_key is None: + _, resolve_key = self._ensure_keys() + + # 4. Verify Ed25519 signature + resolve_key.verify( capsule.hash.encode("utf-8"), bytes.fromhex(capsule.signature), ) - # 4. Verify post-quantum signature (if requested and present) + # 5. Verify post-quantum signature (if requested and present) if verify_pq and capsule.signature_pq: return self._verify_dilithium(capsule.hash, capsule.signature_pq) diff --git a/reference/python/src/qp_capsule/storage.py b/reference/python/src/qp_capsule/storage.py index 7ced9e3..6e81fbe 100644 --- a/reference/python/src/qp_capsule/storage.py +++ b/reference/python/src/qp_capsule/storage.py @@ -22,6 +22,7 @@ import json from collections.abc import Sequence from pathlib import Path +from typing import Any from uuid import UUID from sqlalchemy import Integer, String, Text, desc, func, select @@ -80,15 +81,17 @@ class CapsuleStorage: capsules = await storage.list(limit=10) """ - def __init__(self, db_path: Path | None = None): + def __init__(self, db_path: Path | None = None, **engine_kwargs: Any): """ Initialize storage. Args: db_path: Path to SQLite database file. Defaults to $QUANTUMPIPES_DATA_DIR/capsules.db or ~/.quantumpipes/capsules.db + **engine_kwargs: Forwarded to ``create_async_engine`` (e.g. ``poolclass``). """ self.db_path = db_path or default_db_path() + self._engine_kwargs = engine_kwargs self._engine: AsyncEngine | None = None self._session_factory: async_sessionmaker[AsyncSession] | None = None @@ -106,6 +109,7 @@ async def _ensure_db(self) -> None: self._engine = create_async_engine( f"sqlite+aiosqlite:///{self.db_path}", echo=False, + **self._engine_kwargs, ) async with self._engine.begin() as conn: diff --git a/reference/python/tests/conftest.py b/reference/python/tests/conftest.py index ab4e543..a685e90 100644 --- a/reference/python/tests/conftest.py +++ b/reference/python/tests/conftest.py @@ -7,20 +7,44 @@ Provides temporary storage and seal instances that are isolated per test. """ +import asyncio +import warnings from pathlib import Path import pytest +from sqlalchemy.pool import NullPool from qp_capsule.chain import CapsuleChain from qp_capsule.seal import Seal from qp_capsule.storage import CapsuleStorage +@pytest.fixture(autouse=True) +def _close_stale_event_loops(): + """Close orphaned event loops between tests. + + pytest-asyncio's per-test loops can survive teardown in the global event + loop policy. When a later sync test calls ``asyncio.run()``, the stale + loop is dereferenced without closing, and its self-pipe sockets trigger + ``ResourceWarning`` on GC. Closing here keeps ``filterwarnings=error`` + strict. + """ + yield + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + try: + loop = asyncio.get_event_loop() + except RuntimeError: + return + if not loop.is_running() and not loop.is_closed(): + loop.close() + + @pytest.fixture async def temp_storage(tmp_path: Path): """Create temporary SQLite storage for testing. Closes on teardown.""" db_path = tmp_path / "test_capsules.db" - storage = CapsuleStorage(db_path=db_path) + storage = CapsuleStorage(db_path=db_path, poolclass=NullPool) yield storage await storage.close() diff --git a/reference/python/tests/test_cli.py b/reference/python/tests/test_cli.py new file mode 100644 index 0000000..20d5b1f --- /dev/null +++ b/reference/python/tests/test_cli.py @@ -0,0 +1,789 @@ +""" +Tests for the capsule CLI. + +Tests cover the CLI entry point, verification logic, inspection, +key management, and the hash utility. +""" + +import json +import tempfile +from io import StringIO +from pathlib import Path + +import pytest +from nacl.signing import SigningKey + +from qp_capsule.capsule import Capsule, TriggerSection +from qp_capsule.cli import ( + VerifyError, + VerifyResult, + _build_parser, + _capsule_from_full_dict, + _capsule_to_full_dict, + _load_capsules_from_json, + _supports_color, + cmd_hash, + cmd_inspect, + cmd_keys, + cmd_verify, + main, + verify_chain, +) +from qp_capsule.keyring import Keyring +from qp_capsule.seal import Seal + + +@pytest.fixture +def temp_dir(): + with tempfile.TemporaryDirectory() as d: + yield Path(d) + + +@pytest.fixture +def key_path(temp_dir): + return temp_dir / "key" + + +@pytest.fixture +def keyring_path(temp_dir): + return temp_dir / "keyring.json" + + +@pytest.fixture +def seal(key_path): + return Seal(key_path=key_path) + + +def _make_sealed_chain(seal: Seal, count: int) -> list[Capsule]: + """Create a sealed chain of capsules.""" + capsules: list[Capsule] = [] + for i in range(count): + c = Capsule(trigger=TriggerSection(type="test", source="test-cli", request=f"req_{i}")) + c.sequence = i + c.previous_hash = capsules[-1].hash if capsules else None + seal.seal(c) + capsules.append(c) + return capsules + + +def _write_chain_json(capsules: list[Capsule], path: Path) -> Path: + """Write a sealed chain to a JSON file.""" + data = [_capsule_to_full_dict(c) for c in capsules] + path.write_text(json.dumps(data, indent=2)) + return path + + +# --------------------------------------------------------------------------- +# VerifyResult / VerifyError serialization +# --------------------------------------------------------------------------- + + +class TestVerifyResult: + def test_to_dict(self): + r = VerifyResult(valid=True, level="structural", capsules_verified=3, total_capsules=3) + d = r.to_dict() + assert d["valid"] is True + assert d["capsules_verified"] == 3 + + def test_with_errors(self): + err = VerifyError(sequence=2, capsule_id="abc", error="broken") + r = VerifyResult( + valid=False, level="full", capsules_verified=2, total_capsules=5, errors=[err] + ) + d = r.to_dict() + assert d["valid"] is False + assert len(d["errors"]) == 1 + assert d["errors"][0]["sequence"] == 2 + + +# --------------------------------------------------------------------------- +# Capsule dict helpers +# --------------------------------------------------------------------------- + + +class TestCapsuleDictHelpers: + def test_roundtrip(self, seal): + c = Capsule(trigger=TriggerSection(type="test", source="s", request="r")) + c.sequence = 0 + c.previous_hash = None + seal.seal(c) + + d = _capsule_to_full_dict(c) + restored = _capsule_from_full_dict(d) + + assert str(restored.id) == str(c.id) + assert restored.hash == c.hash + assert restored.signature == c.signature + assert restored.signed_by == c.signed_by + + def test_from_dict_without_seal_fields(self): + d = Capsule().to_dict() + c = _capsule_from_full_dict(d) + assert c.hash == "" + assert c.signature == "" + assert c.signed_by == "" + + +# --------------------------------------------------------------------------- +# JSON loading +# --------------------------------------------------------------------------- + + +class TestLoadFromJson: + def test_load_array(self, seal, temp_dir): + chain = _make_sealed_chain(seal, 3) + path = _write_chain_json(chain, temp_dir / "chain.json") + loaded = _load_capsules_from_json(path) + assert len(loaded) == 3 + assert loaded[0].hash == chain[0].hash + + def test_load_single_object(self, seal, temp_dir): + chain = _make_sealed_chain(seal, 1) + d = _capsule_to_full_dict(chain[0]) + path = temp_dir / "single.json" + path.write_text(json.dumps(d)) + loaded = _load_capsules_from_json(path) + assert len(loaded) == 1 + + def test_load_invalid_json_type(self, temp_dir): + path = temp_dir / "bad.json" + path.write_text('"just a string"') + with pytest.raises(ValueError, match="Expected JSON array"): + _load_capsules_from_json(path) + + +# --------------------------------------------------------------------------- +# Core verification logic +# --------------------------------------------------------------------------- + + +class TestVerifyChain: + def test_empty_chain(self): + r = verify_chain([]) + assert r.valid is True + assert r.capsules_verified == 0 + + def test_valid_structural(self, seal): + chain = _make_sealed_chain(seal, 5) + r = verify_chain(chain, level="structural") + assert r.valid is True + assert r.capsules_verified == 5 + + def test_valid_full(self, seal): + chain = _make_sealed_chain(seal, 3) + r = verify_chain(chain, level="full") + assert r.valid is True + + def test_valid_signatures(self, seal): + chain = _make_sealed_chain(seal, 3) + r = verify_chain(chain, level="signatures", seal=seal) + assert r.valid is True + + def test_sequence_gap(self, seal): + chain = _make_sealed_chain(seal, 3) + chain[1].sequence = 5 + r = verify_chain(chain, level="structural") + assert r.valid is False + assert r.capsules_verified == 1 + assert "Sequence gap" in r.errors[0].error + + def test_genesis_with_previous_hash(self, seal): + chain = _make_sealed_chain(seal, 1) + chain[0].previous_hash = "should_be_none" + r = verify_chain(chain, level="structural") + assert r.valid is False + assert "Genesis" in r.errors[0].error + + def test_broken_previous_hash(self, seal): + chain = _make_sealed_chain(seal, 3) + chain[2].previous_hash = "wrong" + r = verify_chain(chain, level="structural") + assert r.valid is False + assert r.capsules_verified == 2 + + def test_content_hash_mismatch(self, seal): + chain = _make_sealed_chain(seal, 3) + chain[1].trigger.request = "tampered" + r = verify_chain(chain, level="full") + assert r.valid is False + assert "hash mismatch" in r.errors[0].error + + def test_signature_failure(self, seal, key_path): + chain = _make_sealed_chain(seal, 2) + chain[0].signature = "00" * 64 + r = verify_chain(chain, level="signatures", seal=seal) + assert r.valid is False + assert "Signature" in r.errors[0].error + + +# --------------------------------------------------------------------------- +# cmd_verify +# --------------------------------------------------------------------------- + + +class TestCmdVerify: + def test_verify_json_structural(self, seal, temp_dir, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + chain = _make_sealed_chain(seal, 3) + path = _write_chain_json(chain, temp_dir / "c.json") + + args = _build_parser().parse_args(["verify", str(path)]) + assert cmd_verify(args) == 0 + + def test_verify_json_full(self, seal, temp_dir, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + chain = _make_sealed_chain(seal, 3) + path = _write_chain_json(chain, temp_dir / "c.json") + + args = _build_parser().parse_args(["verify", "--full", str(path)]) + assert cmd_verify(args) == 0 + + def test_verify_json_output(self, seal, temp_dir, capsys, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + chain = _make_sealed_chain(seal, 2) + path = _write_chain_json(chain, temp_dir / "c.json") + + args = _build_parser().parse_args(["verify", "--json", str(path)]) + assert cmd_verify(args) == 0 + out = json.loads(capsys.readouterr().out) + assert out["valid"] is True + + def test_verify_quiet_pass(self, seal, temp_dir, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + chain = _make_sealed_chain(seal, 2) + path = _write_chain_json(chain, temp_dir / "c.json") + + args = _build_parser().parse_args(["verify", "--quiet", str(path)]) + assert cmd_verify(args) == 0 + + def test_verify_quiet_fail(self, seal, temp_dir, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + chain = _make_sealed_chain(seal, 2) + chain[1].previous_hash = "bad" + path = _write_chain_json(chain, temp_dir / "c.json") + + args = _build_parser().parse_args(["verify", "--quiet", str(path)]) + assert cmd_verify(args) == 1 + + def test_verify_no_source(self, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + args = _build_parser().parse_args(["verify"]) + assert cmd_verify(args) == 2 + + def test_verify_multiple_sources(self, temp_dir, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + path = temp_dir / "c.json" + path.write_text("[]") + args = _build_parser().parse_args(["verify", str(path), "--db", "other.db"]) + assert cmd_verify(args) == 2 + + def test_verify_bad_file(self, temp_dir, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + args = _build_parser().parse_args(["verify", str(temp_dir / "nonexistent.json")]) + assert cmd_verify(args) == 2 + + def test_verify_broken_chain_colored(self, seal, temp_dir, capsys, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + chain = _make_sealed_chain(seal, 3) + chain[1].previous_hash = "wrong" + path = _write_chain_json(chain, temp_dir / "c.json") + + args = _build_parser().parse_args(["verify", str(path)]) + result = cmd_verify(args) + assert result == 1 + out = capsys.readouterr().out + assert "FAIL" in out + + def test_verify_db_structural(self, seal, temp_dir, monkeypatch): + """Verify from SQLite database.""" + monkeypatch.setenv("NO_COLOR", "1") + + async def _setup_db(): + from qp_capsule.storage import CapsuleStorage + + db_path = temp_dir / "test.db" + storage = CapsuleStorage(db_path=db_path) + for i in range(3): + c = Capsule( + trigger=TriggerSection(type="test", source="db-test", request=f"r{i}") + ) + c.sequence = i + c.previous_hash = None + if i > 0: + latest = await storage.get_latest() + c.previous_hash = latest.hash if latest else None + seal.seal(c) + await storage.store(c) + await storage.close() + return db_path + + import asyncio + + db_path = asyncio.run(_setup_db()) + args = _build_parser().parse_args(["verify", "--db", str(db_path)]) + assert cmd_verify(args) == 0 + + def test_verify_full_json_broken(self, seal, temp_dir, capsys, monkeypatch): + """--full --json on a broken chain produces structured error output.""" + monkeypatch.setenv("NO_COLOR", "1") + chain = _make_sealed_chain(seal, 3) + chain[1].trigger.request = "tampered" + path = _write_chain_json(chain, temp_dir / "c.json") + + args = _build_parser().parse_args(["verify", "--full", "--json", str(path)]) + assert cmd_verify(args) == 1 + out = json.loads(capsys.readouterr().out) + assert out["valid"] is False + assert len(out["errors"]) == 1 + assert "hash mismatch" in out["errors"][0]["error"] + + def test_verify_colored_shows_skipped_capsules(self, seal, temp_dir, capsys, monkeypatch): + """Colored output shows skipped capsules beyond the break point.""" + monkeypatch.setenv("NO_COLOR", "1") + chain = _make_sealed_chain(seal, 5) + chain[2].previous_hash = "broken" + path = _write_chain_json(chain, temp_dir / "c.json") + + args = _build_parser().parse_args(["verify", str(path)]) + cmd_verify(args) + out = capsys.readouterr().out + assert "skipped" in out + + def test_verify_signatures_with_keyring(self, temp_dir, monkeypatch): + """Signature verification using keyring via QUANTUMPIPES_DATA_DIR.""" + monkeypatch.setenv("NO_COLOR", "1") + monkeypatch.setenv("QUANTUMPIPES_DATA_DIR", str(temp_dir)) + + key_path = temp_dir / "key" + kr = Keyring(keyring_path=temp_dir / "keyring.json", key_path=key_path) + seal_obj = Seal(key_path=key_path, keyring=kr) + + chain = _make_sealed_chain(seal_obj, 3) + path = _write_chain_json(chain, temp_dir / "c.json") + + args = _build_parser().parse_args(["verify", "--signatures", str(path)]) + assert cmd_verify(args) == 0 + + +# --------------------------------------------------------------------------- +# cmd_inspect +# --------------------------------------------------------------------------- + + +class TestCmdInspect: + def test_inspect_single_json(self, seal, temp_dir, capsys, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + chain = _make_sealed_chain(seal, 1) + d = _capsule_to_full_dict(chain[0]) + path = temp_dir / "single.json" + path.write_text(json.dumps(d)) + + args = _build_parser().parse_args(["inspect", str(path)]) + assert cmd_inspect(args) == 0 + out = capsys.readouterr().out + assert "Capsule Inspection" in out + assert "Trigger" in out + assert "Outcome" in out + + def test_inspect_by_seq(self, seal, temp_dir, capsys, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + chain = _make_sealed_chain(seal, 5) + path = _write_chain_json(chain, temp_dir / "chain.json") + + args = _build_parser().parse_args(["inspect", str(path), "--seq", "2"]) + assert cmd_inspect(args) == 0 + out = capsys.readouterr().out + assert "Sequence: 2" in out + + def test_inspect_by_id(self, seal, temp_dir, capsys, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + chain = _make_sealed_chain(seal, 3) + target_id = str(chain[1].id) + path = _write_chain_json(chain, temp_dir / "chain.json") + + args = _build_parser().parse_args(["inspect", str(path), "--id", target_id]) + assert cmd_inspect(args) == 0 + out = capsys.readouterr().out + assert target_id in out + + def test_inspect_seq_not_found(self, seal, temp_dir, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + chain = _make_sealed_chain(seal, 2) + path = _write_chain_json(chain, temp_dir / "chain.json") + + args = _build_parser().parse_args(["inspect", str(path), "--seq", "99"]) + assert cmd_inspect(args) == 2 + + def test_inspect_id_not_found(self, seal, temp_dir, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + chain = _make_sealed_chain(seal, 2) + path = _write_chain_json(chain, temp_dir / "chain.json") + + args = _build_parser().parse_args(["inspect", str(path), "--id", "nonexistent"]) + assert cmd_inspect(args) == 2 + + def test_inspect_ambiguous_no_selector(self, seal, temp_dir, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + chain = _make_sealed_chain(seal, 3) + path = _write_chain_json(chain, temp_dir / "chain.json") + + args = _build_parser().parse_args(["inspect", str(path)]) + assert cmd_inspect(args) == 2 + + def test_inspect_no_source(self, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + args = _build_parser().parse_args(["inspect"]) + assert cmd_inspect(args) == 2 + + def test_inspect_bad_file(self, temp_dir, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + args = _build_parser().parse_args(["inspect", str(temp_dir / "gone.json")]) + assert cmd_inspect(args) == 2 + + def test_inspect_with_outcome_error(self, seal, temp_dir, capsys, monkeypatch): + """Inspect prints the error field when outcome has an error.""" + monkeypatch.setenv("NO_COLOR", "1") + c = Capsule(trigger=TriggerSection(type="test", source="s", request="r")) + c.sequence = 0 + c.previous_hash = None + c.outcome.status = "failure" + c.outcome.error = "something went wrong" + seal.seal(c) + d = _capsule_to_full_dict(c) + path = temp_dir / "err.json" + path.write_text(json.dumps(d)) + + args = _build_parser().parse_args(["inspect", str(path)]) + assert cmd_inspect(args) == 0 + out = capsys.readouterr().out + assert "something went wrong" in out + + def test_inspect_from_db_by_id(self, seal, temp_dir, capsys, monkeypatch): + """Inspect a capsule from SQLite by ID.""" + monkeypatch.setenv("NO_COLOR", "1") + + async def _setup_db(): + from qp_capsule.storage import CapsuleStorage + + db_path = temp_dir / "inspect.db" + storage = CapsuleStorage(db_path=db_path) + c = Capsule(trigger=TriggerSection(type="test", source="db", request="check")) + c.sequence = 0 + c.previous_hash = None + seal.seal(c) + await storage.store(c) + await storage.close() + return db_path, str(c.id) + + import asyncio + + db_path, cid = asyncio.run(_setup_db()) + args = _build_parser().parse_args(["inspect", "--db", str(db_path), "--id", cid]) + assert cmd_inspect(args) == 0 + out = capsys.readouterr().out + assert "Capsule Inspection" in out + + def test_inspect_from_db_by_seq(self, seal, temp_dir, capsys, monkeypatch): + """Inspect a capsule from SQLite by sequence number.""" + monkeypatch.setenv("NO_COLOR", "1") + + async def _setup_db(): + from qp_capsule.storage import CapsuleStorage + + db_path = temp_dir / "inspect_seq.db" + storage = CapsuleStorage(db_path=db_path) + for i in range(3): + c = Capsule( + trigger=TriggerSection(type="test", source="db", request=f"seq{i}") + ) + c.sequence = i + c.previous_hash = None + if i > 0: + latest = await storage.get_latest() + c.previous_hash = latest.hash if latest else None + seal.seal(c) + await storage.store(c) + await storage.close() + return db_path + + import asyncio + + db_path = asyncio.run(_setup_db()) + args = _build_parser().parse_args(["inspect", "--db", str(db_path), "--seq", "1"]) + assert cmd_inspect(args) == 0 + out = capsys.readouterr().out + assert "Sequence: 1" in out + + +# --------------------------------------------------------------------------- +# cmd_keys +# --------------------------------------------------------------------------- + + +class TestCmdKeys: + def test_keys_info_empty(self, keyring_path, key_path, capsys, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + monkeypatch.setenv("QUANTUMPIPES_DATA_DIR", str(keyring_path.parent)) + + args = _build_parser().parse_args(["keys", "info"]) + assert cmd_keys(args) == 0 + out = capsys.readouterr().out + assert "No keys" in out + + def test_keys_info_with_epochs(self, key_path, capsys, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + monkeypatch.setenv("QUANTUMPIPES_DATA_DIR", str(key_path.parent)) + + sk = SigningKey.generate() + key_path.write_bytes(bytes(sk)) + + args = _build_parser().parse_args(["keys", "info"]) + assert cmd_keys(args) == 0 + out = capsys.readouterr().out + assert "Epoch 0" in out + assert "active" in out + + def test_keys_rotate(self, key_path, capsys, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + monkeypatch.setenv("QUANTUMPIPES_DATA_DIR", str(key_path.parent)) + + args = _build_parser().parse_args(["keys", "rotate"]) + assert cmd_keys(args) == 0 + out = capsys.readouterr().out + assert "Rotation" in out + assert "active" in out + + def test_keys_rotate_with_existing(self, key_path, capsys, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + monkeypatch.setenv("QUANTUMPIPES_DATA_DIR", str(key_path.parent)) + + sk = SigningKey.generate() + key_path.write_bytes(bytes(sk)) + + args = _build_parser().parse_args(["keys", "rotate"]) + assert cmd_keys(args) == 0 + out = capsys.readouterr().out + assert "retired" in out + + def test_keys_export(self, key_path, capsys, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + monkeypatch.setenv("QUANTUMPIPES_DATA_DIR", str(key_path.parent)) + + sk = SigningKey.generate() + key_path.write_bytes(bytes(sk)) + + args = _build_parser().parse_args(["keys", "export-public"]) + assert cmd_keys(args) == 0 + out = capsys.readouterr().out.strip() + assert len(out) == 64 + + def test_keys_export_no_key(self, keyring_path, key_path, capsys, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + monkeypatch.setenv("QUANTUMPIPES_DATA_DIR", str(keyring_path.parent)) + + args = _build_parser().parse_args(["keys", "export-public"]) + assert cmd_keys(args) == 2 + + def test_keys_no_subcommand(self, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + args = _build_parser().parse_args(["keys"]) + assert cmd_keys(args) == 2 + + +# --------------------------------------------------------------------------- +# cmd_hash +# --------------------------------------------------------------------------- + + +class TestCmdHash: + def test_hash_file(self, temp_dir, capsys, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + path = temp_dir / "doc.txt" + path.write_text("hello world") + + import hashlib + + expected = hashlib.sha3_256(b"hello world").hexdigest() + + args = _build_parser().parse_args(["hash", str(path)]) + assert cmd_hash(args) == 0 + out = capsys.readouterr().out.strip() + assert out == expected + + def test_hash_missing_file(self, temp_dir, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + args = _build_parser().parse_args(["hash", str(temp_dir / "gone.txt")]) + assert cmd_hash(args) == 2 + + +# --------------------------------------------------------------------------- +# main() integration +# --------------------------------------------------------------------------- + + +class TestMain: + def test_no_command(self, capsys, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + assert main([]) == 2 + + def test_version_flag(self, capsys, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + with pytest.raises(SystemExit, match="0"): + main(["--version"]) + out = capsys.readouterr().out + assert "capsule" in out + assert "1.3.0" in out + + def test_verify_via_main(self, seal, temp_dir, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + chain = _make_sealed_chain(seal, 2) + path = _write_chain_json(chain, temp_dir / "c.json") + assert main(["verify", "--quiet", str(path)]) == 0 + + def test_hash_via_main(self, temp_dir, capsys, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + path = temp_dir / "f.bin" + path.write_bytes(b"\x00\x01\x02") + assert main(["hash", str(path)]) == 0 + + +# --------------------------------------------------------------------------- +# ANSI color support detection +# --------------------------------------------------------------------------- + + +class TestDefaultKeyringPath: + def test_default_keyring_path_without_env(self, monkeypatch): + monkeypatch.delenv("QUANTUMPIPES_DATA_DIR", raising=False) + from qp_capsule.paths import default_keyring_path + + p = default_keyring_path() + assert str(p).endswith("keyring.json") + assert ".quantumpipes" in str(p) + + +class TestColorSupport: + def test_no_color_env(self, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + assert _supports_color(StringIO()) is False + + def test_force_color_env(self, monkeypatch): + monkeypatch.delenv("NO_COLOR", raising=False) + monkeypatch.setenv("FORCE_COLOR", "1") + assert _supports_color(StringIO()) is True + + def test_non_tty(self, monkeypatch): + monkeypatch.delenv("NO_COLOR", raising=False) + monkeypatch.delenv("FORCE_COLOR", raising=False) + assert _supports_color(StringIO()) is False + + def test_ansi_no_color_passthrough(self): + """_c() returns plain text when _NO_COLOR is True.""" + import qp_capsule.cli as cli_mod + + old = cli_mod._NO_COLOR + cli_mod._NO_COLOR = True + try: + assert cli_mod._c("32", "hello") == "hello" + assert cli_mod._green("ok") == "ok" + assert cli_mod._red("err") == "err" + assert cli_mod._bold("b") == "b" + assert cli_mod._dim("d") == "d" + assert cli_mod._yellow("y") == "y" + finally: + cli_mod._NO_COLOR = old + + +# --------------------------------------------------------------------------- +# Seal + Keyring integration via CLI +# --------------------------------------------------------------------------- + + +class TestVerifyOutput: + """Test verify output formatting details.""" + + def test_level_description_in_output(self, seal, temp_dir, capsys, monkeypatch): + """Verify output includes the human-readable level description.""" + monkeypatch.setenv("NO_COLOR", "1") + chain = _make_sealed_chain(seal, 2) + path = _write_chain_json(chain, temp_dir / "c.json") + + args = _build_parser().parse_args(["verify", "--full", str(path)]) + cmd_verify(args) + out = capsys.readouterr().out + assert "SHA3-256 recomputation" in out + + def test_structural_description_in_output(self, seal, temp_dir, capsys, monkeypatch): + """Default structural level shows its description.""" + monkeypatch.setenv("NO_COLOR", "1") + chain = _make_sealed_chain(seal, 1) + path = _write_chain_json(chain, temp_dir / "c.json") + + args = _build_parser().parse_args(["verify", str(path)]) + cmd_verify(args) + out = capsys.readouterr().out + assert "sequence + hash linkage" in out + + def test_empty_chain_json(self, temp_dir, capsys, monkeypatch): + """--json on an empty chain produces valid JSON with 0 capsules.""" + monkeypatch.setenv("NO_COLOR", "1") + path = temp_dir / "empty.json" + path.write_text("[]") + + args = _build_parser().parse_args(["verify", "--json", str(path)]) + assert cmd_verify(args) == 0 + out = json.loads(capsys.readouterr().out) + assert out["valid"] is True + assert out["capsules_verified"] == 0 + assert out["total_capsules"] == 0 + + def test_signatures_without_seal_skips_sigs(self, seal): + """verify_chain(level='signatures') without seal= skips signature checks gracefully.""" + chain = _make_sealed_chain(seal, 3) + r = verify_chain(chain, level="signatures", seal=None) + assert r.valid is True + assert r.capsules_verified == 3 + + +class TestKeyringVerification: + """Test that signature verification works across key rotations.""" + + def test_verify_with_keyring(self, temp_dir, monkeypatch): + monkeypatch.setenv("NO_COLOR", "1") + key_path = temp_dir / "key" + kr_path = temp_dir / "keyring.json" + + kr = Keyring(keyring_path=kr_path, key_path=key_path) + seal_obj = Seal(key_path=key_path, keyring=kr) + + chain = _make_sealed_chain(seal_obj, 3) + + r = verify_chain(chain, level="signatures", seal=seal_obj) + assert r.valid is True + + def test_verify_across_rotation(self, temp_dir, monkeypatch): + """Capsules signed with old key verify after rotation.""" + monkeypatch.setenv("NO_COLOR", "1") + key_path = temp_dir / "key" + kr_path = temp_dir / "keyring.json" + + kr = Keyring(keyring_path=kr_path, key_path=key_path) + seal_obj = Seal(key_path=key_path, keyring=kr) + + c0 = Capsule(trigger=TriggerSection(type="test", source="s", request="before rotation")) + c0.sequence = 0 + c0.previous_hash = None + seal_obj.seal(c0) + + kr.rotate() + + seal_obj2 = Seal(key_path=key_path, keyring=kr) + + c1 = Capsule(trigger=TriggerSection(type="test", source="s", request="after rotation")) + c1.sequence = 1 + c1.previous_hash = c0.hash + seal_obj2.seal(c1) + + r = verify_chain([c0, c1], level="signatures", seal=seal_obj2) + assert r.valid is True + assert r.capsules_verified == 2 diff --git a/reference/python/tests/test_keyring.py b/reference/python/tests/test_keyring.py new file mode 100644 index 0000000..b10e60d --- /dev/null +++ b/reference/python/tests/test_keyring.py @@ -0,0 +1,371 @@ +""" +Tests for epoch-based keyring management. + +Tests keyring creation, migration, rotation, lookup, and edge cases. +""" + +import json +import tempfile +from pathlib import Path + +import pytest +from nacl.signing import SigningKey + +from qp_capsule.exceptions import KeyringError +from qp_capsule.keyring import Epoch, Keyring, _make_fingerprint + + +@pytest.fixture +def temp_dir(): + """Provide a temporary directory for keys and keyring.""" + with tempfile.TemporaryDirectory() as d: + yield Path(d) + + +@pytest.fixture +def key_path(temp_dir): + return temp_dir / "key" + + +@pytest.fixture +def keyring_path(temp_dir): + return temp_dir / "keyring.json" + + +@pytest.fixture +def keyring(keyring_path, key_path): + return Keyring(keyring_path=keyring_path, key_path=key_path) + + +@pytest.fixture +def existing_key(key_path): + """Create a key file with a generated Ed25519 key.""" + sk = SigningKey.generate() + key_path.parent.mkdir(parents=True, exist_ok=True) + key_path.write_bytes(bytes(sk)) + return sk + + +class TestEpoch: + """Test Epoch dataclass serialization.""" + + def test_to_dict_roundtrip(self): + epoch = Epoch( + epoch=0, + algorithm="ed25519", + public_key_hex="abcd1234" * 8, + fingerprint="qp_key_abcd", + created_at="2026-01-01T00:00:00+00:00", + rotated_at=None, + status="active", + ) + d = epoch.to_dict() + restored = Epoch.from_dict(d) + assert restored.epoch == 0 + assert restored.algorithm == "ed25519" + assert restored.fingerprint == "qp_key_abcd" + assert restored.status == "active" + assert restored.rotated_at is None + + def test_from_dict_with_rotated_at(self): + d = { + "epoch": 1, + "algorithm": "ed25519", + "public_key_hex": "ff" * 32, + "fingerprint": "qp_key_ffff", + "created_at": "2026-01-01T00:00:00+00:00", + "rotated_at": "2026-02-01T00:00:00+00:00", + "status": "retired", + } + epoch = Epoch.from_dict(d) + assert epoch.status == "retired" + assert epoch.rotated_at == "2026-02-01T00:00:00+00:00" + + +class TestMakeFingerprint: + def test_creates_qp_key_prefix(self): + assert _make_fingerprint("abcd1234") == "qp_key_abcd" + + def test_uses_first_four_hex_chars(self): + assert _make_fingerprint("ff00aabb" + "00" * 28) == "qp_key_ff00" + + +class TestKeyringCreation: + """Test creating keyrings from scratch.""" + + def test_empty_keyring_when_nothing_exists(self, keyring): + assert keyring.epochs == [] + assert keyring.active_epoch == 0 + assert keyring.get_active() is None + + def test_keyring_path_property(self, keyring, keyring_path): + assert keyring.path == keyring_path + + def test_key_path_property(self, keyring, key_path): + assert keyring.key_path == key_path + + def test_to_dict_empty(self, keyring): + d = keyring.to_dict() + assert d["version"] == 1 + assert d["active_epoch"] == 0 + assert d["epochs"] == [] + + +class TestKeyringMigration: + """Test seamless migration from existing key files.""" + + def test_migrates_existing_key(self, keyring, existing_key): + epochs = keyring.epochs + assert len(epochs) == 1 + assert epochs[0].epoch == 0 + assert epochs[0].status == "active" + assert epochs[0].algorithm == "ed25519" + + expected_pub = existing_key.verify_key.encode().hex() + assert epochs[0].public_key_hex == expected_pub + + def test_migration_creates_keyring_file(self, keyring, keyring_path, existing_key): + _ = keyring.epochs + assert keyring_path.exists() + + data = json.loads(keyring_path.read_text("utf-8")) + assert data["version"] == 1 + assert len(data["epochs"]) == 1 + + def test_migration_fingerprint_format(self, keyring, existing_key): + epoch = keyring.epochs[0] + assert epoch.fingerprint.startswith("qp_key_") + assert epoch.fingerprint == f"qp_key_{epoch.public_key_hex[:4]}" + + def test_migration_not_triggered_if_keyring_exists(self, keyring_path, key_path, existing_key): + keyring_path.write_text('{"version": 1, "active_epoch": 0, "epochs": []}') + kr = Keyring(keyring_path=keyring_path, key_path=key_path) + assert kr.epochs == [] + + +class TestKeyringLoad: + """Test loading keyrings from disk.""" + + def test_load_valid_keyring(self, keyring_path, key_path): + data = { + "version": 1, + "active_epoch": 0, + "epochs": [ + { + "epoch": 0, + "algorithm": "ed25519", + "public_key_hex": "aa" * 32, + "fingerprint": "qp_key_aaaa", + "created_at": "2026-01-01T00:00:00+00:00", + "rotated_at": None, + "status": "active", + } + ], + } + keyring_path.write_text(json.dumps(data)) + + kr = Keyring(keyring_path=keyring_path, key_path=key_path) + assert len(kr.epochs) == 1 + assert kr.active_epoch == 0 + + def test_load_wrong_version_raises(self, keyring_path, key_path): + data = {"version": 99, "active_epoch": 0, "epochs": []} + keyring_path.write_text(json.dumps(data)) + + kr = Keyring(keyring_path=keyring_path, key_path=key_path) + with pytest.raises(KeyringError, match="Unsupported keyring version"): + _ = kr.epochs + + def test_load_corrupt_json_raises(self, keyring_path, key_path): + keyring_path.write_text("{{not valid json") + + kr = Keyring(keyring_path=keyring_path, key_path=key_path) + with pytest.raises(KeyringError, match="Failed to read keyring"): + _ = kr.epochs + + def test_load_is_idempotent(self, keyring, existing_key): + _ = keyring.epochs + epochs_again = keyring.epochs + assert len(epochs_again) == 1 + + def test_explicit_load_resets(self, keyring_path, key_path, existing_key): + kr = Keyring(keyring_path=keyring_path, key_path=key_path) + kr.load() + assert len(kr.epochs) == 1 + kr.load() + assert len(kr.epochs) == 1 + + +class TestKeyringRotation: + """Test key rotation with epoch tracking.""" + + def test_rotate_from_empty(self, keyring): + epoch = keyring.rotate() + assert epoch.epoch == 0 + assert epoch.status == "active" + assert epoch.fingerprint.startswith("qp_key_") + assert len(keyring.epochs) == 1 + + def test_rotate_retires_old(self, keyring, existing_key): + old = keyring.epochs[0] + assert old.status == "active" + + new = keyring.rotate() + epochs = keyring.epochs + + assert len(epochs) == 2 + assert epochs[0].status == "retired" + assert epochs[0].rotated_at is not None + assert epochs[1].status == "active" + assert new.epoch == 1 + + def test_double_rotate(self, keyring, existing_key): + keyring.rotate() + keyring.rotate() + epochs = keyring.epochs + + assert len(epochs) == 3 + assert epochs[0].status == "retired" + assert epochs[1].status == "retired" + assert epochs[2].status == "active" + assert keyring.active_epoch == 2 + + def test_rotate_writes_new_key_file(self, keyring, key_path, existing_key): + old_bytes = key_path.read_bytes() + keyring.rotate() + new_bytes = key_path.read_bytes() + assert old_bytes != new_bytes + + def test_rotate_saves_keyring(self, keyring, keyring_path, existing_key): + keyring.rotate() + data = json.loads(keyring_path.read_text("utf-8")) + assert len(data["epochs"]) == 2 + assert data["active_epoch"] == 1 + + def test_rotate_key_permissions(self, keyring, key_path, existing_key): + keyring.rotate() + stat = key_path.stat() + assert stat.st_mode & 0o777 == 0o600 + + +class TestKeyringLookup: + """Test epoch lookup by fingerprint.""" + + def test_lookup_by_new_format(self, keyring, existing_key): + epoch = keyring.epochs[0] + found = keyring.lookup(epoch.fingerprint) + assert found is not None + assert found.epoch == 0 + + def test_lookup_by_legacy_16char(self, keyring, existing_key): + epoch = keyring.epochs[0] + legacy_fp = epoch.public_key_hex[:16] + found = keyring.lookup(legacy_fp) + assert found is not None + assert found.epoch == 0 + + def test_lookup_nonexistent_returns_none(self, keyring, existing_key): + assert keyring.lookup("qp_key_zzzz") is None + + def test_lookup_public_key(self, keyring, existing_key): + epoch = keyring.epochs[0] + pub = keyring.lookup_public_key(epoch.fingerprint) + assert pub == epoch.public_key_hex + + def test_lookup_public_key_missing(self, keyring, existing_key): + assert keyring.lookup_public_key("nonexistent") is None + + def test_lookup_after_rotation(self, keyring, existing_key): + old_fp = keyring.epochs[0].fingerprint + new_epoch = keyring.rotate() + + assert keyring.lookup(old_fp) is not None + assert keyring.lookup(new_epoch.fingerprint) is not None + + +class TestKeyringRegister: + """Test idempotent key registration.""" + + def test_register_new_key(self, keyring): + sk = SigningKey.generate() + epoch = keyring.register_key(sk) + + assert epoch.epoch == 0 + assert epoch.status == "active" + assert epoch.public_key_hex == sk.verify_key.encode().hex() + + def test_register_same_key_is_idempotent(self, keyring): + sk = SigningKey.generate() + e1 = keyring.register_key(sk) + e2 = keyring.register_key(sk) + assert e1.epoch == e2.epoch + assert len(keyring.epochs) == 1 + + def test_register_appends_to_existing(self, keyring, existing_key): + new_sk = SigningKey.generate() + epoch = keyring.register_key(new_sk) + assert epoch.epoch == 1 + assert len(keyring.epochs) == 2 + + +class TestKeyringExport: + """Test public key export.""" + + def test_export_active_key(self, keyring, existing_key): + pub = keyring.export_public_key() + assert pub is not None + assert pub == existing_key.verify_key.encode().hex() + + def test_export_returns_none_when_empty(self, keyring): + assert keyring.export_public_key() is None + + def test_export_after_rotation(self, keyring, existing_key): + old_pub = keyring.export_public_key() + keyring.rotate() + new_pub = keyring.export_public_key() + assert new_pub is not None + assert new_pub != old_pub + + +class TestKeyringAtomicWrite: + """Test that saves are crash-safe.""" + + def test_save_creates_parent_dirs(self, temp_dir): + deep_path = temp_dir / "a" / "b" / "keyring.json" + kr = Keyring(keyring_path=deep_path, key_path=temp_dir / "key") + kr.rotate() + assert deep_path.exists() + + def test_keyring_file_is_valid_json(self, keyring, keyring_path, existing_key): + _ = keyring.epochs + data = json.loads(keyring_path.read_text("utf-8")) + assert "version" in data + assert "epochs" in data + + +class TestKeyringAllRetired: + """Test keyring when all epochs are retired.""" + + def test_get_active_returns_none_when_all_retired(self, keyring, existing_key): + _ = keyring.epochs + keyring.rotate() + for ep in keyring._epochs: + ep.status = "retired" + assert keyring.get_active() is None + + def test_export_returns_none_when_all_retired(self, keyring, existing_key): + _ = keyring.epochs + for ep in keyring._epochs: + ep.status = "retired" + assert keyring.export_public_key() is None + + +class TestKeyringMigrationEdgeCases: + """Test migration error handling.""" + + def test_corrupt_key_file_raises(self, temp_dir): + key_path = temp_dir / "key" + key_path.write_bytes(b"not a valid ed25519 key") + kr = Keyring(keyring_path=temp_dir / "keyring.json", key_path=key_path) + with pytest.raises(KeyringError, match="Failed to migrate"): + _ = kr.epochs diff --git a/reference/python/tests/test_protocol_structure.py b/reference/python/tests/test_protocol_structure.py index bc8ba22..de8f4a6 100644 --- a/reference/python/tests/test_protocol_structure.py +++ b/reference/python/tests/test_protocol_structure.py @@ -708,7 +708,7 @@ def test_typescript_release_workflow_exists(self): def test_typescript_release_triggers_on_tags(self): text = (REPO_ROOT / ".github" / "workflows" / "typescript-release.yaml").read_text() - assert '"v*"' in text, "TypeScript release must trigger on version tags" + assert '"ts-v*"' in text, "TypeScript release must trigger on version tags" def test_typescript_release_runs_conformance(self): text = (REPO_ROOT / ".github" / "workflows" / "typescript-release.yaml").read_text() diff --git a/reference/python/tests/test_seal.py b/reference/python/tests/test_seal.py index 92ec213..87162d7 100644 --- a/reference/python/tests/test_seal.py +++ b/reference/python/tests/test_seal.py @@ -224,6 +224,130 @@ def test_verify_with_key(self, seal, sample_capsule): assert other_seal.verify_with_key(sample_capsule, public_key) is True +class TestSealKeyringIntegration: + """Test Seal behavior when configured with a Keyring.""" + + def test_seal_uses_keyring_fingerprint_format(self, temp_key_path): + """Capsules sealed with a keyring get the qp_key_XXXX format.""" + from qp_capsule.keyring import Keyring + + kr = Keyring( + keyring_path=temp_key_path.parent / "keyring.json", + key_path=temp_key_path, + ) + s = Seal(key_path=temp_key_path, keyring=kr) + capsule = Capsule(trigger=TriggerSection(type="test", source="t", request="r")) + s.seal(capsule) + + assert capsule.signed_by.startswith("qp_key_") + + def test_fingerprint_falls_back_when_keyring_has_no_active(self, temp_key_path): + """get_key_fingerprint falls back to 16-char hex when keyring has no active epoch.""" + from qp_capsule.keyring import Keyring + + kr = Keyring( + keyring_path=temp_key_path.parent / "keyring.json", + key_path=temp_key_path, + ) + s = Seal(key_path=temp_key_path, keyring=kr) + s.seal(Capsule()) + + kr._epochs[0].status = "retired" + + fp = s.get_key_fingerprint() + assert not fp.startswith("qp_key_") + assert len(fp) == 16 + + def test_verify_falls_back_when_keyring_lookup_misses(self, temp_key_path): + """verify() falls back to local key when signed_by isn't in the keyring.""" + from qp_capsule.keyring import Keyring + + kr = Keyring( + keyring_path=temp_key_path.parent / "keyring.json", + key_path=temp_key_path, + ) + s = Seal(key_path=temp_key_path, keyring=kr) + + capsule = Capsule(trigger=TriggerSection(type="test", source="t", request="r")) + s.seal(capsule) + capsule.signed_by = "unknown_fingerprint" + + assert s.verify(capsule) is True + + def test_verify_uses_keyring_for_old_epoch(self, temp_key_path): + """verify() resolves old-epoch key from keyring for capsules signed before rotation.""" + from qp_capsule.keyring import Keyring + + kr = Keyring( + keyring_path=temp_key_path.parent / "keyring.json", + key_path=temp_key_path, + ) + s = Seal(key_path=temp_key_path, keyring=kr) + + capsule = Capsule(trigger=TriggerSection(type="test", source="t", request="r")) + s.seal(capsule) + old_fp = capsule.signed_by + + kr.rotate() + s2 = Seal(key_path=temp_key_path, keyring=kr) + + assert s2.verify(capsule) is True + assert capsule.signed_by == old_fp + + def test_verify_with_empty_signed_by_falls_back(self, temp_key_path): + """verify() falls back to local key when capsule has empty signed_by.""" + from qp_capsule.keyring import Keyring + + kr = Keyring( + keyring_path=temp_key_path.parent / "keyring.json", + key_path=temp_key_path, + ) + s = Seal(key_path=temp_key_path, keyring=kr) + + capsule = Capsule(trigger=TriggerSection(type="test", source="t", request="r")) + s.seal(capsule) + capsule.signed_by = "" + + assert s.verify(capsule) is True + + def test_ensure_keys_registers_with_keyring(self, temp_key_path): + """_ensure_keys() auto-registers the generated key in the keyring.""" + from qp_capsule.keyring import Keyring + + kr = Keyring( + keyring_path=temp_key_path.parent / "keyring.json", + key_path=temp_key_path, + ) + assert kr.epochs == [] + + s = Seal(key_path=temp_key_path, keyring=kr) + s.seal(Capsule()) + + assert len(kr.epochs) == 1 + assert kr.epochs[0].status == "active" + + def test_ensure_keys_with_existing_key_registers_once(self, temp_key_path): + """Loading an existing key also registers it, idempotently.""" + from nacl.signing import SigningKey as SK + + from qp_capsule.keyring import Keyring + + sk = SK.generate() + temp_key_path.parent.mkdir(parents=True, exist_ok=True) + temp_key_path.write_bytes(bytes(sk)) + + kr = Keyring( + keyring_path=temp_key_path.parent / "keyring.json", + key_path=temp_key_path, + ) + s = Seal(key_path=temp_key_path, keyring=kr) + s.seal(Capsule()) + s2 = Seal(key_path=temp_key_path, keyring=kr) + s2.seal(Capsule()) + + assert len(kr.epochs) == 1 + + class TestComputeHash: """Test standalone hash function.""" diff --git a/spec/README.md b/spec/README.md index b8396dd..686a809 100644 --- a/spec/README.md +++ b/spec/README.md @@ -391,7 +391,50 @@ A sealed Capsule is valid regardless of where it is stored. An attacker who copi --- -## 8. URI Scheme +## 8. Key Management Recommendations + +The CPS does not mandate a specific key management implementation. The following are recommendations for conformant implementations. + +### 8.1 Epoch-Based Rotation + +Implementations SHOULD support key rotation through an *epoch* model: + +- Each epoch represents a single Ed25519 key pair with a lifecycle: **active**, **retired** +- At any time, exactly one epoch is active (used for signing new Capsules) +- Retired epochs retain their public key for verification of previously-signed Capsules +- The private key from a retired epoch SHOULD be securely deleted + +### 8.2 Keyring + +Implementations SHOULD maintain a keyring file (or equivalent) that maps fingerprints to public keys across epochs. This enables verification of Capsules signed by any epoch without manual key management. + +### 8.3 Backward-Compatible Verification + +When verifying a sealed Capsule, implementations SHOULD: + +1. Read the Capsule's `signed_by` fingerprint +2. Look up the corresponding public key in the keyring +3. Verify the Ed25519 signature with the resolved key +4. Fall back to the local active key if the fingerprint is not found + +This allows a single verifier to validate Capsules from any epoch. + +### 8.4 NIST SP 800-57 Alignment + +| Lifecycle Phase | Recommendation | +|---|---| +| Generation | Generate Ed25519 key pair using cryptographically secure random | +| Active | Use for new Capsule signatures | +| Retired | Retain public key in keyring; delete private key | +| Destroyed | Overwrite private key file on rotation | + +### 8.5 Fingerprints + +Implementations SHOULD identify signing keys by a short fingerprint derived from the public key. The Python reference uses the format `qp_key_XXXX` (first 4 hex characters of the public key). Implementations MAY use different fingerprint formats but MUST ensure fingerprints are unique within a keyring. + +--- + +## 9. URI Scheme Capsules are content-addressable via the `capsule://` URI scheme. Every sealed Capsule can be referenced by its SHA3-256 hash: diff --git a/spec/VERSION b/spec/VERSION index 5625e59..7e32cd5 100644 --- a/spec/VERSION +++ b/spec/VERSION @@ -1 +1 @@ -1.2 +1.3