diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4395c4e..a5b5137 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -139,7 +139,6 @@ jobs: strategy: matrix: python-version: ["3.10", "3.11", "3.12"] - steps: - uses: actions/checkout@v5 @@ -156,6 +155,54 @@ jobs: working-directory: packages/hdp-langchain run: pytest tests/ -v + test-llama-index-callbacks-hdp: + name: Test llama-index-callbacks-hdp (Python ${{ matrix.python-version }}) + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.10", "3.11", "3.12"] + steps: + - uses: actions/checkout@v5 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + working-directory: packages/llama-index-callbacks-hdp + run: pip install -e ".[dev]" + + - name: Run tests + working-directory: packages/llama-index-callbacks-hdp + run: pytest tests/ -v + + test-hdp-llamaindex: + name: Test hdp-llamaindex (Python ${{ matrix.python-version }}) + runs-on: ubuntu-latest + needs: test-llama-index-callbacks-hdp + strategy: + matrix: + python-version: ["3.10", "3.11", "3.12"] + steps: + - uses: actions/checkout@v5 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install primary package + working-directory: packages/llama-index-callbacks-hdp + run: pip install -e ".[dev]" + + - name: Install metapackage + working-directory: packages/hdp-llamaindex + run: pip install -e . 
+ + - name: Verify metapackage imports + run: python -c "from hdp_llamaindex import HdpCallbackHandler, HdpInstrumentationHandler, HdpNodePostprocessor, verify_chain; print('hdp-llamaindex imports OK')" + test-hdp-physical-py: name: Test hdp-physical-py (Python ${{ matrix.python-version }}) runs-on: ubuntu-latest diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 12b7bad..e800c77 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -9,6 +9,8 @@ on: - 'python/hdp-langchain/v*' - 'python/hdp-autogen/v*' - 'python/hdp-physical/v*' + - 'python/llama-index-callbacks-hdp/v*' + - 'python/hdp-llamaindex/v*' - 'node/hdp-autogen/v*' - 'node/hdp-physical/v*' @@ -707,6 +709,169 @@ jobs: packages-dir: dist/ skip-existing: true + # ── llama-index-callbacks-hdp ───────────────────────────────────────────── + + test-llama-index-callbacks-hdp: + name: Test llama-index-callbacks-hdp + if: startsWith(github.ref, 'refs/tags/python/llama-index-callbacks-hdp/v') + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v5 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Install dependencies + working-directory: packages/llama-index-callbacks-hdp + run: pip install -e ".[dev]" + + - name: Run tests + working-directory: packages/llama-index-callbacks-hdp + run: pytest tests/ -v + + vet-llama-index-callbacks-hdp: + name: Build & Vet llama-index-callbacks-hdp (ReleaseGuard) + needs: test-llama-index-callbacks-hdp + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v5 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Install build tools + run: pip install build + + - name: Build distribution + working-directory: packages/llama-index-callbacks-hdp + run: python -m build + + - name: Vet artifacts with ReleaseGuard + uses: Helixar-AI/ReleaseGuard@94c067008f3ad516d4b61a6e7163d9d5518a4548 + with: + path: 
packages/llama-index-callbacks-hdp/dist + config: packages/llama-index-callbacks-hdp/.releaseguard.yml + sbom: 'true' + fix: 'true' + format: sarif + artifact-name: releaseguard-evidence-llama-index-callbacks-hdp + + - name: Upload vetted distribution + uses: actions/upload-artifact@v4 + with: + name: llama-index-callbacks-hdp-dist + path: packages/llama-index-callbacks-hdp/dist/ + retention-days: 1 + + publish-llama-index-callbacks-hdp: + name: Publish llama-index-callbacks-hdp to PyPI + needs: vet-llama-index-callbacks-hdp + runs-on: ubuntu-latest + permissions: + contents: read + id-token: write + environment: + name: pypi-llama-index-callbacks-hdp + url: https://pypi.org/project/llama-index-callbacks-hdp/ + steps: + - name: Download vetted distribution + uses: actions/download-artifact@v4 + with: + name: llama-index-callbacks-hdp-dist + path: dist/ + + - name: Publish to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 + with: + packages-dir: dist/ + + # ── hdp-llamaindex (metapackage) ─────────────────────────────────────────── + + test-hdp-llamaindex: + name: Test hdp-llamaindex + if: startsWith(github.ref, 'refs/tags/python/hdp-llamaindex/v') + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v5 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Install primary package + working-directory: packages/llama-index-callbacks-hdp + run: pip install -e ".[dev]" + + - name: Install metapackage + working-directory: packages/hdp-llamaindex + run: pip install -e . 
+ + - name: Verify metapackage imports + run: python -c "from hdp_llamaindex import HdpCallbackHandler, HdpInstrumentationHandler, HdpNodePostprocessor, verify_chain; print('OK')" + + vet-hdp-llamaindex: + name: Build & Vet hdp-llamaindex (ReleaseGuard) + needs: test-hdp-llamaindex + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v5 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Install build tools + run: pip install build + + - name: Build distribution + working-directory: packages/hdp-llamaindex + run: python -m build + + - name: Vet artifacts with ReleaseGuard + uses: Helixar-AI/ReleaseGuard@94c067008f3ad516d4b61a6e7163d9d5518a4548 + with: + path: packages/hdp-llamaindex/dist + config: packages/hdp-llamaindex/.releaseguard.yml + sbom: 'true' + fix: 'true' + format: sarif + artifact-name: releaseguard-evidence-hdp-llamaindex + + - name: Upload vetted distribution + uses: actions/upload-artifact@v4 + with: + name: hdp-llamaindex-dist + path: packages/hdp-llamaindex/dist/ + retention-days: 1 + + publish-hdp-llamaindex: + name: Publish hdp-llamaindex to PyPI + needs: vet-hdp-llamaindex + runs-on: ubuntu-latest + permissions: + contents: read + id-token: write + environment: + name: pypi-hdp-llamaindex + url: https://pypi.org/project/hdp-llamaindex/ + steps: + - name: Download vetted distribution + uses: actions/download-artifact@v4 + with: + name: hdp-llamaindex-dist + path: dist/ + + - name: Publish to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 + with: + packages-dir: dist/ + # ── hdp-physical (TypeScript) ───────────────────────────────────────────── test-hdp-physical: diff --git a/packages/hdp-llamaindex/.releaseguard.yml b/packages/hdp-llamaindex/.releaseguard.yml new file mode 100644 index 0000000..81e8b4c --- /dev/null +++ b/packages/hdp-llamaindex/.releaseguard.yml @@ -0,0 +1,58 @@ +# ReleaseGuard policy for hdp-llamaindex Python artifacts +# 
https://github.com/Helixar-AI/ReleaseGuard +# +# Scans the built wheel and sdist before they are published to PyPI. +# Run locally: releaseguard check ./dist +version: 2 + +project: + name: hdp-llamaindex + mode: release + +inputs: + - path: ./dist + type: directory + +sbom: + enabled: true + ecosystems: [python] + formats: [cyclonedx] + enrich_cve: false + +scanning: + secrets: + enabled: true + metadata: + enabled: true + fail_on_source_maps: false + fail_on_internal_urls: false + fail_on_build_paths: false + unexpected_files: + enabled: true + deny: + - ".env" + - "*.bak" + - "*.tmp" + - "*.key" + - "*.pem" + licenses: + enabled: true + require: + - LICENSE + +transforms: + add_checksums: false + add_manifest: false + +policy: + fail_on: + - severity: critical + - category: secret + warn_on: + - severity: high + +output: + reports: + - cli + - sarif + directory: ./.releaseguard diff --git a/packages/hdp-llamaindex/README.md b/packages/hdp-llamaindex/README.md new file mode 100644 index 0000000..901298a --- /dev/null +++ b/packages/hdp-llamaindex/README.md @@ -0,0 +1,11 @@ +# hdp-llamaindex + +Metapackage for users who discover HDP first. + +```bash +pip install hdp-llamaindex +``` + +This installs `llama-index-callbacks-hdp` and re-exports all classes from the `hdp_llamaindex` namespace. + +For full documentation see [llama-index-callbacks-hdp](../llama-index-callbacks-hdp/README.md). 
diff --git a/packages/hdp-llamaindex/pyproject.toml b/packages/hdp-llamaindex/pyproject.toml new file mode 100644 index 0000000..b17e19c --- /dev/null +++ b/packages/hdp-llamaindex/pyproject.toml @@ -0,0 +1,21 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "hdp-llamaindex" +version = "0.1.0" +description = "HDP (Human Delegation Provenance) integration for LlamaIndex — metapackage" +readme = "README.md" +license = { text = "CC-BY-4.0" } +requires-python = ">=3.10" +dependencies = [ + "llama-index-callbacks-hdp>=0.1.0", +] + +[project.urls] +Homepage = "https://github.com/Helixar-AI/HDP" +Repository = "https://github.com/Helixar-AI/HDP" + +[tool.hatch.build.targets.wheel] +packages = ["src/hdp_llamaindex"] diff --git a/packages/hdp-llamaindex/src/hdp_llamaindex/__init__.py b/packages/hdp-llamaindex/src/hdp_llamaindex/__init__.py new file mode 100644 index 0000000..60e53b0 --- /dev/null +++ b/packages/hdp-llamaindex/src/hdp_llamaindex/__init__.py @@ -0,0 +1,43 @@ +"""hdp-llamaindex — convenience re-export of the HDP LlamaIndex integration. + +Install via `pip install hdp-llamaindex` if you discover HDP first. +All classes are importable from here or from `llama_index.callbacks.hdp`. 
+""" + +from llama_index.callbacks.hdp import ( + DataClassification, + HdpCallbackHandler, + HdpInstrumentationHandler, + HdpNodePostprocessor, + HdpPrincipal, + HdpScope, + HdpToken, + HDPScopeViolationError, + HopRecord, + HopVerification, + ScopePolicy, + VerificationResult, + clear_token, + get_token, + set_token, + verify_chain, +) + +__all__ = [ + "DataClassification", + "HdpCallbackHandler", + "HdpInstrumentationHandler", + "HdpNodePostprocessor", + "HdpPrincipal", + "HdpScope", + "HdpToken", + "HDPScopeViolationError", + "HopRecord", + "HopVerification", + "ScopePolicy", + "VerificationResult", + "clear_token", + "get_token", + "set_token", + "verify_chain", +] diff --git a/packages/llama-index-callbacks-hdp/.releaseguard.yml b/packages/llama-index-callbacks-hdp/.releaseguard.yml new file mode 100644 index 0000000..b89b454 --- /dev/null +++ b/packages/llama-index-callbacks-hdp/.releaseguard.yml @@ -0,0 +1,58 @@ +# ReleaseGuard policy for llama-index-callbacks-hdp Python artifacts +# https://github.com/Helixar-AI/ReleaseGuard +# +# Scans the built wheel and sdist before they are published to PyPI. 
+# Run locally: releaseguard check ./dist +version: 2 + +project: + name: llama-index-callbacks-hdp + mode: release + +inputs: + - path: ./dist + type: directory + +sbom: + enabled: true + ecosystems: [python] + formats: [cyclonedx] + enrich_cve: false + +scanning: + secrets: + enabled: true + metadata: + enabled: true + fail_on_source_maps: false + fail_on_internal_urls: false + fail_on_build_paths: false + unexpected_files: + enabled: true + deny: + - ".env" + - "*.bak" + - "*.tmp" + - "*.key" + - "*.pem" + licenses: + enabled: true + require: + - LICENSE + +transforms: + add_checksums: false + add_manifest: false + +policy: + fail_on: + - severity: critical + - category: secret + warn_on: + - severity: high + +output: + reports: + - cli + - sarif + directory: ./.releaseguard diff --git a/packages/llama-index-callbacks-hdp/README.md b/packages/llama-index-callbacks-hdp/README.md new file mode 100644 index 0000000..a7c7e67 --- /dev/null +++ b/packages/llama-index-callbacks-hdp/README.md @@ -0,0 +1,85 @@ +# llama-index-callbacks-hdp + +HDP (Human Delegation Provenance) integration for LlamaIndex — cryptographic authorization provenance for agents and RAG pipelines. + +HDP answers the question that observability tools like Arize Phoenix and Langfuse cannot: **who authorized this agent run, under what scope, and can you prove it offline?** + +Every tool call, retrieval step, and LLM invocation is recorded in a tamper-evident, cryptographically signed delegation chain. The chain is fully verifiable offline — no network calls, no central registry. 
+ +## Installation + +```bash +pip install llama-index-callbacks-hdp +``` + +## Usage + +### Option 1 — Modern instrumentation dispatcher (LlamaIndex ≥0.10.20) + +```python +from llama_index.callbacks.hdp import HdpInstrumentationHandler, HdpPrincipal, ScopePolicy + +HdpInstrumentationHandler.init( + signing_key=ed25519_private_key_bytes, + principal=HdpPrincipal(id="alice@corp.com", id_type="email"), + scope=ScopePolicy( + intent="Research pipeline", + authorized_tools=["web_search", "retriever"], + max_hops=10, + ), + on_token_ready=lambda token: print(token["header"]["token_id"]), +) +``` + +### Option 2 — Legacy CallbackManager + +```python +from llama_index.callbacks.hdp import HdpCallbackHandler, HdpPrincipal, ScopePolicy +from llama_index.core import Settings +from llama_index.core.callbacks import CallbackManager + +handler = HdpCallbackHandler( + signing_key=ed25519_private_key_bytes, + principal=HdpPrincipal(id="alice@corp.com", id_type="email"), + scope=ScopePolicy(intent="Research pipeline"), +) +Settings.callback_manager = CallbackManager([handler]) +``` + +### Option 3 — Node postprocessor (inline retrieval enforcement) + +```python +from llama_index.callbacks.hdp import HdpNodePostprocessor + +postprocessor = HdpNodePostprocessor( + signing_key=ed25519_private_key_bytes, + strict=False, + check_data_classification=True, +) +query_engine = index.as_query_engine(node_postprocessors=[postprocessor]) +``` + +### Verifying a token + +```python +from llama_index.callbacks.hdp import verify_chain + +result = verify_chain(token_dict, public_key_bytes) +if result.valid: + print(f"Chain verified: {result.hop_count} hops") +``` + +## What makes HDP different from Arize/Langfuse? 
+ +| Capability | Arize / Langfuse | HDP | +|---|---|---| +| Records what happened | ✓ | ✓ | +| Records who authorized it | ✗ | ✓ | +| Cryptographically signed | ✗ | ✓ | +| Verifiable offline | ✗ | ✓ | +| Scope enforcement | ✗ | ✓ | +| No central registry | n/a | ✓ | + +## License + +CC-BY-4.0 diff --git a/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/__init__.py b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/__init__.py new file mode 100644 index 0000000..ffd13e9 --- /dev/null +++ b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/__init__.py @@ -0,0 +1,32 @@ +"""HDP integration for LlamaIndex — cryptographic authorization provenance.""" + +from ._types import DataClassification, HdpPrincipal, HdpScope, HdpToken, HopRecord +from .callbacks import HdpCallbackHandler, HDPScopeViolationError, ScopePolicy +from .instrumentation import HdpInstrumentationHandler +from .postprocessor import HdpNodePostprocessor +from .session import clear_token, get_token, set_token +from .verify import HopVerification, VerificationResult, verify_chain + +__all__ = [ + # Core types + "DataClassification", + "HdpPrincipal", + "HdpScope", + "HdpToken", + "HopRecord", + # Policy + "ScopePolicy", + "HDPScopeViolationError", + # Integration layers + "HdpCallbackHandler", + "HdpInstrumentationHandler", + "HdpNodePostprocessor", + # Session + "get_token", + "set_token", + "clear_token", + # Verification + "verify_chain", + "VerificationResult", + "HopVerification", +] diff --git a/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/_crypto.py b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/_crypto.py new file mode 100644 index 0000000..507d994 --- /dev/null +++ b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/_crypto.py @@ -0,0 +1,71 @@ +"""Cryptographic primitives for HDP — Ed25519 signing/verification with RFC 8785 canonical JSON. 
+ +Matches the signing scheme in the TypeScript SDK (src/crypto/sign.ts + src/crypto/verify.ts): + - Root: canonicalize({header, principal, scope}) → Ed25519 → base64url + - Hop: canonicalize({chain: [...], root_sig: }) → Ed25519 → base64url +""" + +from __future__ import annotations + +import base64 +from typing import Any + +import jcs +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey + + +def _b64url(sig_bytes: bytes) -> str: + return base64.urlsafe_b64encode(sig_bytes).rstrip(b"=").decode() + + +def _canonicalize(obj: Any) -> bytes: + return jcs.canonicalize(obj) + + +def sign_root(unsigned_token: dict, private_key_bytes: bytes, kid: str) -> dict: + subset = {f: unsigned_token[f] for f in ["header", "principal", "scope"] if f in unsigned_token} + message = _canonicalize(subset) + key = Ed25519PrivateKey.from_private_bytes(private_key_bytes) + sig_bytes = key.sign(message) + return { + "alg": "Ed25519", + "kid": kid, + "value": _b64url(sig_bytes), + "signed_fields": ["header", "principal", "scope"], + } + + +def sign_hop(cumulative_chain: list[dict], root_sig_value: str, private_key_bytes: bytes) -> str: + payload = {"chain": cumulative_chain, "root_sig": root_sig_value} + message = _canonicalize(payload) + key = Ed25519PrivateKey.from_private_bytes(private_key_bytes) + sig_bytes = key.sign(message) + return _b64url(sig_bytes) + + +def _b64url_decode(s: str) -> bytes: + padding = 4 - len(s) % 4 + return base64.urlsafe_b64decode(s + "=" * padding) + + +def verify_root(token: dict, public_key: Ed25519PublicKey) -> bool: + try: + subset = {f: token[f] for f in ["header", "principal", "scope"] if f in token} + message = _canonicalize(subset) + sig_bytes = _b64url_decode(token["signature"]["value"]) + public_key.verify(sig_bytes, message) + return True + except (InvalidSignature, KeyError, Exception): + return False + + +def verify_hop(cumulative_chain: list[dict], 
root_sig_value: str, hop_signature: str, public_key: Ed25519PublicKey) -> bool: + try: + payload = {"chain": cumulative_chain, "root_sig": root_sig_value} + message = _canonicalize(payload) + sig_bytes = _b64url_decode(hop_signature) + public_key.verify(sig_bytes, message) + return True + except (InvalidSignature, Exception): + return False diff --git a/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/_types.py b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/_types.py new file mode 100644 index 0000000..100b0d0 --- /dev/null +++ b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/_types.py @@ -0,0 +1,69 @@ +"""Python types mirroring the HDP TypeScript SDK schema.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Literal, Optional + +DataClassification = Literal["public", "internal", "confidential", "restricted"] +AgentType = Literal["orchestrator", "sub-agent", "tool-executor", "custom"] +PrincipalIdType = Literal["email", "uuid", "did", "poh", "opaque"] + + +@dataclass +class HdpHeader: + token_id: str + issued_at: int + expires_at: int + session_id: str + version: str = "0.1" + parent_token_id: Optional[str] = None + + +@dataclass +class HdpPrincipal: + id: str + id_type: PrincipalIdType + display_name: Optional[str] = None + metadata: Optional[dict[str, Any]] = None + + +@dataclass +class HdpScope: + intent: str + data_classification: DataClassification + network_egress: bool + persistence: bool + authorized_tools: Optional[list[str]] = None + authorized_resources: Optional[list[str]] = None + max_hops: Optional[int] = None + + +@dataclass +class HdpSignature: + alg: str + kid: str + value: str + signed_fields: list[str] = field(default_factory=lambda: ["header", "principal", "scope"]) + + +@dataclass +class HopRecord: + seq: int + agent_id: str + agent_type: AgentType + timestamp: int + action_summary: str + parent_hop: int + hop_signature: str + agent_fingerprint: 
Optional[str] = None + + +@dataclass +class HdpToken: + hdp: str + header: HdpHeader + principal: HdpPrincipal + scope: HdpScope + chain: list[HopRecord] + signature: HdpSignature diff --git a/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/callbacks.py b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/callbacks.py new file mode 100644 index 0000000..3616019 --- /dev/null +++ b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/callbacks.py @@ -0,0 +1,323 @@ +"""HdpCallbackHandler — legacy CallbackManager integration for LlamaIndex. + +For users on LlamaIndex <0.10.20 or who prefer configuring via Settings.callback_manager. + +Usage: + from llama_index.callbacks.hdp import HdpCallbackHandler, HdpPrincipal, ScopePolicy + from llama_index.core import Settings + from llama_index.core.callbacks import CallbackManager + + handler = HdpCallbackHandler( + signing_key=ed25519_private_key_bytes, + principal=HdpPrincipal(id="alice@corp.com", id_type="email"), + scope=ScopePolicy( + intent="Research pipeline", + authorized_tools=["web_search", "retriever"], + ), + ) + Settings.callback_manager = CallbackManager([handler]) + + # Run your query engine / agent as normal. 
Retrieve the token after: + token = handler.export_token() +""" + +from __future__ import annotations + +import logging +import time +import uuid +from typing import Any, Callable, Optional + +from llama_index.core.callbacks import CBEventType, EventPayload +from llama_index.core.callbacks.base_handler import BaseCallbackHandler + +from ._crypto import sign_hop, sign_root +from ._types import DataClassification, HdpPrincipal +from .session import get_token, set_token + +logger = logging.getLogger(__name__) + + +class HDPScopeViolationError(Exception): + """Raised when an agent attempts to use a tool outside the authorized scope.""" + + def __init__(self, tool: str, authorized_tools: list[str]) -> None: + self.tool = tool + self.authorized_tools = authorized_tools + super().__init__(f"Tool '{tool}' is not in the authorized scope {authorized_tools}") + + +class ScopePolicy: + """Human-readable policy that maps to the HDP scope field.""" + + def __init__( + self, + intent: str, + data_classification: DataClassification = "internal", + network_egress: bool = True, + persistence: bool = False, + authorized_tools: Optional[list[str]] = None, + authorized_resources: Optional[list[str]] = None, + max_hops: Optional[int] = None, + ) -> None: + self.intent = intent + self.data_classification = data_classification + self.network_egress = network_egress + self.persistence = persistence + self.authorized_tools = authorized_tools + self.authorized_resources = authorized_resources + self.max_hops = max_hops + + def to_dict(self) -> dict: + d: dict = { + "intent": self.intent, + "data_classification": self.data_classification, + "network_egress": self.network_egress, + "persistence": self.persistence, + } + if self.authorized_tools is not None: + d["authorized_tools"] = self.authorized_tools + if self.authorized_resources is not None: + d["authorized_resources"] = self.authorized_resources + if self.max_hops is not None: + d["max_hops"] = self.max_hops + return d + + +class 
HdpCallbackHandler(BaseCallbackHandler): + """HDP audit trail via LlamaIndex's legacy CallbackManager. + + Hooks into start_trace / end_trace for token lifecycle, and + on_event_start / on_event_end for tool calls and LLM events. + + All HDP operations are non-blocking by default: failures are logged + and execution continues. Set strict=True to raise HDPScopeViolationError + on scope violations. + """ + + def __init__( + self, + signing_key: bytes, + principal: HdpPrincipal, + scope: ScopePolicy, + key_id: str = "default", + expires_in_ms: int = 24 * 60 * 60 * 1000, + strict: bool = False, + on_token_ready: Optional[Callable[[dict], None]] = None, + ) -> None: + super().__init__(event_starts_to_ignore=[], event_ends_to_ignore=[]) + self._signing_key = signing_key + self._principal = principal + self._scope = scope + self._key_id = key_id + self._expires_in_ms = expires_in_ms + self._strict = strict + self._on_token_ready = on_token_ready + self._hop_seq = 0 + + # ------------------------------------------------------------------ + # BaseCallbackHandler abstract methods + # ------------------------------------------------------------------ + + def start_trace(self, trace_id: Optional[str] = None) -> None: + """Issue the HDP root token. 
Called at the start of each query.""" + try: + session_id = trace_id or str(uuid.uuid4()) + now = int(time.time() * 1000) + unsigned: dict = { + "hdp": "0.1", + "header": { + "token_id": str(uuid.uuid4()), + "issued_at": now, + "expires_at": now + self._expires_in_ms, + "session_id": session_id, + "version": "0.1", + }, + "principal": self._build_principal_dict(), + "scope": self._scope.to_dict(), + "chain": [], + } + signature = sign_root(unsigned, self._signing_key, self._key_id) + token = {**unsigned, "signature": signature} + set_token(token) + self._hop_seq = 0 + logger.debug("HDP root token issued: %s", token["header"]["token_id"]) + except Exception as exc: + logger.warning("HDP start_trace failed (non-blocking): %s", exc) + + def end_trace( + self, + trace_id: Optional[str] = None, + trace_map: Optional[dict[str, list[str]]] = None, + ) -> None: + """Finalize the HDP token and invoke on_token_ready if configured.""" + try: + token = get_token() + if token is not None and self._on_token_ready is not None: + self._on_token_ready(token) + except Exception as exc: + logger.warning("HDP end_trace failed (non-blocking): %s", exc) + + def on_event_start( + self, + event_type: CBEventType, + payload: Optional[dict[str, Any]] = None, + event_id: str = "", + parent_id: str = "", + **kwargs: Any, + ) -> str: + try: + if event_type == CBEventType.FUNCTION_CALL: + self._handle_tool_start(payload or {}) + elif event_type == CBEventType.LLM: + self._handle_llm_start(payload or {}) + elif event_type == CBEventType.QUERY: + self._handle_query_start(payload or {}) + elif event_type == CBEventType.EXCEPTION: + self._handle_exception(payload or {}) + except HDPScopeViolationError: + raise + except Exception as exc: + logger.warning("HDP on_event_start failed (non-blocking): %s", exc) + return event_id + + def on_event_end( + self, + event_type: CBEventType, + payload: Optional[dict[str, Any]] = None, + event_id: str = "", + **kwargs: Any, + ) -> None: + try: + if event_type == 
CBEventType.FUNCTION_CALL: + self._handle_tool_end(payload or {}) + except Exception as exc: + logger.warning("HDP on_event_end failed (non-blocking): %s", exc) + + # ------------------------------------------------------------------ + # Inspection + # ------------------------------------------------------------------ + + def export_token(self) -> Optional[dict]: + """Return the current token dict from the ContextVar.""" + return get_token() + + # ------------------------------------------------------------------ + # Internal handlers + # ------------------------------------------------------------------ + + def _handle_tool_start(self, payload: dict) -> None: + tool = payload.get(EventPayload.TOOL) + tool_name: str = getattr(tool, "name", str(tool)) if tool is not None else "unknown-tool" + + authorized = self._scope.authorized_tools + if authorized is not None and tool_name not in authorized: + if self._strict: + raise HDPScopeViolationError(tool_name, authorized) + logger.warning( + "HDP scope violation: tool '%s' not in authorized_tools %s", + tool_name, + authorized, + ) + self._record_scope_violation(tool_name) + + self._extend_chain(action_summary=f"tool_call: {tool_name}") + + def _handle_tool_end(self, payload: dict) -> None: + output = payload.get(EventPayload.FUNCTION_OUTPUT) + if output is not None: + token = get_token() + if token and token.get("chain"): + last_hop = token["chain"][-1] + last_hop["metadata"] = { + **last_hop.get("metadata", {}), + "tool_output_preview": str(output)[:200], + } + + def _handle_llm_start(self, payload: dict) -> None: + model_name = payload.get(EventPayload.MODEL_NAME) or payload.get("model_name", "") + if model_name: + token = get_token() + if token and token.get("chain"): + last_hop = token["chain"][-1] + last_hop["metadata"] = { + **last_hop.get("metadata", {}), + "llm_model": model_name, + } + + def _handle_query_start(self, payload: dict) -> None: + query_str = payload.get(EventPayload.QUERY_STR, "") + if query_str: + 
token = get_token() + if token: + scope = token.get("scope", {}) + token["scope"] = { + **scope, + "extensions": { + **scope.get("extensions", {}), + "query_intent": str(query_str)[:500], + }, + } + + def _handle_exception(self, payload: dict) -> None: + exc = payload.get(EventPayload.EXCEPTION) + if exc is not None: + self._record_anomaly(f"exception: {type(exc).__name__}: {str(exc)[:200]}") + + def _extend_chain(self, action_summary: str) -> None: + token = get_token() + if token is None: + return + + max_hops = self._scope.max_hops + if max_hops is not None and self._hop_seq >= max_hops: + logger.warning("HDP max_hops (%d) reached — skipping hop", max_hops) + return + + self._hop_seq += 1 + unsigned_hop: dict = { + "seq": self._hop_seq, + "agent_id": "llama-index-agent", + "agent_type": "tool-executor", + "timestamp": int(time.time() * 1000), + "action_summary": action_summary, + "parent_hop": self._hop_seq - 1, + } + + current_chain: list = token.get("chain", []) + cumulative = [*current_chain, unsigned_hop] + hop_sig = sign_hop(cumulative, token["signature"]["value"], self._signing_key) + signed_hop = {**unsigned_hop, "hop_signature": hop_sig} + token = {**token, "chain": [*current_chain, signed_hop]} + set_token(token) + logger.debug("HDP hop %d recorded: %s", self._hop_seq, action_summary) + + def _record_scope_violation(self, tool: str) -> None: + token = get_token() + if token is None: + return + scope = token.get("scope", {}) + extensions = scope.get("extensions", {}) + violations: list = extensions.get("scope_violations", []) + violations.append({"tool": tool, "timestamp": int(time.time() * 1000)}) + token["scope"] = {**scope, "extensions": {**extensions, "scope_violations": violations}} + set_token(token) + + def _record_anomaly(self, description: str) -> None: + token = get_token() + if token is None: + return + scope = token.get("scope", {}) + extensions = scope.get("extensions", {}) + anomalies: list = extensions.get("anomalies", []) + 
anomalies.append({"description": description, "timestamp": int(time.time() * 1000)}) + token["scope"] = {**scope, "extensions": {**extensions, "anomalies": anomalies}} + set_token(token) + + def _build_principal_dict(self) -> dict: + d: dict = {"id": self._principal.id, "id_type": self._principal.id_type} + if self._principal.display_name is not None: + d["display_name"] = self._principal.display_name + if self._principal.metadata is not None: + d["metadata"] = self._principal.metadata + return d diff --git a/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/instrumentation.py b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/instrumentation.py new file mode 100644 index 0000000..26f0b2e --- /dev/null +++ b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/instrumentation.py @@ -0,0 +1,335 @@ +"""HdpInstrumentationHandler — modern instrumentation dispatcher integration for LlamaIndex. + +For LlamaIndex >=0.10.20. Hooks into the root instrumentation dispatcher via +BaseEventHandler and BaseSpanHandler. + +Usage: + from llama_index.callbacks.hdp import HdpInstrumentationHandler, HdpPrincipal, ScopePolicy + + HdpInstrumentationHandler.init( + signing_key=ed25519_private_key_bytes, + principal=HdpPrincipal(id="alice@corp.com", id_type="email"), + scope=ScopePolicy( + intent="Research pipeline", + authorized_tools=["web_search", "retriever"], + ), + on_token_ready=lambda token: print(token["header"]["token_id"]), + ) + + # Run your query engine / agent as normal. The root dispatcher captures all events. 
+""" + +from __future__ import annotations + +import logging +import time +import uuid +from typing import Any, Callable, Optional, Type + +from llama_index.core.instrumentation.event_handlers import BaseEventHandler +from llama_index.core.instrumentation.events import BaseEvent +from llama_index.core.instrumentation.events.agent import ( + AgentRunStepEndEvent, + AgentRunStepStartEvent, + AgentToolCallEvent, +) +from llama_index.core.instrumentation.events.llm import ( + LLMChatEndEvent, + LLMChatStartEvent, +) +from llama_index.core.instrumentation.events.query import ( + QueryEndEvent, + QueryStartEvent, +) +from llama_index.core.instrumentation.span_handlers import BaseSpanHandler +from llama_index.core.instrumentation.span import BaseSpan + +from ._crypto import sign_hop, sign_root +from ._types import DataClassification, HdpPrincipal +from .callbacks import HDPScopeViolationError, ScopePolicy +from .session import get_token, set_token + +logger = logging.getLogger(__name__) + + +class _HdpSpan(BaseSpan): + """Minimal span that carries the HDP token ID for trace correlation.""" + hdp_token_id: Optional[str] = None + + +class HdpSpanHandler(BaseSpanHandler[_HdpSpan]): + """Tags each span with the active HDP token ID for cross-tool trace correlation.""" + + def new_span( + self, + id_: str, + bound_args: Any, + instance: Optional[Any] = None, + parent_span_id: Optional[str] = None, + tags: Optional[dict[str, Any]] = None, + **kwargs: Any, + ) -> Optional[_HdpSpan]: + try: + token = get_token() + token_id = token["header"]["token_id"] if token else None + return _HdpSpan( + id_=id_, + parent_id=parent_span_id, + tags={**(tags or {}), "hdp_token_id": token_id}, + hdp_token_id=token_id, + ) + except Exception as exc: + logger.debug("HDP span creation failed (non-blocking): %s", exc) + return None + + def prepare_to_exit_span( + self, + id_: str, + bound_args: Any, + instance: Optional[Any] = None, + result: Optional[Any] = None, + **kwargs: Any, + ) -> 
Optional[_HdpSpan]: + return self.open_spans.get(id_) + + def prepare_to_drop_span( + self, + id_: str, + bound_args: Any, + instance: Optional[Any] = None, + err: Optional[BaseException] = None, + **kwargs: Any, + ) -> Optional[_HdpSpan]: + span = self.open_spans.get(id_) + if span is not None: + logger.debug( + "HDP span dropped: %s (token: %s, error: %s)", + id_, + span.hdp_token_id, + err, + ) + return span + + +class HdpEventHandler(BaseEventHandler): + """Routes LlamaIndex instrumentation events to HDP chain operations.""" + + def __init__( + self, + signing_key: bytes, + principal: HdpPrincipal, + scope: ScopePolicy, + key_id: str, + expires_in_ms: int, + strict: bool, + on_token_ready: Optional[Callable[[dict], None]], + ) -> None: + self._signing_key = signing_key + self._principal = principal + self._scope = scope + self._key_id = key_id + self._expires_in_ms = expires_in_ms + self._strict = strict + self._on_token_ready = on_token_ready + self._hop_seq = 0 + + @classmethod + def class_name(cls) -> str: + return "HdpEventHandler" + + def handle(self, event: BaseEvent, **kwargs: Any) -> None: + try: + if isinstance(event, QueryStartEvent): + self._on_query_start(event) + elif isinstance(event, AgentToolCallEvent): + self._on_tool_call(event) + elif isinstance(event, LLMChatStartEvent): + self._on_llm_start(event) + elif isinstance(event, LLMChatEndEvent): + self._on_llm_end(event) + elif isinstance(event, QueryEndEvent): + self._on_query_end(event) + except Exception as exc: + logger.warning("HDP event handler failed (non-blocking): %s", exc) + + # ------------------------------------------------------------------ + # Event handlers + # ------------------------------------------------------------------ + + def _on_query_start(self, event: QueryStartEvent) -> None: + session_id = str(event.id_) if event.id_ else str(uuid.uuid4()) + now = int(time.time() * 1000) + unsigned: dict = { + "hdp": "0.1", + "header": { + "token_id": str(uuid.uuid4()), + 
"issued_at": now, + "expires_at": now + self._expires_in_ms, + "session_id": session_id, + "version": "0.1", + }, + "principal": self._build_principal_dict(), + "scope": self._scope.to_dict(), + "chain": [], + } + signature = sign_root(unsigned, self._signing_key, self._key_id) + token = {**unsigned, "signature": signature} + set_token(token) + self._hop_seq = 0 + logger.debug("HDP root token issued: %s", token["header"]["token_id"]) + + def _on_tool_call(self, event: AgentToolCallEvent) -> None: + tool_name: str = "" + if hasattr(event, "tool") and event.tool is not None: + tool_name = getattr(event.tool, "name", str(event.tool)) + elif hasattr(event, "tool_name"): + tool_name = str(event.tool_name) + tool_name = tool_name or "unknown-tool" + + authorized = self._scope.authorized_tools + if authorized is not None and tool_name not in authorized: + if self._strict: + raise HDPScopeViolationError(tool_name, authorized) + logger.warning( + "HDP scope violation: tool '%s' not in authorized_tools %s", + tool_name, + authorized, + ) + self._record_scope_violation(tool_name) + + self._extend_chain(action_summary=f"tool_call: {tool_name}") + + def _on_llm_start(self, event: LLMChatStartEvent) -> None: + model_name: str = "" + if hasattr(event, "model_dict") and event.model_dict: + model_name = str(event.model_dict.get("model", "")) + if model_name: + token = get_token() + if token and token.get("chain"): + last_hop = token["chain"][-1] + last_hop["metadata"] = {**last_hop.get("metadata", {}), "llm_model": model_name} + + def _on_llm_end(self, event: LLMChatEndEvent) -> None: + pass # token lifecycle managed by query events + + def _on_query_end(self, event: QueryEndEvent) -> None: + token = get_token() + if token is not None and self._on_token_ready is not None: + try: + self._on_token_ready(token) + except Exception as exc: + logger.warning("HDP on_token_ready callback failed: %s", exc) + + # ------------------------------------------------------------------ + # Helpers 
(shared with HdpCallbackHandler logic) + # ------------------------------------------------------------------ + + def _extend_chain(self, action_summary: str) -> None: + token = get_token() + if token is None: + return + + max_hops = self._scope.max_hops + if max_hops is not None and self._hop_seq >= max_hops: + logger.warning("HDP max_hops (%d) reached — skipping hop", max_hops) + return + + self._hop_seq += 1 + unsigned_hop: dict = { + "seq": self._hop_seq, + "agent_id": "llama-index-agent", + "agent_type": "tool-executor", + "timestamp": int(time.time() * 1000), + "action_summary": action_summary, + "parent_hop": self._hop_seq - 1, + } + + current_chain: list = token.get("chain", []) + cumulative = [*current_chain, unsigned_hop] + hop_sig = sign_hop(cumulative, token["signature"]["value"], self._signing_key) + signed_hop = {**unsigned_hop, "hop_signature": hop_sig} + token = {**token, "chain": [*current_chain, signed_hop]} + set_token(token) + logger.debug("HDP hop %d recorded: %s", self._hop_seq, action_summary) + + def _record_scope_violation(self, tool: str) -> None: + token = get_token() + if token is None: + return + scope = token.get("scope", {}) + extensions = scope.get("extensions", {}) + violations: list = extensions.get("scope_violations", []) + violations.append({"tool": tool, "timestamp": int(time.time() * 1000)}) + token["scope"] = {**scope, "extensions": {**extensions, "scope_violations": violations}} + set_token(token) + + def _build_principal_dict(self) -> dict: + d: dict = {"id": self._principal.id, "id_type": self._principal.id_type} + if self._principal.display_name is not None: + d["display_name"] = self._principal.display_name + if self._principal.metadata is not None: + d["metadata"] = self._principal.metadata + return d + + +class HdpInstrumentationHandler: + """Entry point for the modern LlamaIndex instrumentation integration. + + Call HdpInstrumentationHandler.init() once at application startup. 
+ It wires HdpEventHandler and HdpSpanHandler to the root dispatcher + so all downstream LlamaIndex activity is captured automatically. + """ + + @classmethod + def init( + cls, + signing_key: bytes, + principal: HdpPrincipal, + scope: ScopePolicy, + key_id: str = "default", + expires_in_ms: int = 24 * 60 * 60 * 1000, + on_violation: str = "log", + on_token_ready: Optional[Callable[[dict], None]] = None, + ) -> "HdpInstrumentationHandler": + """Wire HDP handlers to the root LlamaIndex instrumentation dispatcher. + + Args: + signing_key: Ed25519 private key bytes. + principal: HdpPrincipal identifying the authorizing human. + scope: ScopePolicy (intent, authorized_tools, max_hops, etc.). + key_id: Key identifier for rotation support. + expires_in_ms: Token TTL in milliseconds (default 24h). + on_violation: "log" (default) or "raise". + on_token_ready: Optional callback invoked with the final token at query end. + + Returns: + The HdpInstrumentationHandler instance (holds references to wired handlers). 
+ """ + import llama_index.core.instrumentation as instrument + + strict = on_violation == "raise" + + event_handler = HdpEventHandler( + signing_key=signing_key, + principal=principal, + scope=scope, + key_id=key_id, + expires_in_ms=expires_in_ms, + strict=strict, + on_token_ready=on_token_ready, + ) + span_handler = HdpSpanHandler() + + dispatcher = instrument.get_dispatcher() + dispatcher.add_event_handler(event_handler) + dispatcher.add_span_handler(span_handler) + + instance = cls() + instance._event_handler = event_handler + instance._span_handler = span_handler + instance._dispatcher = dispatcher + return instance + + def export_token(self) -> Optional[dict]: + """Return the active token from the ContextVar.""" + return get_token() diff --git a/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/postprocessor.py b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/postprocessor.py new file mode 100644 index 0000000..5566a36 --- /dev/null +++ b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/postprocessor.py @@ -0,0 +1,179 @@ +"""HdpNodePostprocessor — inline scope enforcement in the LlamaIndex RAG pipeline. + +Runs after retrieval, before synthesis. Validates scope and records retrieval +as a hop in the HDP delegation chain. + +Usage: + from llama_index.callbacks.hdp import HdpNodePostprocessor + + postprocessor = HdpNodePostprocessor( + signing_key=ed25519_private_key_bytes, # same key used for HdpCallbackHandler + strict=False, + ) + + query_engine = index.as_query_engine( + node_postprocessors=[postprocessor] + ) + +The postprocessor reads the active HDP token from the ContextVar. If no token +is present (HdpCallbackHandler or HdpInstrumentationHandler not configured), +it logs a warning and returns nodes unchanged. 
+""" + +from __future__ import annotations + +import logging +import time +from typing import Any, List, Optional + +from llama_index.core.postprocessor.types import BaseNodePostprocessor +from llama_index.core.schema import NodeWithScore, QueryBundle + +from ._crypto import sign_hop +from .callbacks import HDPScopeViolationError +from .session import get_token, set_token + +logger = logging.getLogger(__name__) + +_CLASSIFICATION_LEVELS = {"public": 0, "internal": 1, "confidential": 2, "restricted": 3} + + +class HdpNodePostprocessor(BaseNodePostprocessor): + """Records retrieval hops and optionally enforces data classification scope. + + Each call to _postprocess_nodes extends the active HDP token's delegation + chain with a retrieval hop. This ensures every document retrieval is + cryptographically recorded as part of the authorization provenance. + + Args: + strict: If True, raise HDPScopeViolationError on classification + violations. If False (default), log and continue. + check_data_classification: If True (default), inspect each node's + metadata for a 'classification' key and validate it against + scope.data_classification. + """ + + strict: bool = False + check_data_classification: bool = True + + def __init__( + self, + signing_key: Optional[bytes] = None, + strict: bool = False, + check_data_classification: bool = True, + ) -> None: + super().__init__() + self._signing_key = signing_key + self.strict = strict + self.check_data_classification = check_data_classification + + @classmethod + def class_name(cls) -> str: + return "HdpNodePostprocessor" + + def _postprocess_nodes( + self, + nodes: List[NodeWithScore], + query_bundle: Optional[QueryBundle] = None, + ) -> List[NodeWithScore]: + token = get_token() + if token is None: + logger.warning( + "HDP: no active token in context — retrieval not recorded. " + "Configure HdpCallbackHandler or HdpInstrumentationHandler before querying." 
+ ) + return nodes + + query_str = "" + if query_bundle is not None: + query_str = getattr(query_bundle, "query_str", "") or "" + + if self.check_data_classification: + nodes = self._check_classification(nodes, token) + + self._extend_chain(token, nodes, query_str) + return nodes + + def _check_classification( + self, + nodes: List[NodeWithScore], + token: dict, + ) -> List[NodeWithScore]: + allowed_classification = token.get("scope", {}).get("data_classification", "internal") + allowed_level = _CLASSIFICATION_LEVELS.get(allowed_classification, 1) + violating = [] + + for node in nodes: + node_classification = node.node.metadata.get("classification", "internal") if node.node.metadata else "internal" + node_level = _CLASSIFICATION_LEVELS.get(node_classification, 1) + if node_level > allowed_level: + violating.append((node, node_classification)) + + if violating: + violated_classes = [c for _, c in violating] + msg = ( + f"HDP: retrieved nodes with classification {violated_classes} " + f"exceed allowed level '{allowed_classification}'" + ) + if self.strict: + raise HDPScopeViolationError( + tool=f"retrieval[{violated_classes}]", + authorized_tools=[f"retrieval[<={allowed_classification}]"], + ) + logger.warning(msg) + self._record_classification_violation(token, violated_classes, allowed_classification) + + return nodes + + def _extend_chain(self, token: dict, nodes: List[NodeWithScore], query_str: str) -> None: + current_chain: list = token.get("chain", []) + next_seq = len(current_chain) + 1 + + summary_parts = [f"retrieval: {len(nodes)} nodes"] + if query_str: + summary_parts.append(f"query: {query_str[:80]}") + action_summary = ", ".join(summary_parts) + + unsigned_hop: dict = { + "seq": next_seq, + "agent_id": "llama-index-retriever", + "agent_type": "tool-executor", + "timestamp": int(time.time() * 1000), + "action_summary": action_summary, + "parent_hop": next_seq - 1, + } + + try: + if self._signing_key is None: + logger.debug( + "HDP postprocessor: no 
signing key configured — recording unsigned retrieval hop" + ) + token = {**token, "chain": [*current_chain, {**unsigned_hop, "hop_signature": ""}]} + set_token(token) + return + + cumulative = [*current_chain, unsigned_hop] + hop_sig = sign_hop(cumulative, token["signature"]["value"], self._signing_key) + signed_hop = {**unsigned_hop, "hop_signature": hop_sig} + token = {**token, "chain": [*current_chain, signed_hop]} + set_token(token) + logger.debug("HDP retrieval hop %d recorded", next_seq) + except Exception as exc: + logger.warning("HDP postprocessor chain extension failed (non-blocking): %s", exc) + + def _record_classification_violation( + self, + token: dict, + violated_classes: list, + allowed: str, + ) -> None: + scope = token.get("scope", {}) + extensions = scope.get("extensions", {}) + violations: list = extensions.get("classification_violations", []) + violations.append({ + "violated_classifications": violated_classes, + "allowed_classification": allowed, + "timestamp": int(time.time() * 1000), + }) + token["scope"] = {**scope, "extensions": {**extensions, "classification_violations": violations}} + set_token(token) diff --git a/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/session.py b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/session.py new file mode 100644 index 0000000..0334cd8 --- /dev/null +++ b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/session.py @@ -0,0 +1,29 @@ +"""Shared ContextVar session state for the HDP LlamaIndex integration. + +All three layers (instrumentation handler, legacy callback handler, node +postprocessor) share a single ContextVar to hold the active HDP token for +the duration of a query. ContextVar is asyncio-safe: each task gets its own +copy, preventing cross-request token leakage. 
+""" + +from __future__ import annotations + +from contextvars import ContextVar +from typing import Optional + +_hdp_token: ContextVar[Optional[dict]] = ContextVar("_hdp_token", default=None) + + +def get_token() -> Optional[dict]: + """Return the active HDP token dict, or None if no query is in progress.""" + return _hdp_token.get() + + +def set_token(token: dict) -> None: + """Store a token dict as the active HDP token for the current context.""" + _hdp_token.set(token) + + +def clear_token() -> None: + """Clear the active HDP token.""" + _hdp_token.set(None) diff --git a/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/verify.py b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/verify.py new file mode 100644 index 0000000..17fbc20 --- /dev/null +++ b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/verify.py @@ -0,0 +1,119 @@ +"""Offline chain verification for HDP tokens. + +Usage: + from llama_index.callbacks.hdp import verify_chain + + result = verify_chain(token_dict, public_key_bytes) + if result.valid: + print(f"Chain verified: {result.hop_count} hops") + else: + print(f"Violations: {result.violations}") +""" + +from __future__ import annotations + +import time +from dataclasses import dataclass, field + +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey + +from ._crypto import verify_hop, verify_root + + +@dataclass +class HopVerification: + seq: int + agent_id: str + valid: bool + reason: str = "" + + +@dataclass +class VerificationResult: + valid: bool + token_id: str + session_id: str + hop_count: int + hop_results: list[HopVerification] = field(default_factory=list) + violations: list[str] = field(default_factory=list) + + @property + def depth(self) -> int: + return self.hop_count + + +def verify_chain(token: dict, public_key: Ed25519PublicKey | bytes) -> VerificationResult: + """Verify a complete HDP token — root signature and every hop in the chain. 
+ + Args: + token: Token dict as returned by export_token() on any HDP middleware. + public_key: The human's Ed25519 public key. Pass either an Ed25519PublicKey + instance or the raw 32-byte public key bytes. + + Returns: + VerificationResult with valid=True only if every signature checks out + and no structural violations are found. + """ + if isinstance(public_key, (bytes, bytearray)): + pub = _load_raw_public_key(public_key) + else: + pub = public_key + + token_id = token.get("header", {}).get("token_id", "unknown") + session_id = token.get("header", {}).get("session_id", "unknown") + chain: list[dict] = token.get("chain", []) + violations: list[str] = [] + hop_results: list[HopVerification] = [] + + if not verify_root(token, pub): + violations.append("Root signature invalid") + return VerificationResult( + valid=False, + token_id=token_id, + session_id=session_id, + hop_count=len(chain), + violations=violations, + ) + + expires_at = token.get("header", {}).get("expires_at", 0) + now_ms = int(time.time() * 1000) + if expires_at and now_ms > expires_at: + violations.append(f"Token expired at {expires_at}") + + max_hops = token.get("scope", {}).get("max_hops") + if max_hops is not None and len(chain) > max_hops: + violations.append(f"Chain depth {len(chain)} exceeds max_hops {max_hops}") + + root_sig_value: str = token["signature"]["value"] + for i, hop in enumerate(chain): + hop_sig = hop.get("hop_signature", "") + unsigned_hop = {k: v for k, v in hop.items() if k != "hop_signature"} + cumulative = [*chain[:i], unsigned_hop] + + ok = verify_hop(cumulative, root_sig_value, hop_sig, pub) + hop_results.append(HopVerification( + seq=hop.get("seq", i + 1), + agent_id=hop.get("agent_id", "unknown"), + valid=ok, + reason="" if ok else "Hop signature invalid", + )) + if not ok: + violations.append(f"Hop {hop.get('seq', i + 1)} ({hop.get('agent_id', '?')}) signature invalid") + + for j, hop in enumerate(chain): + if hop.get("seq") != j + 1: + 
violations.append(f"Non-sequential seq at position {j}: expected {j + 1}, got {hop.get('seq')}") + + return VerificationResult( + valid=len(violations) == 0, + token_id=token_id, + session_id=session_id, + hop_count=len(chain), + hop_results=hop_results, + violations=violations, + ) + + +def _load_raw_public_key(raw_bytes: bytes) -> Ed25519PublicKey: + import cryptography.hazmat.primitives.asymmetric.ed25519 as _ed + return _ed.Ed25519PublicKey.from_public_bytes(raw_bytes) diff --git a/packages/llama-index-callbacks-hdp/pyproject.toml b/packages/llama-index-callbacks-hdp/pyproject.toml new file mode 100644 index 0000000..3fcb8ae --- /dev/null +++ b/packages/llama-index-callbacks-hdp/pyproject.toml @@ -0,0 +1,37 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "llama-index-callbacks-hdp" +version = "0.1.0" +description = "HDP (Human Delegation Provenance) — cryptographic authorization provenance for LlamaIndex agents" +readme = "README.md" +license = { text = "CC-BY-4.0" } +requires-python = ">=3.10" +dependencies = [ + "llama-index-core>=0.10.20,<0.15", + "cryptography>=42.0.0", + "jcs>=0.2.1", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0.0", + "pytest-asyncio>=0.23.0", +] + +[project.urls] +Homepage = "https://github.com/Helixar-AI/HDP" +Repository = "https://github.com/Helixar-AI/HDP" + +[tool.hatch.build.targets.wheel] +packages = ["llama_index"] + +[tool.llamahub] +contains_example = false +import_path = "llama_index.callbacks.hdp" + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] diff --git a/packages/llama-index-callbacks-hdp/tests/__init__.py b/packages/llama-index-callbacks-hdp/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/packages/llama-index-callbacks-hdp/tests/test_callbacks.py b/packages/llama-index-callbacks-hdp/tests/test_callbacks.py new file mode 100644 index 0000000..f519b6e --- /dev/null +++ 
b/packages/llama-index-callbacks-hdp/tests/test_callbacks.py @@ -0,0 +1,224 @@ +"""Tests for HdpCallbackHandler — legacy CallbackManager integration.""" + +from __future__ import annotations + +import time +import pytest +import jcs +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + +from llama_index.core.callbacks import CBEventType, EventPayload +from llama_index.callbacks.hdp import ( + HdpCallbackHandler, + HdpPrincipal, + HDPScopeViolationError, + ScopePolicy, + verify_chain, +) +from llama_index.callbacks.hdp.session import clear_token, get_token + + +def _generate_key(): + priv = Ed25519PrivateKey.generate() + return priv.private_bytes_raw(), priv.public_key() + + +def _make_handler(scope=None, **kwargs): + key, pub = _generate_key() + handler = HdpCallbackHandler( + signing_key=key, + principal=HdpPrincipal(id="user@test.com", id_type="email"), + scope=scope or ScopePolicy(intent="Test query"), + **kwargs, + ) + return handler, key, pub + + +class FakeTool: + def __init__(self, name: str): + self.name = name + + +class TestRootTokenIssuance: + def setup_method(self): + clear_token() + + def test_start_trace_issues_root_token(self): + handler, _, _ = _make_handler() + handler.start_trace("trace-001") + token = get_token() + assert token is not None + assert token["hdp"] == "0.1" + assert token["header"]["session_id"] == "trace-001" + assert token["chain"] == [] + + def test_start_trace_without_id_generates_session(self): + handler, _, _ = _make_handler() + handler.start_trace() + token = get_token() + assert token is not None + assert token["header"]["session_id"] # some UUID was generated + + def test_root_signature_is_verifiable(self): + handler, _, pub = _make_handler() + handler.start_trace("s1") + token = get_token() + result = verify_chain(token, pub.public_bytes_raw()) + assert result.valid + + def test_export_token_matches_context(self): + handler, _, _ = _make_handler() + handler.start_trace("s2") + assert 
handler.export_token() is get_token() + + +class TestEndTrace: + def setup_method(self): + clear_token() + + def test_end_trace_calls_on_token_ready(self): + received = [] + handler, _, _ = _make_handler(on_token_ready=received.append) + handler.start_trace("s3") + handler.end_trace("s3") + assert len(received) == 1 + assert received[0]["hdp"] == "0.1" + + def test_end_trace_without_token_is_noop(self): + handler, _, _ = _make_handler() + handler.end_trace("s3") # no start_trace called first — must not raise + + +class TestToolCallHandling: + def setup_method(self): + clear_token() + + def _tool_start(self, handler, tool_name: str, event_id="e1"): + handler.on_event_start( + CBEventType.FUNCTION_CALL, + payload={EventPayload.TOOL: FakeTool(tool_name)}, + event_id=event_id, + ) + + def test_tool_call_extends_chain(self): + handler, _, _ = _make_handler() + handler.start_trace("s4") + self._tool_start(handler, "web_search") + chain = get_token()["chain"] + assert len(chain) == 1 + assert chain[0]["action_summary"] == "tool_call: web_search" + + def test_tool_call_hop_is_signed(self): + handler, _, pub = _make_handler() + handler.start_trace("s5") + self._tool_start(handler, "web_search") + result = verify_chain(get_token(), pub.public_bytes_raw()) + assert result.valid + + def test_multiple_tool_calls_build_chain(self): + handler, _, pub = _make_handler() + handler.start_trace("s6") + self._tool_start(handler, "tool_a", "e1") + self._tool_start(handler, "tool_b", "e2") + self._tool_start(handler, "tool_c", "e3") + chain = get_token()["chain"] + assert len(chain) == 3 + assert [h["seq"] for h in chain] == [1, 2, 3] + assert verify_chain(get_token(), pub.public_bytes_raw()).valid + + def test_tool_output_recorded_on_end(self): + handler, _, _ = _make_handler() + handler.start_trace("s7") + self._tool_start(handler, "web_search") + handler.on_event_end( + CBEventType.FUNCTION_CALL, + payload={EventPayload.FUNCTION_OUTPUT: "search results here"}, + event_id="e1", + ) + 
last_hop = get_token()["chain"][-1] + assert "tool_output_preview" in last_hop.get("metadata", {}) + + +class TestScopeEnforcement: + def setup_method(self): + clear_token() + + def test_authorized_tool_no_violation(self): + handler, _, _ = _make_handler( + scope=ScopePolicy(intent="x", authorized_tools=["web_search"]) + ) + handler.start_trace("sv1") + handler.on_event_start( + CBEventType.FUNCTION_CALL, + payload={EventPayload.TOOL: FakeTool("web_search")}, + ) + violations = get_token().get("scope", {}).get("extensions", {}).get("scope_violations", []) + assert violations == [] + + def test_unauthorized_tool_recorded_in_observe_mode(self): + handler, _, _ = _make_handler( + scope=ScopePolicy(intent="x", authorized_tools=["web_search"]) + ) + handler.start_trace("sv2") + handler.on_event_start( + CBEventType.FUNCTION_CALL, + payload={EventPayload.TOOL: FakeTool("exec_code")}, + ) + violations = get_token()["scope"]["extensions"]["scope_violations"] + assert len(violations) == 1 + assert violations[0]["tool"] == "exec_code" + + def test_strict_mode_raises(self): + handler, _, _ = _make_handler( + scope=ScopePolicy(intent="x", authorized_tools=["web_search"]), + strict=True, + ) + handler.start_trace("sv3") + with pytest.raises(HDPScopeViolationError) as exc_info: + handler.on_event_start( + CBEventType.FUNCTION_CALL, + payload={EventPayload.TOOL: FakeTool("exec_code")}, + ) + assert exc_info.value.tool == "exec_code" + + def test_no_authorized_tools_means_all_allowed(self): + handler, _, _ = _make_handler(scope=ScopePolicy(intent="x")) + handler.start_trace("sv4") + handler.on_event_start( + CBEventType.FUNCTION_CALL, + payload={EventPayload.TOOL: FakeTool("anything")}, + ) + extensions = get_token().get("scope", {}).get("extensions", {}) + assert "scope_violations" not in extensions + + def test_max_hops_enforced(self): + handler, _, _ = _make_handler(scope=ScopePolicy(intent="x", max_hops=2)) + handler.start_trace("sv5") + for i in range(5): + 
handler.on_event_start( + CBEventType.FUNCTION_CALL, + payload={EventPayload.TOOL: FakeTool(f"tool_{i}")}, + ) + assert len(get_token()["chain"]) == 2 + + +class TestNonBlocking: + def setup_method(self): + clear_token() + + def test_bad_key_does_not_raise(self): + handler = HdpCallbackHandler( + signing_key=b"\x00" * 5, + principal=HdpPrincipal(id="u", id_type="opaque"), + scope=ScopePolicy(intent="x"), + ) + handler.start_trace("nb1") + assert get_token() is None + + def test_events_without_token_are_noop(self): + handler, _, _ = _make_handler() + # No start_trace — on_event_start must not raise + handler.on_event_start( + CBEventType.FUNCTION_CALL, + payload={EventPayload.TOOL: FakeTool("web_search")}, + ) diff --git a/packages/llama-index-callbacks-hdp/tests/test_postprocessor.py b/packages/llama-index-callbacks-hdp/tests/test_postprocessor.py new file mode 100644 index 0000000..f5427a8 --- /dev/null +++ b/packages/llama-index-callbacks-hdp/tests/test_postprocessor.py @@ -0,0 +1,157 @@ +"""Tests for HdpNodePostprocessor — retrieval hop recording and scope enforcement.""" + +from __future__ import annotations + +import pytest +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + +from llama_index.core.schema import NodeWithScore, TextNode +from llama_index.callbacks.hdp import ( + HDPScopeViolationError, + HdpNodePostprocessor, + HdpPrincipal, + ScopePolicy, + verify_chain, +) +from llama_index.callbacks.hdp.callbacks import HdpCallbackHandler +from llama_index.callbacks.hdp.session import clear_token, get_token, set_token + + +def _generate_key(): + priv = Ed25519PrivateKey.generate() + return priv.private_bytes_raw(), priv.public_key() + + +def _issue_token(signing_key: bytes, scope: ScopePolicy | None = None) -> dict: + """Issue a minimal root token and store it in the ContextVar.""" + import time, uuid + from llama_index.callbacks.hdp._crypto import sign_root + now = int(time.time() * 1000) + scope = scope or 
ScopePolicy(intent="test")
+    unsigned = {
+        "hdp": "0.1",
+        "header": {
+            "token_id": str(uuid.uuid4()),
+            "issued_at": now,
+            "expires_at": now + 86400000,
+            "session_id": "test-session",
+            "version": "0.1",
+        },
+        "principal": {"id": "user@test.com", "id_type": "email"},
+        "scope": scope.to_dict(),
+        "chain": [],
+    }
+    sig = sign_root(unsigned, signing_key, "default")
+    token = {**unsigned, "signature": sig}
+    set_token(token)
+    return token
+
+
+def _make_nodes(*classifications: str) -> list[NodeWithScore]:
+    nodes = []
+    for cls in classifications:
+        metadata = {"classification": cls} if cls else {}
+        nodes.append(NodeWithScore(node=TextNode(text="content", metadata=metadata), score=1.0))
+    return nodes
+
+
+class TestRetrievalHopRecording:
+    def setup_method(self):
+        clear_token()
+
+    def test_records_retrieval_hop_with_signing_key(self):
+        key, pub = _generate_key()
+        _issue_token(key)
+        pp = HdpNodePostprocessor(signing_key=key)
+        nodes = _make_nodes("public", "public")
+        result = pp._postprocess_nodes(nodes)
+        assert result == nodes
+        chain = get_token()["chain"]
+        assert len(chain) == 1
+        assert "retrieval: 2 nodes" in chain[0]["action_summary"]
+
+    def test_retrieval_hop_is_signed(self):
+        key, pub = _generate_key()
+        _issue_token(key)
+        pp = HdpNodePostprocessor(signing_key=key)
+        pp._postprocess_nodes(_make_nodes("public"))
+        result = verify_chain(get_token(), pub.public_bytes_raw())
+        assert result.valid
+
+    def test_without_signing_key_records_unsigned_hop(self):
+        key, _ = _generate_key()
+        _issue_token(key)
+        pp = HdpNodePostprocessor()  # no signing_key
+        pp._postprocess_nodes(_make_nodes("public"))
+        chain = get_token()["chain"]
+        assert len(chain) == 1
+        assert chain[0]["hop_signature"] == ""
+
+    def test_no_active_token_returns_nodes_unchanged(self):
+        pp = HdpNodePostprocessor()
+        nodes = _make_nodes("public", "internal")
+        result = pp._postprocess_nodes(nodes)
+        assert result == nodes
+
+    def test_query_str_included_in_hop_summary(self):
+        from 
llama_index.core.schema import QueryBundle + key, _ = _generate_key() + _issue_token(key) + pp = HdpNodePostprocessor(signing_key=key) + qb = QueryBundle(query_str="what is AI?") + pp._postprocess_nodes(_make_nodes("public"), query_bundle=qb) + summary = get_token()["chain"][0]["action_summary"] + assert "what is AI?" in summary + + +class TestDataClassificationEnforcement: + def setup_method(self): + clear_token() + + def test_nodes_within_classification_pass(self): + key, _ = _generate_key() + _issue_token(key, ScopePolicy(intent="x", data_classification="confidential")) + pp = HdpNodePostprocessor(signing_key=key, check_data_classification=True) + nodes = _make_nodes("public", "internal", "confidential") + result = pp._postprocess_nodes(nodes) + assert len(result) == 3 + violations = get_token()["scope"].get("extensions", {}).get("classification_violations", []) + assert violations == [] + + def test_nodes_above_classification_logged_in_observe_mode(self): + key, _ = _generate_key() + _issue_token(key, ScopePolicy(intent="x", data_classification="internal")) + pp = HdpNodePostprocessor(signing_key=key, check_data_classification=True, strict=False) + nodes = _make_nodes("public", "restricted") # restricted > internal + result = pp._postprocess_nodes(nodes) + assert len(result) == 2 # observe mode: nodes still returned + violations = get_token()["scope"]["extensions"]["classification_violations"] + assert len(violations) == 1 + assert "restricted" in violations[0]["violated_classifications"] + + def test_strict_mode_raises_on_classification_violation(self): + key, _ = _generate_key() + _issue_token(key, ScopePolicy(intent="x", data_classification="internal")) + pp = HdpNodePostprocessor(signing_key=key, check_data_classification=True, strict=True) + with pytest.raises(HDPScopeViolationError): + pp._postprocess_nodes(_make_nodes("restricted")) + + def test_check_data_classification_false_skips_check(self): + key, _ = _generate_key() + _issue_token(key, 
ScopePolicy(intent="x", data_classification="public")) + pp = HdpNodePostprocessor(signing_key=key, check_data_classification=False) + # restricted nodes should pass through without any violation + nodes = _make_nodes("restricted") + result = pp._postprocess_nodes(nodes) + assert result == nodes + assert "classification_violations" not in get_token()["scope"].get("extensions", {}) + + def test_nodes_without_classification_default_to_internal(self): + key, _ = _generate_key() + _issue_token(key, ScopePolicy(intent="x", data_classification="internal")) + pp = HdpNodePostprocessor(signing_key=key, check_data_classification=True) + # Node with no classification metadata should be treated as "internal" + node = NodeWithScore(node=TextNode(text="no metadata"), score=1.0) + result = pp._postprocess_nodes([node]) + assert len(result) == 1 + assert "classification_violations" not in get_token()["scope"].get("extensions", {}) diff --git a/packages/llama-index-callbacks-hdp/tests/test_session.py b/packages/llama-index-callbacks-hdp/tests/test_session.py new file mode 100644 index 0000000..b40fec9 --- /dev/null +++ b/packages/llama-index-callbacks-hdp/tests/test_session.py @@ -0,0 +1,74 @@ +"""Tests for ContextVar session state — isolation across concurrent async tasks.""" + +from __future__ import annotations + +import asyncio +import pytest +from llama_index.callbacks.hdp.session import clear_token, get_token, set_token + + +class TestSessionBasics: + def test_get_token_returns_none_by_default(self): + clear_token() + assert get_token() is None + + def test_set_and_get_token(self): + token = {"hdp": "0.1", "header": {"token_id": "abc"}} + set_token(token) + assert get_token() is token + clear_token() + + def test_clear_token(self): + set_token({"hdp": "0.1"}) + clear_token() + assert get_token() is None + + def test_set_overwrites_previous(self): + set_token({"id": "first"}) + set_token({"id": "second"}) + assert get_token()["id"] == "second" + clear_token() + + +class 
TestContextVarIsolation: + @pytest.mark.asyncio + async def test_tasks_do_not_share_token(self): + """Each asyncio task should have its own ContextVar copy.""" + results = {} + + async def task_a(): + clear_token() + set_token({"task": "a"}) + await asyncio.sleep(0.01) + results["a"] = get_token() + + async def task_b(): + clear_token() + set_token({"task": "b"}) + await asyncio.sleep(0.01) + results["b"] = get_token() + + await asyncio.gather(task_a(), task_b()) + assert results["a"]["task"] == "a" + assert results["b"]["task"] == "b" + + @pytest.mark.asyncio + async def test_child_task_inherits_parent_but_is_isolated(self): + """Child tasks inherit the parent's ContextVar at creation time but modifications + in the child do not affect the parent.""" + clear_token() + set_token({"owner": "parent"}) + + child_saw: dict = {} + + async def child(): + child_saw["initial"] = get_token() + set_token({"owner": "child"}) + child_saw["after_set"] = get_token() + + await asyncio.create_task(child()) + # Parent's token must be unchanged + assert get_token()["owner"] == "parent" + assert child_saw["initial"]["owner"] == "parent" + assert child_saw["after_set"]["owner"] == "child" + clear_token() diff --git a/packages/llama-index-callbacks-hdp/tests/test_verify.py b/packages/llama-index-callbacks-hdp/tests/test_verify.py new file mode 100644 index 0000000..eaeb907 --- /dev/null +++ b/packages/llama-index-callbacks-hdp/tests/test_verify.py @@ -0,0 +1,136 @@ +"""Tests for offline chain verification — ported and extended from hdp-crewai.""" + +from __future__ import annotations + +import time +import pytest +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + +from llama_index.callbacks.hdp import verify_chain +from llama_index.callbacks.hdp._crypto import sign_root, sign_hop + + +def _generate_key(): + priv = Ed25519PrivateKey.generate() + return priv.private_bytes_raw(), priv.public_key() + + +def _issue_token(key: bytes, pub_key=None, 
session_id="s1", expired=False, max_hops=None) -> dict: + import uuid + now = int(time.time() * 1000) + expires_at = now - 1000 if expired else now + 86400000 + scope: dict = {"intent": "test", "data_classification": "internal", "network_egress": True, "persistence": False} + if max_hops is not None: + scope["max_hops"] = max_hops + unsigned = { + "hdp": "0.1", + "header": { + "token_id": str(uuid.uuid4()), + "issued_at": now, + "expires_at": expires_at, + "session_id": session_id, + "version": "0.1", + }, + "principal": {"id": "user@test.com", "id_type": "email"}, + "scope": scope, + "chain": [], + } + sig = sign_root(unsigned, key, "k1") + return {**unsigned, "signature": sig} + + +def _add_hop(token: dict, key: bytes, action: str) -> dict: + import time + chain = token.get("chain", []) + seq = len(chain) + 1 + unsigned_hop = { + "seq": seq, + "agent_id": "test-agent", + "agent_type": "tool-executor", + "timestamp": int(time.time() * 1000), + "action_summary": action, + "parent_hop": seq - 1, + } + cumulative = [*chain, unsigned_hop] + hop_sig = sign_hop(cumulative, token["signature"]["value"], key) + return {**token, "chain": [*chain, {**unsigned_hop, "hop_signature": hop_sig}]} + + +class TestVerifyChain: + def test_empty_chain_valid(self): + key, pub = _generate_key() + token = _issue_token(key) + result = verify_chain(token, pub.public_bytes_raw()) + assert result.valid + assert result.hop_count == 0 + + def test_chain_with_hops_valid(self): + key, pub = _generate_key() + token = _issue_token(key) + token = _add_hop(token, key, "tool_call: web_search") + token = _add_hop(token, key, "tool_call: retriever") + result = verify_chain(token, pub.public_bytes_raw()) + assert result.valid + assert result.hop_count == 2 + + def test_accepts_raw_public_key_bytes(self): + key, pub = _generate_key() + token = _issue_token(key) + result = verify_chain(token, pub.public_bytes_raw()) + assert result.valid + + def test_tampered_root_sig_fails(self): + key, pub = 
_generate_key() + token = _issue_token(key) + token["signature"]["value"] = token["signature"]["value"][:-4] + "XXXX" + result = verify_chain(token, pub.public_bytes_raw()) + assert not result.valid + assert any("Root signature" in v for v in result.violations) + + def test_tampered_hop_sig_fails(self): + key, pub = _generate_key() + token = _issue_token(key) + token = _add_hop(token, key, "action") + token["chain"][0]["hop_signature"] = "AAAA" + result = verify_chain(token, pub.public_bytes_raw()) + assert not result.valid + + def test_wrong_public_key_fails(self): + key, _ = _generate_key() + _, other_pub = _generate_key() + token = _issue_token(key) + result = verify_chain(token, other_pub.public_bytes_raw()) + assert not result.valid + + def test_expired_token_flagged(self): + key, pub = _generate_key() + token = _issue_token(key, expired=True) + result = verify_chain(token, pub.public_bytes_raw()) + assert any("expired" in v.lower() for v in result.violations) + + def test_max_hops_exceeded_flagged(self): + key, pub = _generate_key() + # max_hops must be set at issuance time so root sig covers it + token = _issue_token(key, max_hops=1) + token = _add_hop(token, key, "hop 1") + token = _add_hop(token, key, "hop 2") + result = verify_chain(token, pub.public_bytes_raw()) + assert any("max_hops" in v for v in result.violations) + + def test_hop_results_detail(self): + key, pub = _generate_key() + token = _issue_token(key) + token = _add_hop(token, key, "action a") + token = _add_hop(token, key, "action b") + result = verify_chain(token, pub.public_bytes_raw()) + assert result.valid + assert len(result.hop_results) == 2 + assert all(hr.valid for hr in result.hop_results) + + def test_depth_property(self): + key, pub = _generate_key() + token = _issue_token(key) + for i in range(3): + token = _add_hop(token, key, f"hop {i}") + result = verify_chain(token, pub.public_bytes_raw()) + assert result.depth == 3