From 7840a198a83ff087684894cc8370e653fb889967 Mon Sep 17 00:00:00 2001 From: Siri Dalugoda Date: Wed, 8 Apr 2026 23:00:30 +1200 Subject: [PATCH 1/3] docs: add LlamaIndex integration design spec Full integration strategy covering instrumentation handler, legacy callback handler, node postprocessor, dual-publish packaging, and three-phase LlamaHub/docs PR/blog campaign. --- docs/llamaindex-integration-design.md | 266 ++++++++++++++++++++++++++ 1 file changed, 266 insertions(+) create mode 100644 docs/llamaindex-integration-design.md diff --git a/docs/llamaindex-integration-design.md b/docs/llamaindex-integration-design.md new file mode 100644 index 0000000..9693bef --- /dev/null +++ b/docs/llamaindex-integration-design.md @@ -0,0 +1,266 @@ +# LlamaIndex Integration — Design Spec +**Date:** 2026-04-08 +**Status:** Approved for implementation planning + +--- + +## Problem Statement + +HDP provides cryptographic chain-of-custody for agentic AI systems. It currently integrates with MCP, CrewAI, AutoGen (Python + TypeScript), Grok/xAI, and LangChain. LlamaIndex is a major gap — it is widely used for RAG pipelines, multi-step agents, and workflow orchestration, but has no HDP integration today. + +The goal is to add full native support for LlamaIndex and run a sequenced campaign to drive visibility and endorsement from the LlamaIndex maintainers and community. + +--- + +## Constraints + +- LlamaIndex's main repo (`run-llama/llama_index`) **no longer accepts new integration packages** — PRs adding a `pyproject.toml` are auto-closed. The contribution path is: publish independently to PyPI, submit to LlamaHub for discovery. +- LlamaIndex docs PRs (guides, examples for external packages) are still accepted. +- The integration must follow the LlamaIndex implicit namespace package convention for LlamaHub listing. +- No external network calls in the hot path — consistent with HDP's offline-verification design. 
+ +--- + +## Approach: Full Integration, One Shot + +Build all three integration surfaces simultaneously and dual-publish. Run the LlamaHub + docs + blog campaign after the package is live. + +--- + +## Technical Architecture + +### Package Structure + +Two published packages: + +**`llama-index-callbacks-hdp`** (primary, PyPI + LlamaHub namespace) +- Uses the `llama_index.*` implicit namespace so users import from the standard LlamaIndex path +- Registered in LlamaHub via `[tool.llamahub]` metadata +- Category: `callbacks` (for LlamaHub discoverability) + +**`hdp-llamaindex`** (metapackage, PyPI) +- Thin wrapper that depends on `llama-index-callbacks-hdp` +- Allows `pip install hdp-llamaindex` for users who discover HDP first + +### Module Layout + +``` +llama-index-callbacks-hdp/ +├── llama_index/callbacks/hdp/ +│ ├── __init__.py ← public exports +│ ├── session.py ← shared ContextVar token state +│ ├── instrumentation.py ← Layer 1: modern dispatcher integration +│ ├── callbacks.py ← Layer 2: legacy CallbackManager integration +│ └── postprocessor.py ← Layer 3: node postprocessor +├── tests/ +└── pyproject.toml +``` + +### Shared Session State (`session.py`) + +A `ContextVar[HdpToken | None]` holds the active token for the duration of a query. This is the glue between the three layers — it works correctly across `asyncio` tasks because Python's `ContextVar` is task-local by default. 
+ +Helper functions in this module: +- `get_token() → HdpToken | None` +- `set_token(token: HdpToken) → None` +- `clear_token() → None` + +### Token Lifecycle Mapped to LlamaIndex Events + +| LlamaIndex Signal | Layer | HDP Operation | +|-------------------|-------|---------------| +| `QueryStartEvent` / `start_trace` | 1 + 2 | `issueToken()` → store in ContextVar | +| `AgentToolCallEvent` / `FUNCTION_CALL` | 1 + 2 | `extendChain()` with tool name as action summary | +| `LLMChatStartEvent` / `LLM` start | 1 + 2 | Annotate current hop metadata | +| `ExceptionEvent` / `EXCEPTION` | 1 + 2 | Record anomaly in chain (does not invalidate token) | +| `QueryEndEvent` / `end_trace` | 1 + 2 | Finalize token, call optional `on_token_ready` callback | +| `_postprocess_nodes()` | 3 | Read token from ContextVar, validate scope, extend chain with retrieval hop | + +--- + +## Layer 1 — Modern Instrumentation Handler + +**Class:** `HdpInstrumentationHandler` implementing `BaseInstrumentationHandler` + +**Entry point:** A classmethod `init()` that wires handlers to the root `llama_index.core.instrumentation` dispatcher. + +**Parameters:** +- `signing_key` — Ed25519 private key bytes +- `principal` — `HdpPrincipal` identifying the authorizing human +- `scope` — `ScopePolicy` (intent string, authorized tools list, max hops, data classification) +- `key_id` — key identifier for rotation support +- `on_violation` — `"log"` (default) or `"raise"` +- `on_token_ready` — optional callable invoked with the final token at query end + +**Internal components wired at `init()` time:** + +`HdpEventHandler(BaseEventHandler)` +Single abstract method: `handle(event: BaseEvent)`. Dispatches on event type to issue token, extend chain, annotate hops, or finalize. + +`HdpSpanHandler(BaseSpanHandler)` +Tags each new span with the active token ID for cross-tool trace correlation. Logs span drops with token context. Does not manage token lifecycle (that belongs to the event handler). 
+ +**Why both?** The event handler manages the delegation chain. The span handler enables token-to-span correlation for users running HDP alongside Arize Phoenix or Langfuse — their traces can be joined to the HDP audit record by token ID. + +--- + +## Layer 2 — Legacy Callback Handler + +**Class:** `HdpCallbackHandler` implementing `BaseCallbackHandler` + +For users on older LlamaIndex versions or who configure via `Settings.callback_manager`. + +**Parameters:** +- Same as Layer 1 (`signing_key`, `principal`, `scope`, `key_id`, `strict`) +- `strict=False` default (observe mode: log violations, continue); `strict=True` raises `HDPScopeViolationError` + +**The four required abstract methods:** + +`start_trace(trace_id)` +Issues root token using `trace_id` as the session ID. Stores in ContextVar. + +`end_trace(trace_id, trace_map)` +Emits/finalizes the token. If `on_token_ready` is set, calls it here. + +`on_event_start(event_type, payload, event_id, parent_id)` +Routes on `CBEventType`: +- `QUERY` → records query intent in current hop metadata +- `FUNCTION_CALL` → calls `extendChain()` with tool name from `EventPayload.TOOL` +- `LLM` → annotates hop with model name from `EventPayload.MODEL_NAME` +- `EXCEPTION` → records anomaly + +`on_event_end(event_type, payload, event_id)` +Routes on `CBEventType`: +- `FUNCTION_CALL` → records output summary from `EventPayload.FUNCTION_OUTPUT` +- All others → noop + +**Design note:** `start_trace` is the correct hook for issuing the root token, not `on_event_start(QUERY)`, because `start_trace` is always called first and provides the trace ID for session binding. This matches the pattern used by Langfuse and Arize in LlamaIndex. + +--- + +## Layer 3 — Node Postprocessor + +**Class:** `HdpNodePostprocessor` implementing `BaseNodePostprocessor` + +Runs after retrieval, before synthesis. Validates scope and records retrieval as a hop. 
+ +**Parameters:** +- `strict=False` — observe mode (log violations, return all nodes); `strict=True` raises `HDPScopeViolationError` +- `check_data_classification=True` — if enabled, inspects node metadata for a `classification` field and validates against `scope.data_classification` + +**`_postprocess_nodes(nodes, query_bundle)` logic:** + +1. Read active token from ContextVar. If none: emit warning, return nodes unchanged. +2. Extract query string from `query_bundle`. Log it against `scope.intent` for audit purposes. +3. If `check_data_classification` is enabled: inspect each node's metadata for a `classification` key. If any node's classification exceeds the allowed level in scope: log violation or raise (depending on `strict`). +4. Call `extendChain()` with `action_summary = f"retrieval: {len(nodes)} nodes"` to record retrieval in the delegation chain. +5. Return nodes (all in observe mode; filtered or none in strict mode on violation). + +**What this enables:** A RAG pipeline where every retrieval step is recorded in the delegation chain. The final HDP token proves which human authorized which query, which tools were used, and which documents were retrieved — all cryptographically. 
+ +--- + +## Dual-Publish Strategy + +**`llama-index-callbacks-hdp`** +- Primary package +- Follows LlamaIndex implicit namespace convention (PEP 420) +- `[tool.llamahub]` section in `pyproject.toml` for LlamaHub registration +- LlamaHub submission: PR to `run-llama/llama_hub` repo + +**`hdp-llamaindex`** +- Metapackage: `install_requires = ["llama-index-callbacks-hdp"]` +- Re-exports for convenience from `hdp_llamaindex` namespace (for users who discover HDP first) +- Maintained in the existing HDP packages directory + +--- + +## Dependencies + +| Package | Version | Purpose | +|---------|---------|---------| +| `llama-index-core` | `>=0.10.20,<0.15` | Core abstractions (BaseEventHandler, BaseCallbackHandler, BaseNodePostprocessor, instrumentation dispatcher) | +| `llama-index-instrumentation` | `>=0.1.0` | Standalone instrumentation package (BaseEventHandler, BaseSpanHandler) | +| `cryptography` | `>=42.0.0` | Ed25519 signing (bundled inline, consistent with all other HDP Python packages) | +| `jcs` | `>=0.2.1` | RFC 8785 canonical JSON (bundled inline) | + +**Note on HDP core:** All HDP Python packages (`hdp-crewai`, `hdp-autogen`, `hdp-grok`) bundle `_crypto.py` and `_types.py` inline rather than depending on a separate `hdp` PyPI package. The llamaindex package follows the same pattern. + +--- + +## Error Handling + +- **No active token:** All three layers gracefully degrade — they log a warning and do not raise. This ensures HDP does not break applications that haven't configured it at startup. +- **Scope violation (observe mode):** Log at WARNING level with full context (tool name, scope, token ID). Never raise. +- **Scope violation (strict mode):** Raise `HDPScopeViolationError` with the same context. Applications may catch this to implement custom fallback. +- **Chain extension failure:** Log at ERROR level, do not swallow. The underlying HDP SDK error propagates. 
+ +--- + +## Testing + +Each layer has its own test module: +- `tests/test_instrumentation.py` — fires synthetic LlamaIndex events, verifies token is issued and extended correctly +- `tests/test_callbacks.py` — exercises all four abstract methods in sequence +- `tests/test_postprocessor.py` — verifies node pass-through, violation logging, and chain extension +- `tests/test_session.py` — verifies ContextVar isolation across concurrent async tasks +- `tests/test_integration.py` — end-to-end: a minimal LlamaIndex query pipeline exercising all three layers together + +--- + +## Campaign Strategy + +### Phase 1: Publish + LlamaHub Listing + +Deliverables: +- Both packages live on PyPI +- LlamaHub listing PR merged (`run-llama/llama_hub`) +- README clearly stating the differentiation: HDP records *authorization provenance*, not just telemetry + +Success metric: LlamaHub listing live, PyPI download count accumulating. Begin Phase 2 outreach once the package has at least 2 weeks of install history on PyPI — even a modest count signals real usage and gives the docs PR a concrete "this is already being used" data point. + +### Phase 2: Docs PR to run-llama/llama_index + +Target: `docs/docs/understanding/tracing/` or `docs/examples/observability/` + +What the PR contains: +- A written guide (no code artifacts) explaining the three integration paths +- Comparison to pure observability tools: what HDP adds that Arize/Langfuse don't (cryptographic provenance, offline verification, scope enforcement) +- Link to the LlamaHub listing and PyPI package + +Why this is accepted: LlamaIndex docs PRs for external integrations are open. Only new in-repo packages are rejected. + +Pitch approach: Open the PR with a link to the LlamaHub listing and download stats as social proof. Tag a maintainer (check `CODEOWNERS` for the `docs/` path). + +Success metric: PR merged. + +### Phase 3: Blog Co-Author Pitch + +Timing: After Phase 2 PR is merged. 
+ +Target: `llamaindex.ai/blog` — they publish guest and co-authored technical posts. + +Pitch angle: +*"We built the first cryptographic chain-of-custody integration for LlamaIndex agents. Here is why authorization provenance is a different problem from observability — and why it matters for enterprise agentic systems."* + +This is a novel technical story with a concrete, already-live implementation. It is not a product announcement; it is a technical contribution. + +Outreach: LlamaIndex DevRel via their Discord `#community-announcements` channel or X (@jerryjliu0). Frame it as a technical piece that would be valuable to their enterprise-focused readers. + +Success metric: Blog post published with LlamaIndex as co-author or publisher. + +--- + +## What LlamaIndex Is Missing (The Gap This Fills) + +LlamaIndex's current observability ecosystem (Arize Phoenix, Langfuse, Wandb, AgentOps, UpTrain) answers: *what happened and when?* + +HDP answers: *who authorized it, under what scope, and can you prove it offline?* + +No existing LlamaIndex integration provides: +- A cryptographically signed record of which human principal authorized a given agent run +- A tamper-evident delegation chain showing every tool call back to its authorization root +- Offline-verifiable tokens (no network calls, no central registry) +- Scope enforcement at the retrieval layer (the postprocessor integration) + +This is the differentiated claim that makes this story compelling to the LlamaIndex maintainers and their enterprise audience. 
From 40c20398b031e2ffba5e80a7a582acc399cc3dba Mon Sep 17 00:00:00 2001 From: Siri Dalugoda Date: Thu, 9 Apr 2026 01:05:03 +1200 Subject: [PATCH 2/3] docs: remove spec doc from repo (moved out of git) --- docs/llamaindex-integration-design.md | 266 -------------------------- 1 file changed, 266 deletions(-) delete mode 100644 docs/llamaindex-integration-design.md diff --git a/docs/llamaindex-integration-design.md b/docs/llamaindex-integration-design.md deleted file mode 100644 index 9693bef..0000000 --- a/docs/llamaindex-integration-design.md +++ /dev/null @@ -1,266 +0,0 @@ -# LlamaIndex Integration — Design Spec -**Date:** 2026-04-08 -**Status:** Approved for implementation planning - ---- - -## Problem Statement - -HDP provides cryptographic chain-of-custody for agentic AI systems. It currently integrates with MCP, CrewAI, AutoGen (Python + TypeScript), Grok/xAI, and LangChain. LlamaIndex is a major gap — it is widely used for RAG pipelines, multi-step agents, and workflow orchestration, but has no HDP integration today. - -The goal is to add full native support for LlamaIndex and run a sequenced campaign to drive visibility and endorsement from the LlamaIndex maintainers and community. - ---- - -## Constraints - -- LlamaIndex's main repo (`run-llama/llama_index`) **no longer accepts new integration packages** — PRs adding a `pyproject.toml` are auto-closed. The contribution path is: publish independently to PyPI, submit to LlamaHub for discovery. -- LlamaIndex docs PRs (guides, examples for external packages) are still accepted. -- The integration must follow the LlamaIndex implicit namespace package convention for LlamaHub listing. -- No external network calls in the hot path — consistent with HDP's offline-verification design. - ---- - -## Approach: Full Integration, One Shot - -Build all three integration surfaces simultaneously and dual-publish. Run the LlamaHub + docs + blog campaign after the package is live. 
- ---- - -## Technical Architecture - -### Package Structure - -Two published packages: - -**`llama-index-callbacks-hdp`** (primary, PyPI + LlamaHub namespace) -- Uses the `llama_index.*` implicit namespace so users import from the standard LlamaIndex path -- Registered in LlamaHub via `[tool.llamahub]` metadata -- Category: `callbacks` (for LlamaHub discoverability) - -**`hdp-llamaindex`** (metapackage, PyPI) -- Thin wrapper that depends on `llama-index-callbacks-hdp` -- Allows `pip install hdp-llamaindex` for users who discover HDP first - -### Module Layout - -``` -llama-index-callbacks-hdp/ -├── llama_index/callbacks/hdp/ -│ ├── __init__.py ← public exports -│ ├── session.py ← shared ContextVar token state -│ ├── instrumentation.py ← Layer 1: modern dispatcher integration -│ ├── callbacks.py ← Layer 2: legacy CallbackManager integration -│ └── postprocessor.py ← Layer 3: node postprocessor -├── tests/ -└── pyproject.toml -``` - -### Shared Session State (`session.py`) - -A `ContextVar[HdpToken | None]` holds the active token for the duration of a query. This is the glue between the three layers — it works correctly across `asyncio` tasks because Python's `ContextVar` is task-local by default. 
- -Helper functions in this module: -- `get_token() → HdpToken | None` -- `set_token(token: HdpToken) → None` -- `clear_token() → None` - -### Token Lifecycle Mapped to LlamaIndex Events - -| LlamaIndex Signal | Layer | HDP Operation | -|-------------------|-------|---------------| -| `QueryStartEvent` / `start_trace` | 1 + 2 | `issueToken()` → store in ContextVar | -| `AgentToolCallEvent` / `FUNCTION_CALL` | 1 + 2 | `extendChain()` with tool name as action summary | -| `LLMChatStartEvent` / `LLM` start | 1 + 2 | Annotate current hop metadata | -| `ExceptionEvent` / `EXCEPTION` | 1 + 2 | Record anomaly in chain (does not invalidate token) | -| `QueryEndEvent` / `end_trace` | 1 + 2 | Finalize token, call optional `on_token_ready` callback | -| `_postprocess_nodes()` | 3 | Read token from ContextVar, validate scope, extend chain with retrieval hop | - ---- - -## Layer 1 — Modern Instrumentation Handler - -**Class:** `HdpInstrumentationHandler` implementing `BaseInstrumentationHandler` - -**Entry point:** A classmethod `init()` that wires handlers to the root `llama_index.core.instrumentation` dispatcher. - -**Parameters:** -- `signing_key` — Ed25519 private key bytes -- `principal` — `HdpPrincipal` identifying the authorizing human -- `scope` — `ScopePolicy` (intent string, authorized tools list, max hops, data classification) -- `key_id` — key identifier for rotation support -- `on_violation` — `"log"` (default) or `"raise"` -- `on_token_ready` — optional callable invoked with the final token at query end - -**Internal components wired at `init()` time:** - -`HdpEventHandler(BaseEventHandler)` -Single abstract method: `handle(event: BaseEvent)`. Dispatches on event type to issue token, extend chain, annotate hops, or finalize. - -`HdpSpanHandler(BaseSpanHandler)` -Tags each new span with the active token ID for cross-tool trace correlation. Logs span drops with token context. Does not manage token lifecycle (that belongs to the event handler). 
- -**Why both?** The event handler manages the delegation chain. The span handler enables token-to-span correlation for users running HDP alongside Arize Phoenix or Langfuse — their traces can be joined to the HDP audit record by token ID. - ---- - -## Layer 2 — Legacy Callback Handler - -**Class:** `HdpCallbackHandler` implementing `BaseCallbackHandler` - -For users on older LlamaIndex versions or who configure via `Settings.callback_manager`. - -**Parameters:** -- Same as Layer 1 (`signing_key`, `principal`, `scope`, `key_id`, `strict`) -- `strict=False` default (observe mode: log violations, continue); `strict=True` raises `HDPScopeViolationError` - -**The four required abstract methods:** - -`start_trace(trace_id)` -Issues root token using `trace_id` as the session ID. Stores in ContextVar. - -`end_trace(trace_id, trace_map)` -Emits/finalizes the token. If `on_token_ready` is set, calls it here. - -`on_event_start(event_type, payload, event_id, parent_id)` -Routes on `CBEventType`: -- `QUERY` → records query intent in current hop metadata -- `FUNCTION_CALL` → calls `extendChain()` with tool name from `EventPayload.TOOL` -- `LLM` → annotates hop with model name from `EventPayload.MODEL_NAME` -- `EXCEPTION` → records anomaly - -`on_event_end(event_type, payload, event_id)` -Routes on `CBEventType`: -- `FUNCTION_CALL` → records output summary from `EventPayload.FUNCTION_OUTPUT` -- All others → noop - -**Design note:** `start_trace` is the correct hook for issuing the root token, not `on_event_start(QUERY)`, because `start_trace` is always called first and provides the trace ID for session binding. This matches the pattern used by Langfuse and Arize in LlamaIndex. - ---- - -## Layer 3 — Node Postprocessor - -**Class:** `HdpNodePostprocessor` implementing `BaseNodePostprocessor` - -Runs after retrieval, before synthesis. Validates scope and records retrieval as a hop. 
- -**Parameters:** -- `strict=False` — observe mode (log violations, return all nodes); `strict=True` raises `HDPScopeViolationError` -- `check_data_classification=True` — if enabled, inspects node metadata for a `classification` field and validates against `scope.data_classification` - -**`_postprocess_nodes(nodes, query_bundle)` logic:** - -1. Read active token from ContextVar. If none: emit warning, return nodes unchanged. -2. Extract query string from `query_bundle`. Log it against `scope.intent` for audit purposes. -3. If `check_data_classification` is enabled: inspect each node's metadata for a `classification` key. If any node's classification exceeds the allowed level in scope: log violation or raise (depending on `strict`). -4. Call `extendChain()` with `action_summary = f"retrieval: {len(nodes)} nodes"` to record retrieval in the delegation chain. -5. Return nodes (all in observe mode; filtered or none in strict mode on violation). - -**What this enables:** A RAG pipeline where every retrieval step is recorded in the delegation chain. The final HDP token proves which human authorized which query, which tools were used, and which documents were retrieved — all cryptographically. 
- ---- - -## Dual-Publish Strategy - -**`llama-index-callbacks-hdp`** -- Primary package -- Follows LlamaIndex implicit namespace convention (PEP 420) -- `[tool.llamahub]` section in `pyproject.toml` for LlamaHub registration -- LlamaHub submission: PR to `run-llama/llama_hub` repo - -**`hdp-llamaindex`** -- Metapackage: `install_requires = ["llama-index-callbacks-hdp"]` -- Re-exports for convenience from `hdp_llamaindex` namespace (for users who discover HDP first) -- Maintained in the existing HDP packages directory - ---- - -## Dependencies - -| Package | Version | Purpose | -|---------|---------|---------| -| `llama-index-core` | `>=0.10.20,<0.15` | Core abstractions (BaseEventHandler, BaseCallbackHandler, BaseNodePostprocessor, instrumentation dispatcher) | -| `llama-index-instrumentation` | `>=0.1.0` | Standalone instrumentation package (BaseEventHandler, BaseSpanHandler) | -| `cryptography` | `>=42.0.0` | Ed25519 signing (bundled inline, consistent with all other HDP Python packages) | -| `jcs` | `>=0.2.1` | RFC 8785 canonical JSON (bundled inline) | - -**Note on HDP core:** All HDP Python packages (`hdp-crewai`, `hdp-autogen`, `hdp-grok`) bundle `_crypto.py` and `_types.py` inline rather than depending on a separate `hdp` PyPI package. The llamaindex package follows the same pattern. - ---- - -## Error Handling - -- **No active token:** All three layers gracefully degrade — they log a warning and do not raise. This ensures HDP does not break applications that haven't configured it at startup. -- **Scope violation (observe mode):** Log at WARNING level with full context (tool name, scope, token ID). Never raise. -- **Scope violation (strict mode):** Raise `HDPScopeViolationError` with the same context. Applications may catch this to implement custom fallback. -- **Chain extension failure:** Log at ERROR level, do not swallow. The underlying HDP SDK error propagates. 
- ---- - -## Testing - -Each layer has its own test module: -- `tests/test_instrumentation.py` — fires synthetic LlamaIndex events, verifies token is issued and extended correctly -- `tests/test_callbacks.py` — exercises all four abstract methods in sequence -- `tests/test_postprocessor.py` — verifies node pass-through, violation logging, and chain extension -- `tests/test_session.py` — verifies ContextVar isolation across concurrent async tasks -- `tests/test_integration.py` — end-to-end: a minimal LlamaIndex query pipeline exercising all three layers together - ---- - -## Campaign Strategy - -### Phase 1: Publish + LlamaHub Listing - -Deliverables: -- Both packages live on PyPI -- LlamaHub listing PR merged (`run-llama/llama_hub`) -- README clearly stating the differentiation: HDP records *authorization provenance*, not just telemetry - -Success metric: LlamaHub listing live, PyPI download count accumulating. Begin Phase 2 outreach once the package has at least 2 weeks of install history on PyPI — even a modest count signals real usage and gives the docs PR a concrete "this is already being used" data point. - -### Phase 2: Docs PR to run-llama/llama_index - -Target: `docs/docs/understanding/tracing/` or `docs/examples/observability/` - -What the PR contains: -- A written guide (no code artifacts) explaining the three integration paths -- Comparison to pure observability tools: what HDP adds that Arize/Langfuse don't (cryptographic provenance, offline verification, scope enforcement) -- Link to the LlamaHub listing and PyPI package - -Why this is accepted: LlamaIndex docs PRs for external integrations are open. Only new in-repo packages are rejected. - -Pitch approach: Open the PR with a link to the LlamaHub listing and download stats as social proof. Tag a maintainer (check `CODEOWNERS` for the `docs/` path). - -Success metric: PR merged. - -### Phase 3: Blog Co-Author Pitch - -Timing: After Phase 2 PR is merged. 
- -Target: `llamaindex.ai/blog` — they publish guest and co-authored technical posts. - -Pitch angle: -*"We built the first cryptographic chain-of-custody integration for LlamaIndex agents. Here is why authorization provenance is a different problem from observability — and why it matters for enterprise agentic systems."* - -This is a novel technical story with a concrete, already-live implementation. It is not a product announcement; it is a technical contribution. - -Outreach: LlamaIndex DevRel via their Discord `#community-announcements` channel or X (@jerryjliu0). Frame it as a technical piece that would be valuable to their enterprise-focused readers. - -Success metric: Blog post published with LlamaIndex as co-author or publisher. - ---- - -## What LlamaIndex Is Missing (The Gap This Fills) - -LlamaIndex's current observability ecosystem (Arize Phoenix, Langfuse, Wandb, AgentOps, UpTrain) answers: *what happened and when?* - -HDP answers: *who authorized it, under what scope, and can you prove it offline?* - -No existing LlamaIndex integration provides: -- A cryptographically signed record of which human principal authorized a given agent run -- A tamper-evident delegation chain showing every tool call back to its authorization root -- Offline-verifiable tokens (no network calls, no central registry) -- Scope enforcement at the retrieval layer (the postprocessor integration) - -This is the differentiated claim that makes this story compelling to the LlamaIndex maintainers and their enterprise audience. 
From f280894040c52455211c3d7ec413adcc76ac14e3 Mon Sep 17 00:00:00 2001 From: Siri Dalugoda Date: Thu, 9 Apr 2026 01:15:01 +1200 Subject: [PATCH 3/3] feat: add llama-index-callbacks-hdp and hdp-llamaindex packages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three-layer integration providing cryptographic authorization provenance for LlamaIndex agents and RAG pipelines: - Layer 1: HdpInstrumentationHandler — modern dispatcher integration (LlamaIndex >=0.10.20), hooks QueryStartEvent, AgentToolCallEvent, LLMChatStartEvent, QueryEndEvent via root dispatcher - Layer 2: HdpCallbackHandler — legacy CallbackManager integration, hooks start_trace, end_trace, on_event_start/end for all FUNCTION_CALL and LLM events - Layer 3: HdpNodePostprocessor — inline retrieval hop recording and data classification enforcement in the RAG pipeline Shared ContextVar session state isolates tokens across concurrent async tasks. All crypto primitives bundled inline (Ed25519 + RFC 8785 JCS), consistent with all other HDP Python packages. hdp-llamaindex is a thin metapackage re-exporting from the llama_index namespace for users who discover HDP first. 43 tests passing across session isolation, chain verification, callback handler, and postprocessor. 
--- packages/hdp-llamaindex/README.md | 11 + packages/hdp-llamaindex/pyproject.toml | 21 ++ .../src/hdp_llamaindex/__init__.py | 43 +++ packages/llama-index-callbacks-hdp/README.md | 85 +++++ .../llama_index/callbacks/hdp/__init__.py | 32 ++ .../llama_index/callbacks/hdp/_crypto.py | 71 ++++ .../llama_index/callbacks/hdp/_types.py | 69 ++++ .../llama_index/callbacks/hdp/callbacks.py | 323 +++++++++++++++++ .../callbacks/hdp/instrumentation.py | 335 ++++++++++++++++++ .../callbacks/hdp/postprocessor.py | 179 ++++++++++ .../llama_index/callbacks/hdp/session.py | 29 ++ .../llama_index/callbacks/hdp/verify.py | 119 +++++++ .../llama-index-callbacks-hdp/pyproject.toml | 37 ++ .../tests/__init__.py | 0 .../tests/test_callbacks.py | 224 ++++++++++++ .../tests/test_postprocessor.py | 157 ++++++++ .../tests/test_session.py | 74 ++++ .../tests/test_verify.py | 136 +++++++ 18 files changed, 1945 insertions(+) create mode 100644 packages/hdp-llamaindex/README.md create mode 100644 packages/hdp-llamaindex/pyproject.toml create mode 100644 packages/hdp-llamaindex/src/hdp_llamaindex/__init__.py create mode 100644 packages/llama-index-callbacks-hdp/README.md create mode 100644 packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/__init__.py create mode 100644 packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/_crypto.py create mode 100644 packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/_types.py create mode 100644 packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/callbacks.py create mode 100644 packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/instrumentation.py create mode 100644 packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/postprocessor.py create mode 100644 packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/session.py create mode 100644 packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/verify.py create mode 100644 packages/llama-index-callbacks-hdp/pyproject.toml create mode 100644 
packages/llama-index-callbacks-hdp/tests/__init__.py create mode 100644 packages/llama-index-callbacks-hdp/tests/test_callbacks.py create mode 100644 packages/llama-index-callbacks-hdp/tests/test_postprocessor.py create mode 100644 packages/llama-index-callbacks-hdp/tests/test_session.py create mode 100644 packages/llama-index-callbacks-hdp/tests/test_verify.py diff --git a/packages/hdp-llamaindex/README.md b/packages/hdp-llamaindex/README.md new file mode 100644 index 0000000..901298a --- /dev/null +++ b/packages/hdp-llamaindex/README.md @@ -0,0 +1,11 @@ +# hdp-llamaindex + +Metapackage for users who discover HDP first. + +```bash +pip install hdp-llamaindex +``` + +This installs `llama-index-callbacks-hdp` and re-exports all classes from the `hdp_llamaindex` namespace. + +For full documentation see [llama-index-callbacks-hdp](../llama-index-callbacks-hdp/README.md). diff --git a/packages/hdp-llamaindex/pyproject.toml b/packages/hdp-llamaindex/pyproject.toml new file mode 100644 index 0000000..b17e19c --- /dev/null +++ b/packages/hdp-llamaindex/pyproject.toml @@ -0,0 +1,21 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "hdp-llamaindex" +version = "0.1.0" +description = "HDP (Human Delegation Provenance) integration for LlamaIndex — metapackage" +readme = "README.md" +license = { text = "CC-BY-4.0" } +requires-python = ">=3.10" +dependencies = [ + "llama-index-callbacks-hdp>=0.1.0", +] + +[project.urls] +Homepage = "https://github.com/Helixar-AI/HDP" +Repository = "https://github.com/Helixar-AI/HDP" + +[tool.hatch.build.targets.wheel] +packages = ["src/hdp_llamaindex"] diff --git a/packages/hdp-llamaindex/src/hdp_llamaindex/__init__.py b/packages/hdp-llamaindex/src/hdp_llamaindex/__init__.py new file mode 100644 index 0000000..60e53b0 --- /dev/null +++ b/packages/hdp-llamaindex/src/hdp_llamaindex/__init__.py @@ -0,0 +1,43 @@ +"""hdp-llamaindex — convenience re-export of the HDP LlamaIndex integration. 
+ +Install via `pip install hdp-llamaindex` if you discover HDP first. +All classes are importable from here or from `llama_index.callbacks.hdp`. +""" + +from llama_index.callbacks.hdp import ( + DataClassification, + HdpCallbackHandler, + HdpInstrumentationHandler, + HdpNodePostprocessor, + HdpPrincipal, + HdpScope, + HdpToken, + HDPScopeViolationError, + HopRecord, + HopVerification, + ScopePolicy, + VerificationResult, + clear_token, + get_token, + set_token, + verify_chain, +) + +__all__ = [ + "DataClassification", + "HdpCallbackHandler", + "HdpInstrumentationHandler", + "HdpNodePostprocessor", + "HdpPrincipal", + "HdpScope", + "HdpToken", + "HDPScopeViolationError", + "HopRecord", + "HopVerification", + "ScopePolicy", + "VerificationResult", + "clear_token", + "get_token", + "set_token", + "verify_chain", +] diff --git a/packages/llama-index-callbacks-hdp/README.md b/packages/llama-index-callbacks-hdp/README.md new file mode 100644 index 0000000..a7c7e67 --- /dev/null +++ b/packages/llama-index-callbacks-hdp/README.md @@ -0,0 +1,85 @@ +# llama-index-callbacks-hdp + +HDP (Human Delegation Provenance) integration for LlamaIndex — cryptographic authorization provenance for agents and RAG pipelines. + +HDP answers the question that observability tools like Arize Phoenix and Langfuse cannot: **who authorized this agent run, under what scope, and can you prove it offline?** + +Every tool call, retrieval step, and LLM invocation is recorded in a tamper-evident, cryptographically signed delegation chain. The chain is fully verifiable offline — no network calls, no central registry. 
+ +## Installation + +```bash +pip install llama-index-callbacks-hdp +``` + +## Usage + +### Option 1 — Modern instrumentation dispatcher (LlamaIndex ≥0.10.20) + +```python +from llama_index.callbacks.hdp import HdpInstrumentationHandler, HdpPrincipal, ScopePolicy + +HdpInstrumentationHandler.init( + signing_key=ed25519_private_key_bytes, + principal=HdpPrincipal(id="alice@corp.com", id_type="email"), + scope=ScopePolicy( + intent="Research pipeline", + authorized_tools=["web_search", "retriever"], + max_hops=10, + ), + on_token_ready=lambda token: print(token["header"]["token_id"]), +) +``` + +### Option 2 — Legacy CallbackManager + +```python +from llama_index.callbacks.hdp import HdpCallbackHandler, HdpPrincipal, ScopePolicy +from llama_index.core import Settings +from llama_index.core.callbacks import CallbackManager + +handler = HdpCallbackHandler( + signing_key=ed25519_private_key_bytes, + principal=HdpPrincipal(id="alice@corp.com", id_type="email"), + scope=ScopePolicy(intent="Research pipeline"), +) +Settings.callback_manager = CallbackManager([handler]) +``` + +### Option 3 — Node postprocessor (inline retrieval enforcement) + +```python +from llama_index.callbacks.hdp import HdpNodePostprocessor + +postprocessor = HdpNodePostprocessor( + signing_key=ed25519_private_key_bytes, + strict=False, + check_data_classification=True, +) +query_engine = index.as_query_engine(node_postprocessors=[postprocessor]) +``` + +### Verifying a token + +```python +from llama_index.callbacks.hdp import verify_chain + +result = verify_chain(token_dict, public_key_bytes) +if result.valid: + print(f"Chain verified: {result.hop_count} hops") +``` + +## What makes HDP different from Arize/Langfuse? 
+ +| Capability | Arize / Langfuse | HDP | +|---|---|---| +| Records what happened | ✓ | ✓ | +| Records who authorized it | ✗ | ✓ | +| Cryptographically signed | ✗ | ✓ | +| Verifiable offline | ✗ | ✓ | +| Scope enforcement | ✗ | ✓ | +| No central registry | n/a | ✓ | + +## License + +CC-BY-4.0 diff --git a/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/__init__.py b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/__init__.py new file mode 100644 index 0000000..ffd13e9 --- /dev/null +++ b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/__init__.py @@ -0,0 +1,32 @@ +"""HDP integration for LlamaIndex — cryptographic authorization provenance.""" + +from ._types import DataClassification, HdpPrincipal, HdpScope, HdpToken, HopRecord +from .callbacks import HdpCallbackHandler, HDPScopeViolationError, ScopePolicy +from .instrumentation import HdpInstrumentationHandler +from .postprocessor import HdpNodePostprocessor +from .session import clear_token, get_token, set_token +from .verify import HopVerification, VerificationResult, verify_chain + +__all__ = [ + # Core types + "DataClassification", + "HdpPrincipal", + "HdpScope", + "HdpToken", + "HopRecord", + # Policy + "ScopePolicy", + "HDPScopeViolationError", + # Integration layers + "HdpCallbackHandler", + "HdpInstrumentationHandler", + "HdpNodePostprocessor", + # Session + "get_token", + "set_token", + "clear_token", + # Verification + "verify_chain", + "VerificationResult", + "HopVerification", +] diff --git a/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/_crypto.py b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/_crypto.py new file mode 100644 index 0000000..507d994 --- /dev/null +++ b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/_crypto.py @@ -0,0 +1,71 @@ +"""Cryptographic primitives for HDP — Ed25519 signing/verification with RFC 8785 canonical JSON. 
+ +Matches the signing scheme in the TypeScript SDK (src/crypto/sign.ts + src/crypto/verify.ts): + - Root: canonicalize({header, principal, scope}) → Ed25519 → base64url + - Hop: canonicalize({chain: [...], root_sig: }) → Ed25519 → base64url +""" + +from __future__ import annotations + +import base64 +from typing import Any + +import jcs +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey + + +def _b64url(sig_bytes: bytes) -> str: + return base64.urlsafe_b64encode(sig_bytes).rstrip(b"=").decode() + + +def _canonicalize(obj: Any) -> bytes: + return jcs.canonicalize(obj) + + +def sign_root(unsigned_token: dict, private_key_bytes: bytes, kid: str) -> dict: + subset = {f: unsigned_token[f] for f in ["header", "principal", "scope"] if f in unsigned_token} + message = _canonicalize(subset) + key = Ed25519PrivateKey.from_private_bytes(private_key_bytes) + sig_bytes = key.sign(message) + return { + "alg": "Ed25519", + "kid": kid, + "value": _b64url(sig_bytes), + "signed_fields": ["header", "principal", "scope"], + } + + +def sign_hop(cumulative_chain: list[dict], root_sig_value: str, private_key_bytes: bytes) -> str: + payload = {"chain": cumulative_chain, "root_sig": root_sig_value} + message = _canonicalize(payload) + key = Ed25519PrivateKey.from_private_bytes(private_key_bytes) + sig_bytes = key.sign(message) + return _b64url(sig_bytes) + + +def _b64url_decode(s: str) -> bytes: + padding = 4 - len(s) % 4 + return base64.urlsafe_b64decode(s + "=" * padding) + + +def verify_root(token: dict, public_key: Ed25519PublicKey) -> bool: + try: + subset = {f: token[f] for f in ["header", "principal", "scope"] if f in token} + message = _canonicalize(subset) + sig_bytes = _b64url_decode(token["signature"]["value"]) + public_key.verify(sig_bytes, message) + return True + except (InvalidSignature, KeyError, Exception): + return False + + +def verify_hop(cumulative_chain: list[dict], 
root_sig_value: str, hop_signature: str, public_key: Ed25519PublicKey) -> bool:
+    # Return True iff *hop_signature* verifies over the cumulative chain bound
+    # to the root signature value (same payload shape as sign_hop).
+    try:
+        payload = {"chain": cumulative_chain, "root_sig": root_sig_value}
+        message = _canonicalize(payload)
+        sig_bytes = _b64url_decode(hop_signature)
+        public_key.verify(sig_bytes, message)
+        return True
+    # NOTE(review): (InvalidSignature, Exception) is redundant — Exception
+    # already subsumes InvalidSignature; any failure maps to False.
+    except (InvalidSignature, Exception):
+        return False
diff --git a/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/_types.py b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/_types.py
new file mode 100644
index 0000000..100b0d0
--- /dev/null
+++ b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/_types.py
@@ -0,0 +1,69 @@
+"""Python types mirroring the HDP TypeScript SDK schema."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import Any, Literal, Optional
+
+# Closed vocabularies shared with the TS SDK schema.
+DataClassification = Literal["public", "internal", "confidential", "restricted"]
+AgentType = Literal["orchestrator", "sub-agent", "tool-executor", "custom"]
+PrincipalIdType = Literal["email", "uuid", "did", "poh", "opaque"]
+
+
+@dataclass
+class HdpHeader:
+    # Token identity and lifetime; issued_at/expires_at are epoch milliseconds
+    # (handlers in this package fill them via int(time.time() * 1000)).
+    token_id: str
+    issued_at: int
+    expires_at: int
+    session_id: str
+    version: str = "0.1"
+    parent_token_id: Optional[str] = None  # set when a token is derived from another
+
+
+@dataclass
+class HdpPrincipal:
+    # The human (or identity) that authorized the run.
+    id: str
+    id_type: PrincipalIdType
+    display_name: Optional[str] = None
+    metadata: Optional[dict[str, Any]] = None
+
+
+@dataclass
+class HdpScope:
+    # What the principal authorized: intent plus hard limits.
+    intent: str
+    data_classification: DataClassification
+    network_egress: bool
+    persistence: bool
+    authorized_tools: Optional[list[str]] = None      # None = no tool restriction
+    authorized_resources: Optional[list[str]] = None
+    max_hops: Optional[int] = None                    # None = unlimited chain length
+
+
+@dataclass
+class HdpSignature:
+    # Root signature over the fields listed in signed_fields (see _crypto.sign_root).
+    alg: str
+    kid: str
+    value: str  # base64url, unpadded
+    signed_fields: list[str] = field(default_factory=lambda: ["header", "principal", "scope"])
+
+
+@dataclass
+class HopRecord:
+    # One link in the delegation chain.
+    seq: int              # 1-based position in the chain
+    agent_id: str
+    agent_type: AgentType
+    timestamp: int        # epoch milliseconds
+    action_summary: str
+    parent_hop: int       # seq of the preceding hop (0 for the first)
+    hop_signature: str    # base64url Ed25519 signature (see _crypto.sign_hop)
+    agent_fingerprint: Optional[str] = None
+
+
+@dataclass
+class HdpToken:
+    # Full token: schema version tag, signed root fields, hop chain, root signature.
+    hdp: str
+    header: HdpHeader
+    principal: HdpPrincipal
+    scope: HdpScope
+    chain: list[HopRecord]
+    signature: HdpSignature
diff --git a/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/callbacks.py b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/callbacks.py
new file mode 100644
index 0000000..3616019
--- /dev/null
+++ b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/callbacks.py
@@ -0,0 +1,323 @@
+"""HdpCallbackHandler — legacy CallbackManager integration for LlamaIndex.
+
+For users on LlamaIndex <0.10.20 or who prefer configuring via Settings.callback_manager.
+
+Usage:
+    from llama_index.callbacks.hdp import HdpCallbackHandler, HdpPrincipal, ScopePolicy
+    from llama_index.core import Settings
+    from llama_index.core.callbacks import CallbackManager
+
+    handler = HdpCallbackHandler(
+        signing_key=ed25519_private_key_bytes,
+        principal=HdpPrincipal(id="alice@corp.com", id_type="email"),
+        scope=ScopePolicy(
+            intent="Research pipeline",
+            authorized_tools=["web_search", "retriever"],
+        ),
+    )
+    Settings.callback_manager = CallbackManager([handler])
+
+    # Run your query engine / agent as normal.
Retrieve the token after:
+    token = handler.export_token()
+"""
+
+from __future__ import annotations
+
+import logging
+import time
+import uuid
+from typing import Any, Callable, Optional
+
+from llama_index.core.callbacks import CBEventType, EventPayload
+from llama_index.core.callbacks.base_handler import BaseCallbackHandler
+
+from ._crypto import sign_hop, sign_root
+from ._types import DataClassification, HdpPrincipal
+from .session import get_token, set_token
+
+logger = logging.getLogger(__name__)
+
+
+class HDPScopeViolationError(Exception):
+    """Raised when an agent attempts to use a tool outside the authorized scope."""
+
+    def __init__(self, tool: str, authorized_tools: list[str]) -> None:
+        # Keep the offending tool and the allow-list on the exception so
+        # callers can report the specific violation programmatically.
+        self.tool = tool
+        self.authorized_tools = authorized_tools
+        super().__init__(f"Tool '{tool}' is not in the authorized scope {authorized_tools}")
+
+
+class ScopePolicy:
+    """Human-readable policy that maps to the HDP scope field."""
+
+    def __init__(
+        self,
+        intent: str,
+        data_classification: DataClassification = "internal",
+        network_egress: bool = True,
+        persistence: bool = False,
+        authorized_tools: Optional[list[str]] = None,
+        authorized_resources: Optional[list[str]] = None,
+        max_hops: Optional[int] = None,
+    ) -> None:
+        # authorized_tools=None means "no tool restriction": handlers only
+        # check membership when the list is not None.
+        self.intent = intent
+        self.data_classification = data_classification
+        self.network_egress = network_egress
+        self.persistence = persistence
+        self.authorized_tools = authorized_tools
+        self.authorized_resources = authorized_resources
+        self.max_hops = max_hops
+
+    def to_dict(self) -> dict:
+        # Serialize to the HDP scope shape. Optional fields are omitted rather
+        # than emitted as null — presumably to keep the canonical JSON that
+        # sign_root signs minimal; confirm against the TS SDK schema.
+        d: dict = {
+            "intent": self.intent,
+            "data_classification": self.data_classification,
+            "network_egress": self.network_egress,
+            "persistence": self.persistence,
+        }
+        if self.authorized_tools is not None:
+            d["authorized_tools"] = self.authorized_tools
+        if self.authorized_resources is not None:
+            d["authorized_resources"] = self.authorized_resources
+        if self.max_hops is not None:
+            d["max_hops"] = self.max_hops
+        return d
+
+
+class
HdpCallbackHandler(BaseCallbackHandler):
+    """HDP audit trail via LlamaIndex's legacy CallbackManager.
+
+    Hooks into start_trace / end_trace for token lifecycle, and
+    on_event_start / on_event_end for tool calls and LLM events.
+
+    All HDP operations are non-blocking by default: failures are logged
+    and execution continues. Set strict=True to raise HDPScopeViolationError
+    on scope violations.
+    """
+
+    def __init__(
+        self,
+        signing_key: bytes,
+        principal: HdpPrincipal,
+        scope: ScopePolicy,
+        key_id: str = "default",
+        expires_in_ms: int = 24 * 60 * 60 * 1000,
+        strict: bool = False,
+        on_token_ready: Optional[Callable[[dict], None]] = None,
+    ) -> None:
+        # signing_key: raw Ed25519 private key bytes (consumed by _crypto).
+        # expires_in_ms: token TTL, default 24 hours.
+        super().__init__(event_starts_to_ignore=[], event_ends_to_ignore=[])
+        self._signing_key = signing_key
+        self._principal = principal
+        self._scope = scope
+        self._key_id = key_id
+        self._expires_in_ms = expires_in_ms
+        self._strict = strict
+        self._on_token_ready = on_token_ready
+        self._hop_seq = 0  # count of hops recorded by this handler in the current trace
+
+    # ------------------------------------------------------------------
+    # BaseCallbackHandler abstract methods
+    # ------------------------------------------------------------------
+
+    def start_trace(self, trace_id: Optional[str] = None) -> None:
+        """Issue the HDP root token. Called at the start of each query."""
+        try:
+            session_id = trace_id or str(uuid.uuid4())
+            now = int(time.time() * 1000)  # HDP timestamps are epoch milliseconds
+            unsigned: dict = {
+                "hdp": "0.1",
+                "header": {
+                    "token_id": str(uuid.uuid4()),
+                    "issued_at": now,
+                    "expires_at": now + self._expires_in_ms,
+                    "session_id": session_id,
+                    "version": "0.1",
+                },
+                "principal": self._build_principal_dict(),
+                "scope": self._scope.to_dict(),
+                "chain": [],
+            }
+            signature = sign_root(unsigned, self._signing_key, self._key_id)
+            token = {**unsigned, "signature": signature}
+            set_token(token)  # publish to the shared ContextVar for the other layers
+            self._hop_seq = 0
+            logger.debug("HDP root token issued: %s", token["header"]["token_id"])
+        except Exception as exc:
+            logger.warning("HDP start_trace failed (non-blocking): %s", exc)
+
+    def end_trace(
+        self,
+        trace_id: Optional[str] = None,
+        trace_map: Optional[dict[str, list[str]]] = None,
+    ) -> None:
+        """Finalize the HDP token and invoke on_token_ready if configured."""
+        try:
+            token = get_token()
+            if token is not None and self._on_token_ready is not None:
+                self._on_token_ready(token)
+        except Exception as exc:
+            logger.warning("HDP end_trace failed (non-blocking): %s", exc)
+
+    def on_event_start(
+        self,
+        event_type: CBEventType,
+        payload: Optional[dict[str, Any]] = None,
+        event_id: str = "",
+        parent_id: str = "",
+        **kwargs: Any,
+    ) -> str:
+        # Dispatch by event type; everything except a strict-mode scope
+        # violation is swallowed so HDP never breaks the pipeline.
+        try:
+            if event_type == CBEventType.FUNCTION_CALL:
+                self._handle_tool_start(payload or {})
+            elif event_type == CBEventType.LLM:
+                self._handle_llm_start(payload or {})
+            elif event_type == CBEventType.QUERY:
+                self._handle_query_start(payload or {})
+            elif event_type == CBEventType.EXCEPTION:
+                self._handle_exception(payload or {})
+        except HDPScopeViolationError:
+            raise  # strict mode: let the violation propagate to the caller
+        except Exception as exc:
+            logger.warning("HDP on_event_start failed (non-blocking): %s", exc)
+        return event_id
+
+    def on_event_end(
+        self,
+        event_type: CBEventType,
+        payload: Optional[dict[str, Any]] = None,
+        event_id: str = "",
+        **kwargs: Any,
+    ) -> None:
+        try:
+            if event_type == CBEventType.FUNCTION_CALL:
+                self._handle_tool_end(payload or {})
+        except Exception as exc:
+            logger.warning("HDP on_event_end failed (non-blocking): %s", exc)
+
+    # ------------------------------------------------------------------
+    # Inspection
+    # ------------------------------------------------------------------
+
+    def export_token(self) -> Optional[dict]:
+        """Return the current token dict from the ContextVar."""
+        return get_token()
+
+    # ------------------------------------------------------------------
+    # Internal handlers
+    # ------------------------------------------------------------------
+
+    def _handle_tool_start(self, payload: dict) -> None:
+        # Enforce the tool allow-list (if any), then record the call as a hop.
+        tool = payload.get(EventPayload.TOOL)
+        tool_name: str = getattr(tool, "name", str(tool)) if tool is not None else "unknown-tool"
+
+        authorized = self._scope.authorized_tools
+        if authorized is not None and tool_name not in authorized:
+            if self._strict:
+                raise HDPScopeViolationError(tool_name, authorized)
+            logger.warning(
+                "HDP scope violation: tool '%s' not in authorized_tools %s",
+                tool_name,
+                authorized,
+            )
+            self._record_scope_violation(tool_name)
+
+        self._extend_chain(action_summary=f"tool_call: {tool_name}")
+
+    def _handle_tool_end(self, payload: dict) -> None:
+        # NOTE(review): this adds 'metadata' to a hop AFTER its hop_signature
+        # was computed. Unless verify.py strips metadata before
+        # re-canonicalizing, this will break chain verification — confirm.
+        output = payload.get(EventPayload.FUNCTION_OUTPUT)
+        if output is not None:
+            token = get_token()
+            if token and token.get("chain"):
+                last_hop = token["chain"][-1]
+                last_hop["metadata"] = {
+                    **last_hop.get("metadata", {}),
+                    "tool_output_preview": str(output)[:200],
+                }
+
+    def _handle_llm_start(self, payload: dict) -> None:
+        # Attach the model name to the most recent hop, if any.
+        # NOTE(review): same post-signing mutation concern as _handle_tool_end.
+        model_name = payload.get(EventPayload.MODEL_NAME) or payload.get("model_name", "")
+        if model_name:
+            token = get_token()
+            if token and token.get("chain"):
+                last_hop = token["chain"][-1]
+                last_hop["metadata"] = {
+                    **last_hop.get("metadata", {}),
+                    "llm_model": model_name,
+                }
+
+    def _handle_query_start(self, payload: dict) -> None:
+        # Record the query string under scope.extensions.
+        # NOTE(review): sign_root signed {header, principal, scope}; rewriting
+        # token["scope"] after issuance changes the signed bytes, so
+        # verify_root will fail unless verification excludes extensions —
+        # confirm against verify.py.
+        query_str = payload.get(EventPayload.QUERY_STR, "")
+        if query_str:
+            token = get_token()
+            if token:
+                scope = token.get("scope", {})
+                token["scope"] = {
+                    **scope,
+                    "extensions": {
+                        **scope.get("extensions", {}),
+                        "query_intent": str(query_str)[:500],
+                    },
+                }
+
+    def _handle_exception(self, payload: dict) -> None:
+        # Record pipeline exceptions as anomalies on the token.
+        exc = payload.get(EventPayload.EXCEPTION)
+        if exc is not None:
+            self._record_anomaly(f"exception: {type(exc).__name__}: {str(exc)[:200]}")
+
+    def _extend_chain(self, action_summary: str) -> None:
+        # Append one signed hop to the chain (no-op when no token is active
+        # or the max_hops budget is exhausted).
+        token = get_token()
+        if token is None:
+            return
+
+        max_hops = self._scope.max_hops
+        if max_hops is not None and self._hop_seq >= max_hops:
+            logger.warning("HDP max_hops (%d) reached — skipping hop", max_hops)
+            return
+
+        self._hop_seq += 1
+        unsigned_hop: dict = {
+            "seq": self._hop_seq,
+            "agent_id": "llama-index-agent",
+            "agent_type": "tool-executor",
+            "timestamp": int(time.time() * 1000),
+            "action_summary": action_summary,
+            "parent_hop": self._hop_seq - 1,
+        }
+
+        # Each hop signature covers the whole cumulative chain plus the root
+        # signature value (see _crypto.sign_hop).
+        current_chain: list = token.get("chain", [])
+        cumulative = [*current_chain, unsigned_hop]
+        hop_sig = sign_hop(cumulative, token["signature"]["value"], self._signing_key)
+        signed_hop = {**unsigned_hop, "hop_signature": hop_sig}
+        token = {**token, "chain": [*current_chain, signed_hop]}
+        set_token(token)
+        logger.debug("HDP hop %d recorded: %s", self._hop_seq, action_summary)
+
+    def _record_scope_violation(self, tool: str) -> None:
+        # Append the violation under scope.extensions.scope_violations.
+        # NOTE(review): mutates the signed scope post-issuance — see
+        # _handle_query_start note.
+        token = get_token()
+        if token is None:
+            return
+        scope = token.get("scope", {})
+        extensions = scope.get("extensions", {})
+        violations: list = extensions.get("scope_violations", [])
+        violations.append({"tool": tool, "timestamp": int(time.time() * 1000)})
+        token["scope"] = {**scope, "extensions": {**extensions, "scope_violations": violations}}
+        set_token(token)
+
+    def _record_anomaly(self, description: str) -> None:
+        # Append the anomaly under scope.extensions.anomalies.
+        # NOTE(review): mutates the signed scope post-issuance — see
+        # _handle_query_start note.
+        token = get_token()
+        if token is None:
+            return
+        scope = token.get("scope", {})
+        extensions = scope.get("extensions", {})
+        anomalies: list = extensions.get("anomalies", [])
+        anomalies.append({"description": description, "timestamp": int(time.time() * 1000)})
+        token["scope"] = {**scope, "extensions": {**extensions, "anomalies": anomalies}}
+        set_token(token)
+
+    def _build_principal_dict(self) -> dict:
+        # Serialize HdpPrincipal, omitting unset optional fields.
+        d: dict = {"id": self._principal.id, "id_type": self._principal.id_type}
+        if self._principal.display_name is not None:
+            d["display_name"] = self._principal.display_name
+        if self._principal.metadata is not None:
+            d["metadata"] = self._principal.metadata
+        return d
diff --git a/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/instrumentation.py b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/instrumentation.py
new file mode 100644
index 0000000..26f0b2e
--- /dev/null
+++ b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/instrumentation.py
@@ -0,0 +1,335 @@
+"""HdpInstrumentationHandler — modern instrumentation dispatcher integration for LlamaIndex.
+
+For LlamaIndex >=0.10.20. Hooks into the root instrumentation dispatcher via
+BaseEventHandler and BaseSpanHandler.
+
+Usage:
+    from llama_index.callbacks.hdp import HdpInstrumentationHandler, HdpPrincipal, ScopePolicy
+
+    HdpInstrumentationHandler.init(
+        signing_key=ed25519_private_key_bytes,
+        principal=HdpPrincipal(id="alice@corp.com", id_type="email"),
+        scope=ScopePolicy(
+            intent="Research pipeline",
+            authorized_tools=["web_search", "retriever"],
+        ),
+        on_token_ready=lambda token: print(token["header"]["token_id"]),
+    )
+
+    # Run your query engine / agent as normal. The root dispatcher captures all events.
+""" + +from __future__ import annotations + +import logging +import time +import uuid +from typing import Any, Callable, Optional, Type + +from llama_index.core.instrumentation.event_handlers import BaseEventHandler +from llama_index.core.instrumentation.events import BaseEvent +from llama_index.core.instrumentation.events.agent import ( + AgentRunStepEndEvent, + AgentRunStepStartEvent, + AgentToolCallEvent, +) +from llama_index.core.instrumentation.events.llm import ( + LLMChatEndEvent, + LLMChatStartEvent, +) +from llama_index.core.instrumentation.events.query import ( + QueryEndEvent, + QueryStartEvent, +) +from llama_index.core.instrumentation.span_handlers import BaseSpanHandler +from llama_index.core.instrumentation.span import BaseSpan + +from ._crypto import sign_hop, sign_root +from ._types import DataClassification, HdpPrincipal +from .callbacks import HDPScopeViolationError, ScopePolicy +from .session import get_token, set_token + +logger = logging.getLogger(__name__) + + +class _HdpSpan(BaseSpan): + """Minimal span that carries the HDP token ID for trace correlation.""" + hdp_token_id: Optional[str] = None + + +class HdpSpanHandler(BaseSpanHandler[_HdpSpan]): + """Tags each span with the active HDP token ID for cross-tool trace correlation.""" + + def new_span( + self, + id_: str, + bound_args: Any, + instance: Optional[Any] = None, + parent_span_id: Optional[str] = None, + tags: Optional[dict[str, Any]] = None, + **kwargs: Any, + ) -> Optional[_HdpSpan]: + try: + token = get_token() + token_id = token["header"]["token_id"] if token else None + return _HdpSpan( + id_=id_, + parent_id=parent_span_id, + tags={**(tags or {}), "hdp_token_id": token_id}, + hdp_token_id=token_id, + ) + except Exception as exc: + logger.debug("HDP span creation failed (non-blocking): %s", exc) + return None + + def prepare_to_exit_span( + self, + id_: str, + bound_args: Any, + instance: Optional[Any] = None, + result: Optional[Any] = None, + **kwargs: Any, + ) -> 
Optional[_HdpSpan]: + return self.open_spans.get(id_) + + def prepare_to_drop_span( + self, + id_: str, + bound_args: Any, + instance: Optional[Any] = None, + err: Optional[BaseException] = None, + **kwargs: Any, + ) -> Optional[_HdpSpan]: + span = self.open_spans.get(id_) + if span is not None: + logger.debug( + "HDP span dropped: %s (token: %s, error: %s)", + id_, + span.hdp_token_id, + err, + ) + return span + + +class HdpEventHandler(BaseEventHandler): + """Routes LlamaIndex instrumentation events to HDP chain operations.""" + + def __init__( + self, + signing_key: bytes, + principal: HdpPrincipal, + scope: ScopePolicy, + key_id: str, + expires_in_ms: int, + strict: bool, + on_token_ready: Optional[Callable[[dict], None]], + ) -> None: + self._signing_key = signing_key + self._principal = principal + self._scope = scope + self._key_id = key_id + self._expires_in_ms = expires_in_ms + self._strict = strict + self._on_token_ready = on_token_ready + self._hop_seq = 0 + + @classmethod + def class_name(cls) -> str: + return "HdpEventHandler" + + def handle(self, event: BaseEvent, **kwargs: Any) -> None: + try: + if isinstance(event, QueryStartEvent): + self._on_query_start(event) + elif isinstance(event, AgentToolCallEvent): + self._on_tool_call(event) + elif isinstance(event, LLMChatStartEvent): + self._on_llm_start(event) + elif isinstance(event, LLMChatEndEvent): + self._on_llm_end(event) + elif isinstance(event, QueryEndEvent): + self._on_query_end(event) + except Exception as exc: + logger.warning("HDP event handler failed (non-blocking): %s", exc) + + # ------------------------------------------------------------------ + # Event handlers + # ------------------------------------------------------------------ + + def _on_query_start(self, event: QueryStartEvent) -> None: + session_id = str(event.id_) if event.id_ else str(uuid.uuid4()) + now = int(time.time() * 1000) + unsigned: dict = { + "hdp": "0.1", + "header": { + "token_id": str(uuid.uuid4()), + 
"issued_at": now, + "expires_at": now + self._expires_in_ms, + "session_id": session_id, + "version": "0.1", + }, + "principal": self._build_principal_dict(), + "scope": self._scope.to_dict(), + "chain": [], + } + signature = sign_root(unsigned, self._signing_key, self._key_id) + token = {**unsigned, "signature": signature} + set_token(token) + self._hop_seq = 0 + logger.debug("HDP root token issued: %s", token["header"]["token_id"]) + + def _on_tool_call(self, event: AgentToolCallEvent) -> None: + tool_name: str = "" + if hasattr(event, "tool") and event.tool is not None: + tool_name = getattr(event.tool, "name", str(event.tool)) + elif hasattr(event, "tool_name"): + tool_name = str(event.tool_name) + tool_name = tool_name or "unknown-tool" + + authorized = self._scope.authorized_tools + if authorized is not None and tool_name not in authorized: + if self._strict: + raise HDPScopeViolationError(tool_name, authorized) + logger.warning( + "HDP scope violation: tool '%s' not in authorized_tools %s", + tool_name, + authorized, + ) + self._record_scope_violation(tool_name) + + self._extend_chain(action_summary=f"tool_call: {tool_name}") + + def _on_llm_start(self, event: LLMChatStartEvent) -> None: + model_name: str = "" + if hasattr(event, "model_dict") and event.model_dict: + model_name = str(event.model_dict.get("model", "")) + if model_name: + token = get_token() + if token and token.get("chain"): + last_hop = token["chain"][-1] + last_hop["metadata"] = {**last_hop.get("metadata", {}), "llm_model": model_name} + + def _on_llm_end(self, event: LLMChatEndEvent) -> None: + pass # token lifecycle managed by query events + + def _on_query_end(self, event: QueryEndEvent) -> None: + token = get_token() + if token is not None and self._on_token_ready is not None: + try: + self._on_token_ready(token) + except Exception as exc: + logger.warning("HDP on_token_ready callback failed: %s", exc) + + # ------------------------------------------------------------------ + # Helpers 
(shared with HdpCallbackHandler logic) + # ------------------------------------------------------------------ + + def _extend_chain(self, action_summary: str) -> None: + token = get_token() + if token is None: + return + + max_hops = self._scope.max_hops + if max_hops is not None and self._hop_seq >= max_hops: + logger.warning("HDP max_hops (%d) reached — skipping hop", max_hops) + return + + self._hop_seq += 1 + unsigned_hop: dict = { + "seq": self._hop_seq, + "agent_id": "llama-index-agent", + "agent_type": "tool-executor", + "timestamp": int(time.time() * 1000), + "action_summary": action_summary, + "parent_hop": self._hop_seq - 1, + } + + current_chain: list = token.get("chain", []) + cumulative = [*current_chain, unsigned_hop] + hop_sig = sign_hop(cumulative, token["signature"]["value"], self._signing_key) + signed_hop = {**unsigned_hop, "hop_signature": hop_sig} + token = {**token, "chain": [*current_chain, signed_hop]} + set_token(token) + logger.debug("HDP hop %d recorded: %s", self._hop_seq, action_summary) + + def _record_scope_violation(self, tool: str) -> None: + token = get_token() + if token is None: + return + scope = token.get("scope", {}) + extensions = scope.get("extensions", {}) + violations: list = extensions.get("scope_violations", []) + violations.append({"tool": tool, "timestamp": int(time.time() * 1000)}) + token["scope"] = {**scope, "extensions": {**extensions, "scope_violations": violations}} + set_token(token) + + def _build_principal_dict(self) -> dict: + d: dict = {"id": self._principal.id, "id_type": self._principal.id_type} + if self._principal.display_name is not None: + d["display_name"] = self._principal.display_name + if self._principal.metadata is not None: + d["metadata"] = self._principal.metadata + return d + + +class HdpInstrumentationHandler: + """Entry point for the modern LlamaIndex instrumentation integration. + + Call HdpInstrumentationHandler.init() once at application startup. 
+ It wires HdpEventHandler and HdpSpanHandler to the root dispatcher + so all downstream LlamaIndex activity is captured automatically. + """ + + @classmethod + def init( + cls, + signing_key: bytes, + principal: HdpPrincipal, + scope: ScopePolicy, + key_id: str = "default", + expires_in_ms: int = 24 * 60 * 60 * 1000, + on_violation: str = "log", + on_token_ready: Optional[Callable[[dict], None]] = None, + ) -> "HdpInstrumentationHandler": + """Wire HDP handlers to the root LlamaIndex instrumentation dispatcher. + + Args: + signing_key: Ed25519 private key bytes. + principal: HdpPrincipal identifying the authorizing human. + scope: ScopePolicy (intent, authorized_tools, max_hops, etc.). + key_id: Key identifier for rotation support. + expires_in_ms: Token TTL in milliseconds (default 24h). + on_violation: "log" (default) or "raise". + on_token_ready: Optional callback invoked with the final token at query end. + + Returns: + The HdpInstrumentationHandler instance (holds references to wired handlers). 
+ """ + import llama_index.core.instrumentation as instrument + + strict = on_violation == "raise" + + event_handler = HdpEventHandler( + signing_key=signing_key, + principal=principal, + scope=scope, + key_id=key_id, + expires_in_ms=expires_in_ms, + strict=strict, + on_token_ready=on_token_ready, + ) + span_handler = HdpSpanHandler() + + dispatcher = instrument.get_dispatcher() + dispatcher.add_event_handler(event_handler) + dispatcher.add_span_handler(span_handler) + + instance = cls() + instance._event_handler = event_handler + instance._span_handler = span_handler + instance._dispatcher = dispatcher + return instance + + def export_token(self) -> Optional[dict]: + """Return the active token from the ContextVar.""" + return get_token() diff --git a/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/postprocessor.py b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/postprocessor.py new file mode 100644 index 0000000..5566a36 --- /dev/null +++ b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/postprocessor.py @@ -0,0 +1,179 @@ +"""HdpNodePostprocessor — inline scope enforcement in the LlamaIndex RAG pipeline. + +Runs after retrieval, before synthesis. Validates scope and records retrieval +as a hop in the HDP delegation chain. + +Usage: + from llama_index.callbacks.hdp import HdpNodePostprocessor + + postprocessor = HdpNodePostprocessor( + signing_key=ed25519_private_key_bytes, # same key used for HdpCallbackHandler + strict=False, + ) + + query_engine = index.as_query_engine( + node_postprocessors=[postprocessor] + ) + +The postprocessor reads the active HDP token from the ContextVar. If no token +is present (HdpCallbackHandler or HdpInstrumentationHandler not configured), +it logs a warning and returns nodes unchanged. 
+"""
+
+from __future__ import annotations
+
+import logging
+import time
+from typing import Any, List, Optional
+
+from llama_index.core.postprocessor.types import BaseNodePostprocessor
+from llama_index.core.schema import NodeWithScore, QueryBundle
+
+from ._crypto import sign_hop
+from .callbacks import HDPScopeViolationError
+from .session import get_token, set_token
+
+logger = logging.getLogger(__name__)
+
+# Ordered sensitivity levels used for scope checks (higher = more sensitive).
+_CLASSIFICATION_LEVELS = {"public": 0, "internal": 1, "confidential": 2, "restricted": 3}
+
+
+class HdpNodePostprocessor(BaseNodePostprocessor):
+    """Records retrieval hops and optionally enforces data classification scope.
+
+    Each call to _postprocess_nodes extends the active HDP token's delegation
+    chain with a retrieval hop. This ensures every document retrieval is
+    cryptographically recorded as part of the authorization provenance.
+
+    Args:
+        strict: If True, raise HDPScopeViolationError on classification
+            violations. If False (default), log and continue.
+        check_data_classification: If True (default), inspect each node's
+            metadata for a 'classification' key and validate it against
+            scope.data_classification.
+    """
+
+    # Declared as class-level fields (BaseNodePostprocessor is Pydantic-based).
+    strict: bool = False
+    check_data_classification: bool = True
+
+    # NOTE(review): assigning the undeclared private attribute _signing_key on
+    # a Pydantic-based class may require a PrivateAttr declaration — confirm
+    # against the targeted llama-index-core version.
+    def __init__(
+        self,
+        signing_key: Optional[bytes] = None,
+        strict: bool = False,
+        check_data_classification: bool = True,
+    ) -> None:
+        super().__init__()
+        self._signing_key = signing_key
+        self.strict = strict
+        self.check_data_classification = check_data_classification
+
+    @classmethod
+    def class_name(cls) -> str:
+        return "HdpNodePostprocessor"
+
+    def _postprocess_nodes(
+        self,
+        nodes: List[NodeWithScore],
+        query_bundle: Optional[QueryBundle] = None,
+    ) -> List[NodeWithScore]:
+        # Reads the active token from the shared ContextVar; if no handler
+        # issued one, retrieval proceeds unrecorded (warn only).
+        token = get_token()
+        if token is None:
+            logger.warning(
+                "HDP: no active token in context — retrieval not recorded. "
+                "Configure HdpCallbackHandler or HdpInstrumentationHandler before querying."
+            )
+            return nodes
+
+        query_str = ""
+        if query_bundle is not None:
+            query_str = getattr(query_bundle, "query_str", "") or ""
+
+        if self.check_data_classification:
+            nodes = self._check_classification(nodes, token)
+
+        self._extend_chain(token, nodes, query_str)
+        return nodes
+
+    def _check_classification(
+        self,
+        nodes: List[NodeWithScore],
+        token: dict,
+    ) -> List[NodeWithScore]:
+        # Compare each node's metadata 'classification' (default "internal")
+        # against the token scope's allowed level.
+        # NOTE: violating nodes are logged/recorded but NOT filtered out —
+        # the full node list is always returned.
+        allowed_classification = token.get("scope", {}).get("data_classification", "internal")
+        allowed_level = _CLASSIFICATION_LEVELS.get(allowed_classification, 1)
+        violating = []
+
+        for node in nodes:
+            node_classification = node.node.metadata.get("classification", "internal") if node.node.metadata else "internal"
+            node_level = _CLASSIFICATION_LEVELS.get(node_classification, 1)
+            if node_level > allowed_level:
+                violating.append((node, node_classification))
+
+        if violating:
+            violated_classes = [c for _, c in violating]
+            msg = (
+                f"HDP: retrieved nodes with classification {violated_classes} "
+                f"exceed allowed level '{allowed_classification}'"
+            )
+            if self.strict:
+                raise HDPScopeViolationError(
+                    tool=f"retrieval[{violated_classes}]",
+                    authorized_tools=[f"retrieval[<={allowed_classification}]"],
+                )
+            logger.warning(msg)
+            self._record_classification_violation(token, violated_classes, allowed_classification)
+
+        return nodes
+
+    def _extend_chain(self, token: dict, nodes: List[NodeWithScore], query_str: str) -> None:
+        # Record this retrieval as the next hop; seq is derived from the
+        # current chain length (other layers track their own counters).
+        current_chain: list = token.get("chain", [])
+        next_seq = len(current_chain) + 1
+
+        summary_parts = [f"retrieval: {len(nodes)} nodes"]
+        if query_str:
+            summary_parts.append(f"query: {query_str[:80]}")
+        action_summary = ", ".join(summary_parts)
+
+        unsigned_hop: dict = {
+            "seq": next_seq,
+            "agent_id": "llama-index-retriever",
+            "agent_type": "tool-executor",
+            "timestamp": int(time.time() * 1000),
+            "action_summary": action_summary,
+            "parent_hop": next_seq - 1,
+        }
+
+        try:
+            if self._signing_key is None:
+                # No key: hop is still recorded but with an empty signature.
+                # NOTE(review): an unsigned hop presumably fails chain
+                # verification — confirm how verify.py treats "".
+                logger.debug(
+                    "HDP postprocessor: no signing key configured — recording unsigned retrieval hop"
+                )
+                token = {**token, "chain": [*current_chain, {**unsigned_hop, "hop_signature": ""}]}
+                set_token(token)
+                return
+
+            cumulative = [*current_chain, unsigned_hop]
+            hop_sig = sign_hop(cumulative, token["signature"]["value"], self._signing_key)
+            signed_hop = {**unsigned_hop, "hop_signature": hop_sig}
+            token = {**token, "chain": [*current_chain, signed_hop]}
+            set_token(token)
+            logger.debug("HDP retrieval hop %d recorded", next_seq)
+        except Exception as exc:
+            logger.warning("HDP postprocessor chain extension failed (non-blocking): %s", exc)
+
+    def _record_classification_violation(
+        self,
+        token: dict,
+        violated_classes: list,
+        allowed: str,
+    ) -> None:
+        # Append the violation under scope.extensions.classification_violations.
+        # NOTE(review): mutates the signed scope post-issuance; verify_root may
+        # fail unless extensions are excluded during verification — confirm
+        # against verify.py.
+        scope = token.get("scope", {})
+        extensions = scope.get("extensions", {})
+        violations: list = extensions.get("classification_violations", [])
+        violations.append({
+            "violated_classifications": violated_classes,
+            "allowed_classification": allowed,
+            "timestamp": int(time.time() * 1000),
+        })
+        token["scope"] = {**scope, "extensions": {**extensions, "classification_violations": violations}}
+        set_token(token)
diff --git a/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/session.py b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/session.py
new file mode 100644
index 0000000..0334cd8
--- /dev/null
+++ b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/session.py
@@ -0,0 +1,29 @@
+"""Shared ContextVar session state for the HDP LlamaIndex integration.
+
+All three layers (instrumentation handler, legacy callback handler, node
+postprocessor) share a single ContextVar to hold the active HDP token for
+the duration of a query. ContextVar is asyncio-safe: each task gets its own
+copy, preventing cross-request token leakage.
+""" + +from __future__ import annotations + +from contextvars import ContextVar +from typing import Optional + +_hdp_token: ContextVar[Optional[dict]] = ContextVar("_hdp_token", default=None) + + +def get_token() -> Optional[dict]: + """Return the active HDP token dict, or None if no query is in progress.""" + return _hdp_token.get() + + +def set_token(token: dict) -> None: + """Store a token dict as the active HDP token for the current context.""" + _hdp_token.set(token) + + +def clear_token() -> None: + """Clear the active HDP token.""" + _hdp_token.set(None) diff --git a/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/verify.py b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/verify.py new file mode 100644 index 0000000..17fbc20 --- /dev/null +++ b/packages/llama-index-callbacks-hdp/llama_index/callbacks/hdp/verify.py @@ -0,0 +1,119 @@ +"""Offline chain verification for HDP tokens. + +Usage: + from llama_index.callbacks.hdp import verify_chain + + result = verify_chain(token_dict, public_key_bytes) + if result.valid: + print(f"Chain verified: {result.hop_count} hops") + else: + print(f"Violations: {result.violations}") +""" + +from __future__ import annotations + +import time +from dataclasses import dataclass, field + +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey + +from ._crypto import verify_hop, verify_root + + +@dataclass +class HopVerification: + seq: int + agent_id: str + valid: bool + reason: str = "" + + +@dataclass +class VerificationResult: + valid: bool + token_id: str + session_id: str + hop_count: int + hop_results: list[HopVerification] = field(default_factory=list) + violations: list[str] = field(default_factory=list) + + @property + def depth(self) -> int: + return self.hop_count + + +def verify_chain(token: dict, public_key: Ed25519PublicKey | bytes) -> VerificationResult: + """Verify a complete HDP token — root signature and every hop in the chain. 
+ + Args: + token: Token dict as returned by export_token() on any HDP middleware. + public_key: The human's Ed25519 public key. Pass either an Ed25519PublicKey + instance or the raw 32-byte public key bytes. + + Returns: + VerificationResult with valid=True only if every signature checks out + and no structural violations are found. + """ + if isinstance(public_key, (bytes, bytearray)): + pub = _load_raw_public_key(public_key) + else: + pub = public_key + + token_id = token.get("header", {}).get("token_id", "unknown") + session_id = token.get("header", {}).get("session_id", "unknown") + chain: list[dict] = token.get("chain", []) + violations: list[str] = [] + hop_results: list[HopVerification] = [] + + if not verify_root(token, pub): + violations.append("Root signature invalid") + return VerificationResult( + valid=False, + token_id=token_id, + session_id=session_id, + hop_count=len(chain), + violations=violations, + ) + + expires_at = token.get("header", {}).get("expires_at", 0) + now_ms = int(time.time() * 1000) + if expires_at and now_ms > expires_at: + violations.append(f"Token expired at {expires_at}") + + max_hops = token.get("scope", {}).get("max_hops") + if max_hops is not None and len(chain) > max_hops: + violations.append(f"Chain depth {len(chain)} exceeds max_hops {max_hops}") + + root_sig_value: str = token["signature"]["value"] + for i, hop in enumerate(chain): + hop_sig = hop.get("hop_signature", "") + unsigned_hop = {k: v for k, v in hop.items() if k != "hop_signature"} + cumulative = [*chain[:i], unsigned_hop] + + ok = verify_hop(cumulative, root_sig_value, hop_sig, pub) + hop_results.append(HopVerification( + seq=hop.get("seq", i + 1), + agent_id=hop.get("agent_id", "unknown"), + valid=ok, + reason="" if ok else "Hop signature invalid", + )) + if not ok: + violations.append(f"Hop {hop.get('seq', i + 1)} ({hop.get('agent_id', '?')}) signature invalid") + + for j, hop in enumerate(chain): + if hop.get("seq") != j + 1: + 
violations.append(f"Non-sequential seq at position {j}: expected {j + 1}, got {hop.get('seq')}") + + return VerificationResult( + valid=len(violations) == 0, + token_id=token_id, + session_id=session_id, + hop_count=len(chain), + hop_results=hop_results, + violations=violations, + ) + + +def _load_raw_public_key(raw_bytes: bytes) -> Ed25519PublicKey: + import cryptography.hazmat.primitives.asymmetric.ed25519 as _ed + return _ed.Ed25519PublicKey.from_public_bytes(raw_bytes) diff --git a/packages/llama-index-callbacks-hdp/pyproject.toml b/packages/llama-index-callbacks-hdp/pyproject.toml new file mode 100644 index 0000000..3fcb8ae --- /dev/null +++ b/packages/llama-index-callbacks-hdp/pyproject.toml @@ -0,0 +1,37 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "llama-index-callbacks-hdp" +version = "0.1.0" +description = "HDP (Human Delegation Provenance) — cryptographic authorization provenance for LlamaIndex agents" +readme = "README.md" +license = { text = "CC-BY-4.0" } +requires-python = ">=3.10" +dependencies = [ + "llama-index-core>=0.10.20,<0.15", + "cryptography>=42.0.0", + "jcs>=0.2.1", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0.0", + "pytest-asyncio>=0.23.0", +] + +[project.urls] +Homepage = "https://github.com/Helixar-AI/HDP" +Repository = "https://github.com/Helixar-AI/HDP" + +[tool.hatch.build.targets.wheel] +packages = ["llama_index"] + +[tool.llamahub] +contains_example = false +import_path = "llama_index.callbacks.hdp" + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] diff --git a/packages/llama-index-callbacks-hdp/tests/__init__.py b/packages/llama-index-callbacks-hdp/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/packages/llama-index-callbacks-hdp/tests/test_callbacks.py b/packages/llama-index-callbacks-hdp/tests/test_callbacks.py new file mode 100644 index 0000000..f519b6e --- /dev/null +++ 
b/packages/llama-index-callbacks-hdp/tests/test_callbacks.py @@ -0,0 +1,224 @@ +"""Tests for HdpCallbackHandler — legacy CallbackManager integration.""" + +from __future__ import annotations + +import time +import pytest +import jcs +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + +from llama_index.core.callbacks import CBEventType, EventPayload +from llama_index.callbacks.hdp import ( + HdpCallbackHandler, + HdpPrincipal, + HDPScopeViolationError, + ScopePolicy, + verify_chain, +) +from llama_index.callbacks.hdp.session import clear_token, get_token + + +def _generate_key(): + priv = Ed25519PrivateKey.generate() + return priv.private_bytes_raw(), priv.public_key() + + +def _make_handler(scope=None, **kwargs): + key, pub = _generate_key() + handler = HdpCallbackHandler( + signing_key=key, + principal=HdpPrincipal(id="user@test.com", id_type="email"), + scope=scope or ScopePolicy(intent="Test query"), + **kwargs, + ) + return handler, key, pub + + +class FakeTool: + def __init__(self, name: str): + self.name = name + + +class TestRootTokenIssuance: + def setup_method(self): + clear_token() + + def test_start_trace_issues_root_token(self): + handler, _, _ = _make_handler() + handler.start_trace("trace-001") + token = get_token() + assert token is not None + assert token["hdp"] == "0.1" + assert token["header"]["session_id"] == "trace-001" + assert token["chain"] == [] + + def test_start_trace_without_id_generates_session(self): + handler, _, _ = _make_handler() + handler.start_trace() + token = get_token() + assert token is not None + assert token["header"]["session_id"] # some UUID was generated + + def test_root_signature_is_verifiable(self): + handler, _, pub = _make_handler() + handler.start_trace("s1") + token = get_token() + result = verify_chain(token, pub.public_bytes_raw()) + assert result.valid + + def test_export_token_matches_context(self): + handler, _, _ = _make_handler() + handler.start_trace("s2") + assert 
handler.export_token() is get_token() + + +class TestEndTrace: + def setup_method(self): + clear_token() + + def test_end_trace_calls_on_token_ready(self): + received = [] + handler, _, _ = _make_handler(on_token_ready=received.append) + handler.start_trace("s3") + handler.end_trace("s3") + assert len(received) == 1 + assert received[0]["hdp"] == "0.1" + + def test_end_trace_without_token_is_noop(self): + handler, _, _ = _make_handler() + handler.end_trace("s3") # no start_trace called first — must not raise + + +class TestToolCallHandling: + def setup_method(self): + clear_token() + + def _tool_start(self, handler, tool_name: str, event_id="e1"): + handler.on_event_start( + CBEventType.FUNCTION_CALL, + payload={EventPayload.TOOL: FakeTool(tool_name)}, + event_id=event_id, + ) + + def test_tool_call_extends_chain(self): + handler, _, _ = _make_handler() + handler.start_trace("s4") + self._tool_start(handler, "web_search") + chain = get_token()["chain"] + assert len(chain) == 1 + assert chain[0]["action_summary"] == "tool_call: web_search" + + def test_tool_call_hop_is_signed(self): + handler, _, pub = _make_handler() + handler.start_trace("s5") + self._tool_start(handler, "web_search") + result = verify_chain(get_token(), pub.public_bytes_raw()) + assert result.valid + + def test_multiple_tool_calls_build_chain(self): + handler, _, pub = _make_handler() + handler.start_trace("s6") + self._tool_start(handler, "tool_a", "e1") + self._tool_start(handler, "tool_b", "e2") + self._tool_start(handler, "tool_c", "e3") + chain = get_token()["chain"] + assert len(chain) == 3 + assert [h["seq"] for h in chain] == [1, 2, 3] + assert verify_chain(get_token(), pub.public_bytes_raw()).valid + + def test_tool_output_recorded_on_end(self): + handler, _, _ = _make_handler() + handler.start_trace("s7") + self._tool_start(handler, "web_search") + handler.on_event_end( + CBEventType.FUNCTION_CALL, + payload={EventPayload.FUNCTION_OUTPUT: "search results here"}, + event_id="e1", + ) + 
last_hop = get_token()["chain"][-1] + assert "tool_output_preview" in last_hop.get("metadata", {}) + + +class TestScopeEnforcement: + def setup_method(self): + clear_token() + + def test_authorized_tool_no_violation(self): + handler, _, _ = _make_handler( + scope=ScopePolicy(intent="x", authorized_tools=["web_search"]) + ) + handler.start_trace("sv1") + handler.on_event_start( + CBEventType.FUNCTION_CALL, + payload={EventPayload.TOOL: FakeTool("web_search")}, + ) + violations = get_token().get("scope", {}).get("extensions", {}).get("scope_violations", []) + assert violations == [] + + def test_unauthorized_tool_recorded_in_observe_mode(self): + handler, _, _ = _make_handler( + scope=ScopePolicy(intent="x", authorized_tools=["web_search"]) + ) + handler.start_trace("sv2") + handler.on_event_start( + CBEventType.FUNCTION_CALL, + payload={EventPayload.TOOL: FakeTool("exec_code")}, + ) + violations = get_token()["scope"]["extensions"]["scope_violations"] + assert len(violations) == 1 + assert violations[0]["tool"] == "exec_code" + + def test_strict_mode_raises(self): + handler, _, _ = _make_handler( + scope=ScopePolicy(intent="x", authorized_tools=["web_search"]), + strict=True, + ) + handler.start_trace("sv3") + with pytest.raises(HDPScopeViolationError) as exc_info: + handler.on_event_start( + CBEventType.FUNCTION_CALL, + payload={EventPayload.TOOL: FakeTool("exec_code")}, + ) + assert exc_info.value.tool == "exec_code" + + def test_no_authorized_tools_means_all_allowed(self): + handler, _, _ = _make_handler(scope=ScopePolicy(intent="x")) + handler.start_trace("sv4") + handler.on_event_start( + CBEventType.FUNCTION_CALL, + payload={EventPayload.TOOL: FakeTool("anything")}, + ) + extensions = get_token().get("scope", {}).get("extensions", {}) + assert "scope_violations" not in extensions + + def test_max_hops_enforced(self): + handler, _, _ = _make_handler(scope=ScopePolicy(intent="x", max_hops=2)) + handler.start_trace("sv5") + for i in range(5): + 
handler.on_event_start( + CBEventType.FUNCTION_CALL, + payload={EventPayload.TOOL: FakeTool(f"tool_{i}")}, + ) + assert len(get_token()["chain"]) == 2 + + +class TestNonBlocking: + def setup_method(self): + clear_token() + + def test_bad_key_does_not_raise(self): + handler = HdpCallbackHandler( + signing_key=b"\x00" * 5, + principal=HdpPrincipal(id="u", id_type="opaque"), + scope=ScopePolicy(intent="x"), + ) + handler.start_trace("nb1") + assert get_token() is None + + def test_events_without_token_are_noop(self): + handler, _, _ = _make_handler() + # No start_trace — on_event_start must not raise + handler.on_event_start( + CBEventType.FUNCTION_CALL, + payload={EventPayload.TOOL: FakeTool("web_search")}, + ) diff --git a/packages/llama-index-callbacks-hdp/tests/test_postprocessor.py b/packages/llama-index-callbacks-hdp/tests/test_postprocessor.py new file mode 100644 index 0000000..f5427a8 --- /dev/null +++ b/packages/llama-index-callbacks-hdp/tests/test_postprocessor.py @@ -0,0 +1,157 @@ +"""Tests for HdpNodePostprocessor — retrieval hop recording and scope enforcement.""" + +from __future__ import annotations + +import pytest +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + +from llama_index.core.schema import NodeWithScore, TextNode +from llama_index.callbacks.hdp import ( + HDPScopeViolationError, + HdpNodePostprocessor, + HdpPrincipal, + ScopePolicy, + verify_chain, +) +from llama_index.callbacks.hdp.callbacks import HdpCallbackHandler +from llama_index.callbacks.hdp.session import clear_token, get_token, set_token + + +def _generate_key(): + priv = Ed25519PrivateKey.generate() + return priv.private_bytes_raw(), priv.public_key() + + +def _issue_token(signing_key: bytes, scope: ScopePolicy | None = None) -> dict: + """Issue a minimal root token and store it in the ContextVar.""" + import time, uuid + from llama_index.callbacks.hdp._crypto import sign_root + now = int(time.time() * 1000) + scope = scope or 
ScopePolicy(intent="test")
+    unsigned = {
+        "hdp": "0.1",
+        "header": {
+            "token_id": str(uuid.uuid4()),
+            "issued_at": now,
+            "expires_at": now + 86400000,
+            "session_id": "test-session",
+            "version": "0.1",
+        },
+        "principal": {"id": "user@test.com", "id_type": "email"},
+        "scope": scope.to_dict(),
+        "chain": [],
+    }
+    sig = sign_root(unsigned, signing_key, "default")
+    token = {**unsigned, "signature": sig}
+    set_token(token)
+    return token
+
+
+def _make_nodes(*classifications: str) -> list[NodeWithScore]:
+    nodes = []
+    for cls in classifications:
+        metadata = {"classification": cls} if cls else {}
+        nodes.append(NodeWithScore(node=TextNode(text="content", metadata=metadata), score=1.0))
+    return nodes
+
+
+class TestRetrievalHopRecording:
+    def setup_method(self):
+        clear_token()
+
+    def test_records_retrieval_hop_with_signing_key(self):
+        key, pub = _generate_key()
+        _issue_token(key)
+        pp = HdpNodePostprocessor(signing_key=key)
+        nodes = _make_nodes("public", "public")
+        result = pp._postprocess_nodes(nodes)
+        assert result == nodes
+        chain = get_token()["chain"]
+        assert len(chain) == 1
+        assert "retrieval: 2 nodes" in chain[0]["action_summary"]
+
+    def test_retrieval_hop_is_signed(self):
+        key, pub = _generate_key()
+        _issue_token(key)
+        pp = HdpNodePostprocessor(signing_key=key)
+        pp._postprocess_nodes(_make_nodes("public"))
+        result = verify_chain(get_token(), pub.public_bytes_raw())
+        assert result.valid
+
+    def test_without_signing_key_records_unsigned_hop(self):
+        key, _ = _generate_key()
+        _issue_token(key)
+        pp = HdpNodePostprocessor()  # no signing_key
+        pp._postprocess_nodes(_make_nodes("public"))
+        chain = get_token()["chain"]
+        assert len(chain) == 1
+        assert chain[0]["hop_signature"] == ""
+
+    def test_no_active_token_returns_nodes_unchanged(self):
+        pp = HdpNodePostprocessor()
+        nodes = _make_nodes("public", "internal")
+        result = pp._postprocess_nodes(nodes)
+        assert result == nodes
+
+    def test_query_str_included_in_hop_summary(self):
+        from 
llama_index.core.schema import QueryBundle + key, _ = _generate_key() + _issue_token(key) + pp = HdpNodePostprocessor(signing_key=key) + qb = QueryBundle(query_str="what is AI?") + pp._postprocess_nodes(_make_nodes("public"), query_bundle=qb) + summary = get_token()["chain"][0]["action_summary"] + assert "what is AI?" in summary + + +class TestDataClassificationEnforcement: + def setup_method(self): + clear_token() + + def test_nodes_within_classification_pass(self): + key, _ = _generate_key() + _issue_token(key, ScopePolicy(intent="x", data_classification="confidential")) + pp = HdpNodePostprocessor(signing_key=key, check_data_classification=True) + nodes = _make_nodes("public", "internal", "confidential") + result = pp._postprocess_nodes(nodes) + assert len(result) == 3 + violations = get_token()["scope"].get("extensions", {}).get("classification_violations", []) + assert violations == [] + + def test_nodes_above_classification_logged_in_observe_mode(self): + key, _ = _generate_key() + _issue_token(key, ScopePolicy(intent="x", data_classification="internal")) + pp = HdpNodePostprocessor(signing_key=key, check_data_classification=True, strict=False) + nodes = _make_nodes("public", "restricted") # restricted > internal + result = pp._postprocess_nodes(nodes) + assert len(result) == 2 # observe mode: nodes still returned + violations = get_token()["scope"]["extensions"]["classification_violations"] + assert len(violations) == 1 + assert "restricted" in violations[0]["violated_classifications"] + + def test_strict_mode_raises_on_classification_violation(self): + key, _ = _generate_key() + _issue_token(key, ScopePolicy(intent="x", data_classification="internal")) + pp = HdpNodePostprocessor(signing_key=key, check_data_classification=True, strict=True) + with pytest.raises(HDPScopeViolationError): + pp._postprocess_nodes(_make_nodes("restricted")) + + def test_check_data_classification_false_skips_check(self): + key, _ = _generate_key() + _issue_token(key, 
ScopePolicy(intent="x", data_classification="public")) + pp = HdpNodePostprocessor(signing_key=key, check_data_classification=False) + # restricted nodes should pass through without any violation + nodes = _make_nodes("restricted") + result = pp._postprocess_nodes(nodes) + assert result == nodes + assert "classification_violations" not in get_token()["scope"].get("extensions", {}) + + def test_nodes_without_classification_default_to_internal(self): + key, _ = _generate_key() + _issue_token(key, ScopePolicy(intent="x", data_classification="internal")) + pp = HdpNodePostprocessor(signing_key=key, check_data_classification=True) + # Node with no classification metadata should be treated as "internal" + node = NodeWithScore(node=TextNode(text="no metadata"), score=1.0) + result = pp._postprocess_nodes([node]) + assert len(result) == 1 + assert "classification_violations" not in get_token()["scope"].get("extensions", {}) diff --git a/packages/llama-index-callbacks-hdp/tests/test_session.py b/packages/llama-index-callbacks-hdp/tests/test_session.py new file mode 100644 index 0000000..b40fec9 --- /dev/null +++ b/packages/llama-index-callbacks-hdp/tests/test_session.py @@ -0,0 +1,74 @@ +"""Tests for ContextVar session state — isolation across concurrent async tasks.""" + +from __future__ import annotations + +import asyncio +import pytest +from llama_index.callbacks.hdp.session import clear_token, get_token, set_token + + +class TestSessionBasics: + def test_get_token_returns_none_by_default(self): + clear_token() + assert get_token() is None + + def test_set_and_get_token(self): + token = {"hdp": "0.1", "header": {"token_id": "abc"}} + set_token(token) + assert get_token() is token + clear_token() + + def test_clear_token(self): + set_token({"hdp": "0.1"}) + clear_token() + assert get_token() is None + + def test_set_overwrites_previous(self): + set_token({"id": "first"}) + set_token({"id": "second"}) + assert get_token()["id"] == "second" + clear_token() + + +class 
TestContextVarIsolation: + @pytest.mark.asyncio + async def test_tasks_do_not_share_token(self): + """Each asyncio task should have its own ContextVar copy.""" + results = {} + + async def task_a(): + clear_token() + set_token({"task": "a"}) + await asyncio.sleep(0.01) + results["a"] = get_token() + + async def task_b(): + clear_token() + set_token({"task": "b"}) + await asyncio.sleep(0.01) + results["b"] = get_token() + + await asyncio.gather(task_a(), task_b()) + assert results["a"]["task"] == "a" + assert results["b"]["task"] == "b" + + @pytest.mark.asyncio + async def test_child_task_inherits_parent_but_is_isolated(self): + """Child tasks inherit the parent's ContextVar at creation time but modifications + in the child do not affect the parent.""" + clear_token() + set_token({"owner": "parent"}) + + child_saw: dict = {} + + async def child(): + child_saw["initial"] = get_token() + set_token({"owner": "child"}) + child_saw["after_set"] = get_token() + + await asyncio.create_task(child()) + # Parent's token must be unchanged + assert get_token()["owner"] == "parent" + assert child_saw["initial"]["owner"] == "parent" + assert child_saw["after_set"]["owner"] == "child" + clear_token() diff --git a/packages/llama-index-callbacks-hdp/tests/test_verify.py b/packages/llama-index-callbacks-hdp/tests/test_verify.py new file mode 100644 index 0000000..eaeb907 --- /dev/null +++ b/packages/llama-index-callbacks-hdp/tests/test_verify.py @@ -0,0 +1,136 @@ +"""Tests for offline chain verification — ported and extended from hdp-crewai.""" + +from __future__ import annotations + +import time +import pytest +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + +from llama_index.callbacks.hdp import verify_chain +from llama_index.callbacks.hdp._crypto import sign_root, sign_hop + + +def _generate_key(): + priv = Ed25519PrivateKey.generate() + return priv.private_bytes_raw(), priv.public_key() + + +def _issue_token(key: bytes, pub_key=None, 
session_id="s1", expired=False, max_hops=None) -> dict: + import uuid + now = int(time.time() * 1000) + expires_at = now - 1000 if expired else now + 86400000 + scope: dict = {"intent": "test", "data_classification": "internal", "network_egress": True, "persistence": False} + if max_hops is not None: + scope["max_hops"] = max_hops + unsigned = { + "hdp": "0.1", + "header": { + "token_id": str(uuid.uuid4()), + "issued_at": now, + "expires_at": expires_at, + "session_id": session_id, + "version": "0.1", + }, + "principal": {"id": "user@test.com", "id_type": "email"}, + "scope": scope, + "chain": [], + } + sig = sign_root(unsigned, key, "k1") + return {**unsigned, "signature": sig} + + +def _add_hop(token: dict, key: bytes, action: str) -> dict: + import time + chain = token.get("chain", []) + seq = len(chain) + 1 + unsigned_hop = { + "seq": seq, + "agent_id": "test-agent", + "agent_type": "tool-executor", + "timestamp": int(time.time() * 1000), + "action_summary": action, + "parent_hop": seq - 1, + } + cumulative = [*chain, unsigned_hop] + hop_sig = sign_hop(cumulative, token["signature"]["value"], key) + return {**token, "chain": [*chain, {**unsigned_hop, "hop_signature": hop_sig}]} + + +class TestVerifyChain: + def test_empty_chain_valid(self): + key, pub = _generate_key() + token = _issue_token(key) + result = verify_chain(token, pub.public_bytes_raw()) + assert result.valid + assert result.hop_count == 0 + + def test_chain_with_hops_valid(self): + key, pub = _generate_key() + token = _issue_token(key) + token = _add_hop(token, key, "tool_call: web_search") + token = _add_hop(token, key, "tool_call: retriever") + result = verify_chain(token, pub.public_bytes_raw()) + assert result.valid + assert result.hop_count == 2 + + def test_accepts_raw_public_key_bytes(self): + key, pub = _generate_key() + token = _issue_token(key) + result = verify_chain(token, pub.public_bytes_raw()) + assert result.valid + + def test_tampered_root_sig_fails(self): + key, pub = 
_generate_key() + token = _issue_token(key) + token["signature"]["value"] = token["signature"]["value"][:-4] + "XXXX" + result = verify_chain(token, pub.public_bytes_raw()) + assert not result.valid + assert any("Root signature" in v for v in result.violations) + + def test_tampered_hop_sig_fails(self): + key, pub = _generate_key() + token = _issue_token(key) + token = _add_hop(token, key, "action") + token["chain"][0]["hop_signature"] = "AAAA" + result = verify_chain(token, pub.public_bytes_raw()) + assert not result.valid + + def test_wrong_public_key_fails(self): + key, _ = _generate_key() + _, other_pub = _generate_key() + token = _issue_token(key) + result = verify_chain(token, other_pub.public_bytes_raw()) + assert not result.valid + + def test_expired_token_flagged(self): + key, pub = _generate_key() + token = _issue_token(key, expired=True) + result = verify_chain(token, pub.public_bytes_raw()) + assert any("expired" in v.lower() for v in result.violations) + + def test_max_hops_exceeded_flagged(self): + key, pub = _generate_key() + # max_hops must be set at issuance time so root sig covers it + token = _issue_token(key, max_hops=1) + token = _add_hop(token, key, "hop 1") + token = _add_hop(token, key, "hop 2") + result = verify_chain(token, pub.public_bytes_raw()) + assert any("max_hops" in v for v in result.violations) + + def test_hop_results_detail(self): + key, pub = _generate_key() + token = _issue_token(key) + token = _add_hop(token, key, "action a") + token = _add_hop(token, key, "action b") + result = verify_chain(token, pub.public_bytes_raw()) + assert result.valid + assert len(result.hop_results) == 2 + assert all(hr.valid for hr in result.hop_results) + + def test_depth_property(self): + key, pub = _generate_key() + token = _issue_token(key) + for i in range(3): + token = _add_hop(token, key, f"hop {i}") + result = verify_chain(token, pub.public_bytes_raw()) + assert result.depth == 3