diff --git a/CHANGELOG.md b/CHANGELOG.md index bf27b20..4b553ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- Progressive disclosure for tool results: view registry + drilldown loop (#17) +- `ViewRegistry` class in `context/views.py` — maps content-type patterns to `ViewSpec` generators +- Built-in view generators for `application/json`, `text/csv`, `text/plain`, and binary/image content +- `generate_views()` function for auto-generating `ViewSpec` entries from artifact data +- `drilldown_tool_spec()` helper — generates a `SelectableItem` exposing drilldown as an agent-callable tool +- `ContextManager.drilldown()` / `drilldown_sync()` — agent-facing wrapper for `ArtifactStore.drilldown()` with optional context injection +- `ContextManager.view_registry` property for accessing/extending the view registry +- Auto-generated `ViewSpec` entries during `ingest_tool_result()` (both large and small outputs) +- Auto-generated `ViewSpec` entries during `apply_firewall()` via view registry +- Content-type detection heuristics for generic `application/octet-stream` artifacts +- Small tool outputs now stored in artifact store with `artifact_ref` for drilldown support + ## [0.1.2] - 2026-03-04 ### Added diff --git a/src/contextweaver/__init__.py b/src/contextweaver/__init__.py index c640146..468d08c 100644 --- a/src/contextweaver/__init__.py +++ b/src/contextweaver/__init__.py @@ -25,6 +25,7 @@ from contextweaver.config import ContextBudget, ContextPolicy, ScoringConfig from contextweaver.context.manager import ContextManager from contextweaver.context.sensitivity import MaskRedactionHook, register_redaction_hook +from contextweaver.context.views import ViewRegistry, drilldown_tool_spec, generate_views from contextweaver.envelope import ( BuildStats, ChoiceCard, @@ -136,6 +137,9 @@ # context engine "ContextManager", "MaskRedactionHook", + "ViewRegistry", 
+ "drilldown_tool_spec", + "generate_views", "register_redaction_hook", # routing engine "Catalog", diff --git a/src/contextweaver/context/__init__.py b/src/contextweaver/context/__init__.py index b4f6a38..2c92e54 100644 --- a/src/contextweaver/context/__init__.py +++ b/src/contextweaver/context/__init__.py @@ -20,17 +20,21 @@ apply_sensitivity_filter, register_redaction_hook, ) +from contextweaver.context.views import ViewRegistry, drilldown_tool_spec, generate_views __all__ = [ "ContextManager", "MaskRedactionHook", + "ViewRegistry", "apply_firewall", "apply_firewall_to_batch", "apply_sensitivity_filter", "register_redaction_hook", "build_schema_header", "deduplicate_candidates", + "drilldown_tool_spec", "generate_candidates", + "generate_views", "render_context", "render_item", "resolve_dependency_closure", diff --git a/src/contextweaver/context/firewall.py b/src/contextweaver/context/firewall.py index 6b26394..b6d2ee3 100644 --- a/src/contextweaver/context/firewall.py +++ b/src/contextweaver/context/firewall.py @@ -10,6 +10,7 @@ from typing import Literal +from contextweaver.context.views import ViewRegistry, generate_views from contextweaver.envelope import ResultEnvelope from contextweaver.protocols import ArtifactStore, EventHook, NoOpHook from contextweaver.summarize.extract import extract_facts @@ -28,6 +29,7 @@ def apply_firewall( item: ContextItem, artifact_store: ArtifactStore, hook: EventHook | None = None, + view_registry: ViewRegistry | None = None, ) -> tuple[ContextItem, ResultEnvelope | None]: """Intercept a ``tool_result`` item and store its content out-of-band. @@ -38,6 +40,7 @@ def apply_firewall( item: The candidate item to inspect. artifact_store: Where to store the raw content. hook: Optional lifecycle hook to notify on firewall trigger. + view_registry: Optional custom view registry for auto-view generation. Returns: A 2-tuple ``(processed_item, envelope_or_none)``. 
When the firewall @@ -75,11 +78,14 @@ def apply_firewall( facts = [] status = "error" if status == "error" else "partial" + views = generate_views(ref, raw_bytes, registry=view_registry) + envelope = ResultEnvelope( status=status, summary=summary, facts=facts, artifacts=[ref], + views=views, provenance={"source_item_id": item.id}, ) @@ -101,6 +107,7 @@ def apply_firewall_to_batch( items: list[ContextItem], artifact_store: ArtifactStore, hook: EventHook | None = None, + view_registry: ViewRegistry | None = None, ) -> tuple[list[ContextItem], list[ResultEnvelope]]: """Apply the firewall to a list of items. @@ -108,6 +115,7 @@ def apply_firewall_to_batch( items: Candidate items (may contain a mix of kinds). artifact_store: Where to store raw tool outputs. hook: Optional lifecycle hook. + view_registry: Optional custom view registry for auto-view generation. Returns: A 2-tuple of ``(processed_items, envelopes)``. @@ -115,7 +123,7 @@ def apply_firewall_to_batch( processed = [] envelopes = [] for item in items: - p, env = apply_firewall(item, artifact_store, hook) + p, env = apply_firewall(item, artifact_store, hook, view_registry) processed.append(p) if env is not None: envelopes.append(env) diff --git a/src/contextweaver/context/manager.py b/src/contextweaver/context/manager.py index de86540..e2ce0cf 100644 --- a/src/contextweaver/context/manager.py +++ b/src/contextweaver/context/manager.py @@ -24,6 +24,7 @@ from contextweaver.context.scoring import score_candidates from contextweaver.context.selection import select_and_pack from contextweaver.context.sensitivity import apply_sensitivity_filter +from contextweaver.context.views import ViewRegistry, generate_views from contextweaver.envelope import ContextPack, ResultEnvelope from contextweaver.protocols import ( ArtifactStore, @@ -91,6 +92,7 @@ def __init__( self._scoring = scoring_config or ScoringConfig() self._estimator: TokenEstimator = estimator or CharDivFourEstimator() self._hook: EventHook = hook or NoOpHook() 
+ self._view_registry: ViewRegistry = ViewRegistry() # ------------------------------------------------------------------ # Properties @@ -116,6 +118,11 @@ def fact_store(self) -> InMemoryFactStore: """The underlying fact store.""" return self._fact_store + @property + def view_registry(self) -> ViewRegistry: + """The view registry for auto-generating drilldown views.""" + return self._view_registry + # ------------------------------------------------------------------ # Ingestion helpers # ------------------------------------------------------------------ @@ -147,7 +154,10 @@ def ingest_tool_result( """Ingest a raw tool result through the context firewall. If the raw output exceeds *firewall_threshold* characters it is stored - in the artifact store and the LLM sees only a summary. + in the artifact store and the LLM sees only a summary. Small outputs + are also stored in the artifact store (with ``artifact_ref`` set on the + returned item) to enable drilldown on all tool results regardless of + size. Args: tool_call_id: ID of the originating tool call. @@ -158,7 +168,8 @@ def ingest_tool_result( stores the raw output out-of-band. Returns: - A ``(ContextItem, ResultEnvelope)`` tuple. + A ``(ContextItem, ResultEnvelope)`` tuple. The item always has a + non-``None`` ``artifact_ref``. 
""" item = ContextItem( id=f"result:{tool_call_id}", @@ -170,23 +181,46 @@ def ingest_tool_result( ) if len(raw_output) > firewall_threshold: - processed, envelope = apply_firewall(item, self._artifact_store, self._hook) + processed, envelope = apply_firewall( + item, self._artifact_store, self._hook, self._view_registry + ) if envelope is None: # Shouldn't happen for tool_result items, but be safe envelope = ResultEnvelope(status="ok", summary=raw_output[:500]) self._event_log.append(processed) return processed, envelope - # Small output: still extract facts but no artifact storage + # Small output: extract facts and store in artifact store to enable drilldown from contextweaver.summarize.extract import extract_facts facts = extract_facts(raw_output, item.metadata) + # For small outputs, store in artifact store to enable drilldown + raw_bytes = raw_output.encode("utf-8") + handle = f"artifact:{item.id}" + ref = self._artifact_store.put( + handle=handle, + content=raw_bytes, + media_type=media_type, + label=f"raw tool result for {item.id}", + ) + views = generate_views(ref, raw_bytes, registry=self._view_registry) envelope = ResultEnvelope( status="ok", summary=raw_output, facts=facts, + artifacts=[ref], + views=views, provenance={"source_item_id": item.id, "tool_name": tool_name}, ) + item = ContextItem( + id=item.id, + kind=item.kind, + text=item.text, + token_estimate=item.token_estimate, + metadata=dict(item.metadata), + parent_id=item.parent_id, + artifact_ref=ref, + ) self._event_log.append(item) return item, envelope @@ -336,6 +370,68 @@ def add_episode_sync( """Synchronous alias for :meth:`add_episode`.""" self.add_episode(episode_id, summary, metadata) + # ------------------------------------------------------------------ + # Drilldown + # ------------------------------------------------------------------ + + def drilldown( + self, + handle: str, + selector: dict[str, Any], + *, + inject: bool = False, + parent_id: str | None = None, + ) -> str: + """Fetch 
a slice of a stored artifact via the drilldown protocol. + + Wraps :meth:`~contextweaver.protocols.ArtifactStore.drilldown` and + optionally injects the result as a new :class:`ContextItem` in the + event log for subsequent context builds. + + Args: + handle: Artifact handle to drill into. + selector: Drilldown selector dict (see + :meth:`~contextweaver.store.artifacts.InMemoryArtifactStore.drilldown`). + inject: If ``True``, append the drilldown result as a + ``tool_result`` :class:`ContextItem` to the event log. + parent_id: Optional parent item ID for dependency closure when + *inject* is ``True``. + + Returns: + The drilldown result text. + + Raises: + ArtifactNotFoundError: If *handle* is not in the store. + ValueError: If the selector type is unknown. + """ + result = self._artifact_store.drilldown(handle, selector) + + if inject: + sel_type = selector.get("type", "unknown") + item_id = f"drilldown:{handle}:{sel_type}:{self._event_log.count()}" + item = ContextItem( + id=item_id, + kind=ItemKind.tool_result, + text=result, + token_estimate=self._estimator.estimate(result), + metadata={"drilldown_handle": handle, "selector": selector}, + parent_id=parent_id, + ) + self._event_log.append(item) + + return result + + def drilldown_sync( + self, + handle: str, + selector: dict[str, Any], + *, + inject: bool = False, + parent_id: str | None = None, + ) -> str: + """Synchronous alias for :meth:`drilldown`.""" + return self.drilldown(handle, selector, inject=inject, parent_id=parent_id) + # ------------------------------------------------------------------ # Core pipeline # ------------------------------------------------------------------ diff --git a/src/contextweaver/context/views.py b/src/contextweaver/context/views.py new file mode 100644 index 0000000..712f945 --- /dev/null +++ b/src/contextweaver/context/views.py @@ -0,0 +1,286 @@ +"""View registry and progressive disclosure helpers for contextweaver. 
+ +Provides automatic :class:`~contextweaver.types.ViewSpec` generation, +a content-type view registry, and ``drilldown_tool_spec()``. +""" + +from __future__ import annotations + +import csv +import json +from collections.abc import Callable + +from contextweaver.types import ArtifactRef, SelectableItem, ViewSpec + +ViewGenerator = Callable[[ArtifactRef, bytes], list[ViewSpec]] + + +def _json_views(ref: ArtifactRef, data: bytes) -> list[ViewSpec]: + """Generate views for JSON content.""" + text = data.decode("utf-8", errors="replace") + try: + obj = json.loads(text) + except (json.JSONDecodeError, ValueError): + return [] + + views: list[ViewSpec] = [] + + if isinstance(obj, dict) and obj: + keys = sorted(obj.keys()) + views.append( + ViewSpec( + view_id=f"{ref.handle}:json_keys", + label=f"JSON keys: {', '.join(keys[:5])}" + ("…" if len(keys) > 5 else ""), + selector={"type": "json_keys", "keys": keys}, + artifact_ref=ref, + ) + ) + # Individual key views for up to 10 top-level keys + for key in keys[:10]: + views.append( + ViewSpec( + view_id=f"{ref.handle}:key:{key}", + label=f"JSON key '{key}'", + selector={"type": "json_keys", "keys": [key]}, + artifact_ref=ref, + ) + ) + + if isinstance(obj, list) and obj: + views.append( + ViewSpec( + view_id=f"{ref.handle}:array_head", + label=f"Array head ({min(len(obj), 5)} of {len(obj)} items)", + selector={"type": "head", "chars": 500}, + artifact_ref=ref, + ) + ) + + # Always offer a head view for non-trivial JSON + if len(text) > 200: + views.append( + ViewSpec( + view_id=f"{ref.handle}:head", + label="Head (500 chars)", + selector={"type": "head", "chars": 500}, + artifact_ref=ref, + ) + ) + + return views + + +def _csv_views(ref: ArtifactRef, data: bytes) -> list[ViewSpec]: + """Generate views for CSV/TSV content.""" + text = data.decode("utf-8", errors="replace") + lines = text.splitlines() + total = len(lines) + + views: list[ViewSpec] = [] + + if total > 0: + views.append( + ViewSpec( + 
view_id=f"{ref.handle}:head_rows", + label=f"Head rows (first {min(total, 10)} of {total})", + selector={"type": "rows", "start": 0, "end": min(total, 10)}, + artifact_ref=ref, + ) + ) + + if total > 10: + views.append( + ViewSpec( + view_id=f"{ref.handle}:tail_rows", + label=f"Tail rows (last {min(total, 10)} of {total})", + selector={"type": "rows", "start": max(0, total - 10), "end": total}, + artifact_ref=ref, + ) + ) + + return views + + +def _text_views(ref: ArtifactRef, data: bytes) -> list[ViewSpec]: + """Generate views for plain text content.""" + text = data.decode("utf-8", errors="replace") + lines = text.splitlines() + total = len(lines) + + views: list[ViewSpec] = [] + + if total > 0: + views.append( + ViewSpec( + view_id=f"{ref.handle}:head", + label=f"Head ({min(total, 20)} lines)", + selector={"type": "lines", "start": 0, "end": min(total, 20)}, + artifact_ref=ref, + ) + ) + + if total > 20: + views.append( + ViewSpec( + view_id=f"{ref.handle}:tail", + label=f"Tail ({min(total, 20)} lines)", + selector={"type": "lines", "start": max(0, total - 20), "end": total}, + artifact_ref=ref, + ) + ) + + return views + + +def _binary_views(ref: ArtifactRef, data: bytes) -> list[ViewSpec]: + """Generate a header-inspection view for binary/image content.""" + # NOTE: drilldown decodes to UTF-8 with errors="replace", so binary + # content may contain replacement characters. This is intentional — + # the purpose is file-header / magic-byte inspection. 
+ return [ + ViewSpec( + view_id=f"{ref.handle}:meta", + label=f"Header (128 bytes, {ref.media_type})", + selector={"type": "head", "chars": 128}, + artifact_ref=ref, + ) + ] + + +# --------------------------------------------------------------------------- +# Content-type detection +# --------------------------------------------------------------------------- + + +def _detect_content_type(data: bytes, media_type: str) -> str: + """Detect effective content type using media_type hint and heuristics.""" + if media_type.startswith("image/"): + return media_type + if media_type == "application/octet-stream": + try: + text = data.decode("utf-8") + except (UnicodeDecodeError, ValueError): + return media_type + text = text.strip() + if text and text[0] in ("{", "["): + try: + json.loads(text) + return "application/json" + except (json.JSONDecodeError, ValueError): + pass + if _looks_like_csv(text): + return "text/csv" + return "text/plain" + + return media_type + + +def _looks_like_csv(text: str) -> bool: + """Heuristic: does the text look like CSV/TSV?""" + lines = text.splitlines() + if len(lines) < 2: + return False + try: + sniffer = csv.Sniffer() + sample = "\n".join(lines[:5]) + sniffer.sniff(sample) + return True + except csv.Error: + return False + + +# --------------------------------------------------------------------------- +# ViewRegistry +# --------------------------------------------------------------------------- + + +class ViewRegistry: + """Maps content-type patterns to :class:`ViewSpec` generators. + + Built-in generators handle ``application/json``, ``text/csv``, + ``text/plain``, and binary/image content. Users extend via :meth:`register`. 
+ """ + + def __init__(self) -> None: + self._generators: dict[str, ViewGenerator] = { + "application/json": _json_views, + "text/csv": _csv_views, + "text/plain": _text_views, + } + + def register(self, content_type: str, generator: ViewGenerator) -> None: + """Register a view generator for a content type.""" + self._generators[content_type] = generator + + def generate_views(self, ref: ArtifactRef, data: bytes) -> list[ViewSpec]: + """Auto-generate :class:`ViewSpec` entries for an artifact. + + Detects effective content type and delegates to the matching + generator. Falls back to a binary/metadata view for unknown types. + """ + effective = _detect_content_type(data, ref.media_type) + + # Exact match + if effective in self._generators: + return self._generators[effective](ref, data) + + # Prefix match for text/* types only — prefer text/plain as + # the general fallback within the text family. + if effective.startswith("text/") and "text/plain" in self._generators: + return self._generators["text/plain"](ref, data) + + # Fallback: binary metadata view + return _binary_views(ref, data) + + +def generate_views( + ref: ArtifactRef, + data: bytes, + registry: ViewRegistry | None = None, +) -> list[ViewSpec]: + """Auto-generate views for an artifact using the given or default registry.""" + reg = registry or ViewRegistry() + return reg.generate_views(ref, data) + + +# --------------------------------------------------------------------------- +# Drilldown tool spec +# --------------------------------------------------------------------------- + + +def drilldown_tool_spec() -> SelectableItem: + """Return a :class:`SelectableItem` describing the drilldown action. + + Add to the agent's tool catalog during ``interpret`` phase so the + agent can call drilldown to fetch artifact slices. + """ + return SelectableItem( + id="contextweaver:drilldown", + kind="internal", + name="drilldown", + description=( + "Fetch a specific slice of a stored artifact. 
" + "Provide an artifact handle and a selector to retrieve " + "only the data you need." + ), + tags=["progressive-disclosure", "drilldown", "artifact"], + args_schema={ + "type": "object", + "properties": { + "handle": {"type": "string", "description": "Artifact handle."}, + "selector": { + "type": "object", + "description": 'Selector: "head", "lines", "json_keys", or "rows".', + "properties": { + "type": { + "type": "string", + "enum": ["head", "lines", "json_keys", "rows"], + }, + }, + "required": ["type"], + }, + }, + "required": ["handle", "selector"], + }, + metadata={"builtin": True, "phase": "interpret"}, + ) diff --git a/tests/test_manager.py b/tests/test_manager.py index 1051656..167ea64 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -2,6 +2,8 @@ from __future__ import annotations +import json + import pytest from contextweaver.context.manager import ContextManager @@ -691,3 +693,204 @@ def test_ingest_mcp_result_mixed_content() -> None: assert mgr.artifact_store.get("mcp:multi_tool:image:1") == img_bytes assert "Found 5 results" in env.summary assert "Report content" in env.summary + + +# --------------------------------------------------------------------------- +# Drilldown +# --------------------------------------------------------------------------- + + +def test_drilldown_basic() -> None: + """drilldown() returns a slice of a stored artifact.""" + mgr = ContextManager() + large_output = json.dumps({"users": [{"id": i, "name": f"User {i}"} for i in range(100)]}) + _item, env = mgr.ingest_tool_result( + tool_call_id="tc-dd1", + raw_output=large_output, + tool_name="search_users", + media_type="application/json", + firewall_threshold=100, + ) + assert len(env.artifacts) >= 1 + handle = env.artifacts[0].handle + result = mgr.drilldown(handle, {"type": "head", "chars": 50}) + assert len(result) <= 50 + assert result == large_output[:50] + + +def test_drilldown_json_keys() -> None: + """drilldown() with json_keys selector returns 
filtered JSON.""" + mgr = ContextManager() + data = json.dumps({"name": "Alice", "age": 30, "role": "admin"}) + _item, env = mgr.ingest_tool_result( + tool_call_id="tc-dd2", + raw_output=data, + tool_name="get_user", + media_type="application/json", + firewall_threshold=10, + ) + handle = env.artifacts[0].handle + result = mgr.drilldown(handle, {"type": "json_keys", "keys": ["name"]}) + parsed = json.loads(result) + assert parsed == {"name": "Alice"} + + +def test_drilldown_inject_into_event_log() -> None: + """drilldown(inject=True) appends the result as a new ContextItem.""" + mgr = ContextManager() + _item, env = mgr.ingest_tool_result( + tool_call_id="tc-dd3", + raw_output="line0\nline1\nline2\nline3\nline4", + tool_name="list_files", + firewall_threshold=10, + ) + handle = env.artifacts[0].handle + initial_count = mgr.event_log.count() + result = mgr.drilldown( + handle, + {"type": "lines", "start": 1, "end": 3}, + inject=True, + parent_id=_item.id, + ) + assert result == "line1\nline2" + assert mgr.event_log.count() == initial_count + 1 + injected = mgr.event_log.get(f"drilldown:{handle}:lines:{initial_count}") + assert injected.text == "line1\nline2" + assert injected.parent_id == _item.id + + +def test_drilldown_inject_repeated_same_selector() -> None: + """Repeated drilldown(inject=True) with the same selector must not crash.""" + mgr = ContextManager() + _item, env = mgr.ingest_tool_result( + tool_call_id="tc-dd-rep", + raw_output="line0\nline1\nline2\nline3\nline4", + tool_name="list_files", + firewall_threshold=10, + ) + handle = env.artifacts[0].handle + selector = {"type": "lines", "start": 0, "end": 2} + count_before = mgr.event_log.count() + r1 = mgr.drilldown(handle, selector, inject=True) + r2 = mgr.drilldown(handle, selector, inject=True) + assert r1 == r2 == "line0\nline1" + assert mgr.event_log.count() == count_before + 2 + + +def test_drilldown_without_inject() -> None: + """drilldown(inject=False) does not modify event log.""" + mgr = 
ContextManager() + _item, env = mgr.ingest_tool_result( + tool_call_id="tc-dd4", + raw_output="hello world", + tool_name="echo", + firewall_threshold=5, + ) + handle = env.artifacts[0].handle + count_before = mgr.event_log.count() + mgr.drilldown(handle, {"type": "head", "chars": 5}) + assert mgr.event_log.count() == count_before + + +def test_drilldown_sync() -> None: + """drilldown_sync() is a synchronous alias.""" + mgr = ContextManager() + _item, env = mgr.ingest_tool_result( + tool_call_id="tc-dd5", + raw_output="sync test data", + tool_name="echo", + firewall_threshold=5, + ) + handle = env.artifacts[0].handle + result = mgr.drilldown_sync(handle, {"type": "head", "chars": 4}) + assert result == "sync" + + +def test_drilldown_missing_handle_raises() -> None: + """drilldown() raises ArtifactNotFoundError for unknown handles.""" + from contextweaver.exceptions import ArtifactNotFoundError + + mgr = ContextManager() + with pytest.raises(ArtifactNotFoundError): + mgr.drilldown("no-such-handle", {"type": "head", "chars": 10}) + + +def test_drilldown_unknown_selector_raises() -> None: + """drilldown() raises ValueError for unknown selector types.""" + mgr = ContextManager() + _item, env = mgr.ingest_tool_result( + tool_call_id="tc-dd6", + raw_output="some data", + tool_name="echo", + firewall_threshold=5, + ) + handle = env.artifacts[0].handle + with pytest.raises(ValueError, match="Unknown drilldown"): + mgr.drilldown(handle, {"type": "unknown_type"}) + + +# --------------------------------------------------------------------------- +# Auto-generated views on ingest +# --------------------------------------------------------------------------- + + +def test_ingest_tool_result_auto_views_json_large() -> None: + """Large JSON tool results have auto-generated views after firewall.""" + mgr = ContextManager() + data = json.dumps({"users": [1, 2, 3], "count": 3}) + _item, env = mgr.ingest_tool_result( + tool_call_id="tc-av1", + raw_output=data, + tool_name="api_call", 
+ media_type="application/json", + firewall_threshold=10, + ) + assert len(env.views) > 0 + view_ids = [v.view_id for v in env.views] + assert any("json_keys" in vid for vid in view_ids) + + +def test_ingest_tool_result_auto_views_small_output() -> None: + """Small outputs also get auto-generated views and artifact refs.""" + mgr = ContextManager() + data = json.dumps({"status": "ok"}) + _item, env = mgr.ingest_tool_result( + tool_call_id="tc-av2", + raw_output=data, + tool_name="ping", + media_type="application/json", + firewall_threshold=5000, + ) + assert len(env.views) > 0 + assert len(env.artifacts) > 0 + assert _item.artifact_ref is not None + + +def test_ingest_tool_result_views_drilldown_chain() -> None: + """Views from ingest can be used to drive drilldown calls.""" + mgr = ContextManager() + data = json.dumps({"alpha": "aaa", "beta": "bbb", "gamma": "ccc"}) + _item, env = mgr.ingest_tool_result( + tool_call_id="tc-chain", + raw_output=data, + tool_name="get_data", + media_type="application/json", + firewall_threshold=10, + ) + # Pick a view and use its selector to drilldown + key_views = [v for v in env.views if v.selector.get("type") == "json_keys"] + assert len(key_views) > 0 + view = key_views[0] + handle = env.artifacts[0].handle + result = mgr.drilldown(handle, view.selector) + assert len(result) > 0 + parsed = json.loads(result) + assert isinstance(parsed, dict) + + +def test_view_registry_accessible() -> None: + """ContextManager exposes view_registry property.""" + mgr = ContextManager() + from contextweaver.context.views import ViewRegistry + + assert isinstance(mgr.view_registry, ViewRegistry) diff --git a/tests/test_views.py b/tests/test_views.py new file mode 100644 index 0000000..27aadb9 --- /dev/null +++ b/tests/test_views.py @@ -0,0 +1,343 @@ +"""Tests for contextweaver.context.views.""" + +from __future__ import annotations + +import json + +from contextweaver.context.views import ( + ViewRegistry, + _binary_views, + _csv_views, + 
_detect_content_type, + _json_views, + _looks_like_csv, + _text_views, + drilldown_tool_spec, + generate_views, +) +from contextweaver.types import ArtifactRef, SelectableItem, ViewSpec + + +def _ref( + handle: str = "artifact:test", + media_type: str = "text/plain", + size_bytes: int = 100, +) -> ArtifactRef: + return ArtifactRef(handle=handle, media_type=media_type, size_bytes=size_bytes) + + +# ------------------------------------------------------------------ +# JSON view generation +# ------------------------------------------------------------------ + + +def test_json_views_dict_keys() -> None: + ref = _ref(media_type="application/json") + data = json.dumps({"name": "Alice", "age": 30, "role": "admin"}).encode() + views = _json_views(ref, data) + # Should have: 1 all-keys view + 3 individual key views + assert len(views) == 4 + all_keys = views[0] + assert "json_keys" in all_keys.view_id + assert all_keys.selector["type"] == "json_keys" + assert sorted(all_keys.selector["keys"]) == ["age", "name", "role"] + + +def test_json_views_large_dict_includes_head() -> None: + ref = _ref(media_type="application/json", size_bytes=500) + obj = {f"key_{i}": f"value_{i}" for i in range(20)} + data = json.dumps(obj).encode() + views = _json_views(ref, data) + # 1 all-keys + 10 individual keys (capped) + 1 head view + assert any(v.view_id.endswith(":head") for v in views) + + +def test_json_views_array() -> None: + ref = _ref(media_type="application/json") + data = json.dumps([{"id": i} for i in range(50)]).encode() + views = _json_views(ref, data) + assert any("array_head" in v.view_id for v in views) + + +def test_json_views_empty_dict() -> None: + ref = _ref(media_type="application/json") + data = b"{}" + views = _json_views(ref, data) + assert len(views) == 0 + + +def test_json_views_invalid_json() -> None: + ref = _ref(media_type="application/json") + data = b"not json at all" + views = _json_views(ref, data) + assert len(views) == 0 + + +# 
------------------------------------------------------------------ +# CSV view generation +# ------------------------------------------------------------------ + + +def test_csv_views_basic() -> None: + ref = _ref(media_type="text/csv") + lines = ["name,age,role"] + [f"user{i},{20 + i},admin" for i in range(20)] + data = "\n".join(lines).encode() + views = _csv_views(ref, data) + assert len(views) == 2 # head + tail + head = views[0] + assert head.selector["type"] == "rows" + assert head.selector["start"] == 0 + assert head.selector["end"] == 10 + + +def test_csv_views_short() -> None: + ref = _ref(media_type="text/csv") + data = b"name,age\nAlice,30\nBob,25" + views = _csv_views(ref, data) + assert len(views) == 1 # only head, no tail + + +def test_csv_views_empty() -> None: + ref = _ref(media_type="text/csv") + data = b"" + views = _csv_views(ref, data) + assert len(views) == 0 + + +# ------------------------------------------------------------------ +# Text view generation +# ------------------------------------------------------------------ + + +def test_text_views_short() -> None: + ref = _ref(media_type="text/plain") + data = b"line1\nline2\nline3" + views = _text_views(ref, data) + assert len(views) == 1 # only head + assert views[0].selector["type"] == "lines" + + +def test_text_views_long() -> None: + ref = _ref(media_type="text/plain") + lines = [f"line {i}" for i in range(50)] + data = "\n".join(lines).encode() + views = _text_views(ref, data) + assert len(views) == 2 # head + tail + head, tail = views[0], views[1] + assert head.selector["start"] == 0 + assert head.selector["end"] == 20 + assert tail.selector["start"] == 30 + assert tail.selector["end"] == 50 + + +def test_text_views_empty() -> None: + ref = _ref(media_type="text/plain") + data = b"" + views = _text_views(ref, data) + # Empty text has zero lines after splitlines + assert len(views) == 0 + + +# ------------------------------------------------------------------ +# Binary view generation +# 
# ------------------------------------------------------------------


def test_binary_views() -> None:
    """A PNG payload yields one header view labelled with its media type."""
    png_signature = b"\x89PNG\r\n\x1a\n"
    artifact = _ref(media_type="image/png", size_bytes=2048)
    payload = png_signature + b"\x00" * 100

    generated = _binary_views(artifact, payload)

    assert len(generated) == 1
    header_view = generated[0]
    assert "Header" in header_view.label
    assert "image/png" in header_view.label
    assert header_view.selector == {"type": "head", "chars": 128}


# ------------------------------------------------------------------
# Content-type detection
# ------------------------------------------------------------------


def test_detect_json_from_octet_stream() -> None:
    """JSON bytes declared as octet-stream are detected as ``application/json``."""
    payload = json.dumps({"key": "value"}).encode()
    detected = _detect_content_type(payload, "application/octet-stream")
    assert detected == "application/json"


def test_detect_csv_from_octet_stream() -> None:
    """Comma-separated rows declared as octet-stream are detected as ``text/csv``."""
    payload = b"name,age,role\nAlice,30,admin\nBob,25,user"
    detected = _detect_content_type(payload, "application/octet-stream")
    assert detected == "text/csv"


def test_detect_plain_text_from_octet_stream() -> None:
    """Unstructured text declared as octet-stream is detected as ``text/plain``."""
    payload = b"just some plain text without any structure"
    detected = _detect_content_type(payload, "application/octet-stream")
    assert detected == "text/plain"


def test_detect_image_passthrough() -> None:
    """A declared ``image/png`` media type is returned unchanged."""
    payload = b"\x89PNG\r\n\x1a\n"
    detected = _detect_content_type(payload, "image/png")
    assert detected == "image/png"


def test_detect_explicit_json() -> None:
    """A declared ``application/json`` media type is returned unchanged."""
    payload = b'{"key": "value"}'
    detected = _detect_content_type(payload, "application/json")
    assert detected == "application/json"


def test_detect_binary_content() -> None:
    """All 256 byte values declared as octet-stream stay octet-stream."""
    payload = bytes(range(256))
    detected = _detect_content_type(payload, "application/octet-stream")
    assert detected == "application/octet-stream"


# ------------------------------------------------------------------
# CSV heuristic
# ------------------------------------------------------------------


def test_looks_like_csv_positive() -> None:
    """Three comma-separated rows are recognised as CSV."""
    sample = "name,age,role\nAlice,30,admin\nBob,25,user"
    assert _looks_like_csv(sample) is True


def test_looks_like_csv_single_line() -> None:
    """A single line of text is not recognised as CSV."""
    sample = "just one line"
    assert _looks_like_csv(sample) is False


def test_looks_like_csv_plain_text() -> None:
    """Two single-word lines are not recognised as CSV."""
    # Single words per line — no delimiter for csv.Sniffer to detect
    sample = "abc\ndef"
    assert _looks_like_csv(sample) is False


# ------------------------------------------------------------------
# ViewRegistry
# ------------------------------------------------------------------


def test_registry_defaults() -> None:
    """A fresh registry produces at least one ``ViewSpec`` for JSON data."""
    registry = ViewRegistry()
    artifact = _ref(media_type="application/json")
    payload = json.dumps({"a": 1}).encode()

    generated = registry.generate_views(artifact, payload)

    assert len(generated) > 0
    for view in generated:
        assert isinstance(view, ViewSpec)


def test_registry_custom_generator() -> None:
    """A generator registered for ``application/xml`` is used for that media type."""
    registry = ViewRegistry()

    def xml_generator(ref: ArtifactRef, data: bytes) -> list[ViewSpec]:
        custom_view = ViewSpec(
            view_id=f"{ref.handle}:custom",
            label="Custom view",
            selector={"type": "head", "chars": 100},
            artifact_ref=ref,
        )
        return [custom_view]

    registry.register("application/xml", xml_generator)
    artifact = _ref(media_type="application/xml")
    generated = registry.generate_views(artifact, b"")
    assert len(generated) == 1
    assert generated[0].label == "Custom view"


def test_registry_override_builtin() -> None:
    """Registering a generator for ``application/json`` replaces the default views."""
    registry = ViewRegistry()

    def replacement(ref: ArtifactRef, data: bytes) -> list[ViewSpec]:
        return [ViewSpec(view_id="override", label="Overridden")]

    registry.register("application/json", replacement)
    artifact = _ref(media_type="application/json")
    generated = registry.generate_views(artifact, b'{"a": 1}')
    assert len(generated) == 1
    assert generated[0].label == "Overridden"


def test_registry_fallback_to_binary() -> None:
    """An unregistered media type yields a single view labelled as a header."""
    registry = ViewRegistry()
    artifact = _ref(media_type="application/x-custom-binary")
    generated = registry.generate_views(artifact, b"\x00\x01\x02")
    assert len(generated) == 1
    assert "Header" in generated[0].label


def test_registry_prefix_match() -> None:
    """``text/markdown`` data yields views whose first selector is of type ``lines``."""
    registry = ViewRegistry()
    artifact = _ref(media_type="text/markdown")
    payload = "\n".join(f"line {i}" for i in range(30)).encode()

    generated = registry.generate_views(artifact, payload)

    assert len(generated) >= 1
    assert generated[0].selector["type"] == "lines"


# ------------------------------------------------------------------
# generate_views() convenience function
# ------------------------------------------------------------------


def test_generate_views_default_registry() -> None:
    """Calling ``generate_views`` without a registry still yields views for text."""
    artifact = _ref(media_type="text/plain")
    generated = generate_views(artifact, b"hello\nworld\nfoo")
    assert len(generated) >= 1


def test_generate_views_custom_registry() -> None:
    """A registry passed via ``registry=`` supplies the generator that is used."""
    registry = ViewRegistry()
    registry.register("text/plain", lambda r, d: [ViewSpec(view_id="x", label="X")])
    artifact = _ref(media_type="text/plain")

    generated = generate_views(artifact, b"data", registry=registry)

    assert len(generated) == 1
    assert generated[0].label == "X"


# ------------------------------------------------------------------
# ViewSpec serialisation round-trip
# ------------------------------------------------------------------


def test_viewspec_round_trip() -> None:
    """``to_dict`` followed by ``from_dict`` preserves id, label, selector and handle."""
    artifact = _ref()
    original = ViewSpec(
        view_id="v1",
        label="Test",
        selector={"type": "head", "chars": 100},
        artifact_ref=artifact,
    )

    rebuilt = ViewSpec.from_dict(original.to_dict())

    assert rebuilt.view_id == original.view_id
    assert rebuilt.label == original.label
    assert rebuilt.selector == original.selector
    assert rebuilt.artifact_ref is not None
    assert rebuilt.artifact_ref.handle == artifact.handle


# ------------------------------------------------------------------
# drilldown_tool_spec
# ------------------------------------------------------------------


def test_drilldown_tool_spec_returns_selectable_item() -> None:
    """The spec is an internal ``SelectableItem`` named ``drilldown`` with both args."""
    tool = drilldown_tool_spec()
    assert isinstance(tool, SelectableItem)
    assert tool.kind == "internal"
    assert tool.name == "drilldown"
    for arg_name in ("handle", "selector"):
        assert arg_name in tool.args_schema.get("properties", {})


def test_drilldown_tool_spec_deterministic() -> None:
    """Two consecutive calls return specs with equal ids and equal dict forms."""
    first = drilldown_tool_spec()
    second = drilldown_tool_spec()
    assert first.id == second.id
    assert first.to_dict() == second.to_dict()


def test_drilldown_tool_spec_tags() -> None:
    """The spec carries the ``drilldown`` and ``progressive-disclosure`` tags."""
    tool_tags = drilldown_tool_spec().tags
    assert "drilldown" in tool_tags
    assert "progressive-disclosure" in tool_tags