Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions headroom/transforms/read_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@
logger = logging.getLogger(__name__)


def _format_read_lifecycle_transform(classification: ReadClassification) -> str:
"""Format a read_lifecycle transform tag including the source file path.

Shape: ``read_lifecycle:<state>:<file_path>``. Consumers splitting on ``:``
must bound the split to 3 parts so paths containing ``:`` are preserved.
"""
path = classification.file_path or ""
return f"read_lifecycle:{classification.state.value}:{path}"


class ReadState(str, Enum):
"""Lifecycle state of a Read output."""

Expand Down Expand Up @@ -381,7 +391,7 @@ def _apply_lifecycle(
replaced, marker, ccr_hash = self._replace_content(content, classification)
if replaced:
result_messages.append({**msg, "content": marker})
transforms.append(f"read_lifecycle:{classification.state.value}")
transforms.append(_format_read_lifecycle_transform(classification))
if ccr_hash:
ccr_hashes.append(ccr_hash)
bytes_before += len(content.encode("utf-8"))
Expand Down Expand Up @@ -435,7 +445,7 @@ def _process_anthropic_blocks(
replaced, marker, ccr_hash = self._replace_content(tool_content, classification)
if replaced:
new_blocks.append({**block, "content": marker})
transforms.append(f"read_lifecycle:{classification.state.value}")
transforms.append(_format_read_lifecycle_transform(classification))
if ccr_hash:
ccr_hashes.append(ccr_hash)
any_replaced = True
Expand Down
53 changes: 52 additions & 1 deletion headroom/transforms/tool_crusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,44 @@
logger = logging.getLogger(__name__)


def _build_tool_name_index(messages: list[dict[str, Any]]) -> dict[str, str]:
"""Map tool_call_id/tool_use_id → tool name across OpenAI + Anthropic formats.

Skips entries where id or name is missing; those calls still crush, but
won't contribute a tool-name to the ``tool_crush`` tag.
"""
index: dict[str, str] = {}
for msg in messages:
if msg.get("role") != "assistant":
continue
for tc in msg.get("tool_calls") or []:
tc_id = tc.get("id")
name = (tc.get("function") or {}).get("name")
if tc_id and name:
index[tc_id] = name
content = msg.get("content")
if isinstance(content, list):
for block in content:
if not isinstance(block, dict) or block.get("type") != "tool_use":
continue
bid = block.get("id")
name = block.get("name")
if bid and name:
index[bid] = name
return index


def _format_tool_crush_transform(count: int, tool_names: list[str]) -> str:
"""Format ``tool_crush:<count>[:<name1,name2,...>]``.

Names are included when known so consumers can show what was crushed. Empty
names fall back to the count-only form for backwards compatibility.
"""
if tool_names:
return f"tool_crush:{count}:{','.join(tool_names)}"
return f"tool_crush:{count}"


class ToolCrusher(Transform):
"""
Compress tool output to reduce token usage.
Expand Down Expand Up @@ -102,6 +140,15 @@ def apply(
warnings: list[str] = []

crushed_count = 0
crushed_tool_names: list[str] = []
seen_tool_names: set[str] = set()
tool_names_by_id = _build_tool_name_index(result_messages)

def _record(tool_id: str) -> None:
name = tool_names_by_id.get(tool_id)
if name and name not in seen_tool_names:
seen_tool_names.add(name)
crushed_tool_names.append(name)

for msg in result_messages:
# OpenAI style: role="tool"
Expand Down Expand Up @@ -130,6 +177,7 @@ def apply(
msg["content"] = crushed + "\n" + marker
crushed_count += 1
markers_inserted.append(marker)
_record(tool_call_id)

# Anthropic style: role="user" with tool_result content blocks
content = msg.get("content")
Expand Down Expand Up @@ -165,9 +213,12 @@ def apply(
content[i]["content"] = crushed + "\n" + marker
crushed_count += 1
markers_inserted.append(marker)
_record(tool_use_id)

if crushed_count > 0:
transforms_applied.append(f"tool_crush:{crushed_count}")
transforms_applied.append(
_format_tool_crush_transform(crushed_count, crushed_tool_names)
)
logger.info(
"ToolCrusher: compressed %d tool outputs, %d -> %d tokens",
crushed_count,
Expand Down
74 changes: 74 additions & 0 deletions tests/test_transforms/test_read_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,80 @@ def test_transforms_recorded(self):
stale_transforms = [t for t in result.transforms_applied if "stale" in t]
assert len(stale_transforms) == 2 # Both reads are stale

def test_transform_tag_includes_file_path_openai(self):
    """OpenAI-format tag shape is ``read_lifecycle:<state>:<file_path>``."""
    manager = ReadLifecycleManager(ReadLifecycleConfig(enabled=True))
    conversation = [
        make_openai_read("r1", "/src/app.py"),
        make_openai_tool_result("r1", LARGE_CONTENT),
        make_openai_edit("e1", "/src/app.py"),
        make_openai_tool_result("e1", "done"),
    ]

    outcome = manager.apply(conversation)
    assert "read_lifecycle:stale:/src/app.py" in outcome.transforms_applied

def test_transform_tag_includes_file_path_anthropic(self):
    """Anthropic-format tag shape matches OpenAI tag shape."""
    manager = ReadLifecycleManager(ReadLifecycleConfig(enabled=True))
    read_use = {
        "type": "tool_use",
        "id": "r1",
        "name": "Read",
        "input": {"file_path": "/src/notes.md"},
    }
    edit_use = {
        "type": "tool_use",
        "id": "e1",
        "name": "Edit",
        "input": {
            "file_path": "/src/notes.md",
            "old_string": "old",
            "new_string": "new",
        },
    }
    conversation = [
        {"role": "assistant", "content": [read_use]},
        {
            "role": "user",
            "content": [
                {"type": "tool_result", "tool_use_id": "r1", "content": LARGE_CONTENT}
            ],
        },
        {"role": "assistant", "content": [edit_use]},
        {
            "role": "user",
            "content": [{"type": "tool_result", "tool_use_id": "e1", "content": "done"}],
        },
    ]

    outcome = manager.apply(conversation)
    assert "read_lifecycle:stale:/src/notes.md" in outcome.transforms_applied

def test_transform_tag_preserves_colons_in_path(self):
    """Paths containing ``:`` survive — consumers must bound their split."""
    manager = ReadLifecycleManager(ReadLifecycleConfig(enabled=True))
    colon_path = "/tmp/has:colon/file.py"

    outcome = manager.apply(
        [
            make_openai_read("r1", colon_path),
            make_openai_tool_result("r1", LARGE_CONTENT),
            make_openai_edit("e1", colon_path),
            make_openai_tool_result("e1", "done"),
        ]
    )

    stale_tag = next(
        t for t in outcome.transforms_applied if t.startswith("read_lifecycle:stale")
    )
    assert stale_tag.split(":", 2) == ["read_lifecycle", "stale", colon_path]


class TestNoFilePathHandling:
"""Reads without parseable file_path should be left alone."""
Expand Down
141 changes: 141 additions & 0 deletions tests/test_transforms/test_tool_crusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,147 @@ def test_digest_marker_added(self):
assert "<headroom:tool_digest" in tool_content
assert "sha256=" in tool_content

def test_transform_tag_includes_tool_names_openai(self):
    """Tag shape is ``tool_crush:<count>:<name1,name2>`` for OpenAI format."""
    payload_a = {"items": [{"id": i, "v": "x" * 10} for i in range(40)]}
    payload_b = {"rows": list(range(200))}

    def make_call(call_id, tool_name):
        # Minimal OpenAI-style tool_call entry.
        return {
            "id": call_id,
            "type": "function",
            "function": {"name": tool_name, "arguments": "{}"},
        }

    conversation = [
        {
            "role": "assistant",
            "content": None,
            "tool_calls": [make_call("c1", "Bash"), make_call("c2", "Grep")],
        },
        {"role": "tool", "tool_call_id": "c1", "content": json.dumps(payload_a)},
        {"role": "tool", "tool_call_id": "c2", "content": json.dumps(payload_b)},
    ]

    crusher = ToolCrusher(ToolCrusherConfig(min_tokens_to_crush=10, max_array_items=3))
    outcome = crusher.apply(conversation, get_tokenizer())

    crush_tags = [t for t in outcome.transforms_applied if t.startswith("tool_crush:")]
    assert len(crush_tags) == 1
    prefix, count, names = crush_tags[0].split(":", 2)
    assert prefix == "tool_crush"
    assert count == "2"
    # Order follows first-crushed-first
    assert names == "Bash,Grep"

def test_transform_tag_includes_tool_names_anthropic(self):
    """Anthropic tool_use blocks feed the tool-name index."""
    payload = {"items": [{"id": i, "v": "x" * 10} for i in range(40)]}
    conversation = [
        {
            "role": "assistant",
            "content": [{"type": "tool_use", "id": "u1", "name": "Read", "input": {}}],
        },
        {
            "role": "user",
            "content": [
                {"type": "tool_result", "tool_use_id": "u1", "content": json.dumps(payload)}
            ],
        },
    ]

    crusher = ToolCrusher(ToolCrusherConfig(min_tokens_to_crush=10, max_array_items=3))
    outcome = crusher.apply(conversation, get_tokenizer())

    assert "tool_crush:1:Read" in outcome.transforms_applied

def test_transform_tag_dedupes_repeated_tool(self):
    """Same tool crushed twice shows once in the tag."""
    payload = json.dumps({"items": [{"id": i, "v": "x" * 10} for i in range(40)]})
    # Two separate calls to the same tool name.
    bash_calls = [
        {"id": cid, "type": "function", "function": {"name": "Bash", "arguments": "{}"}}
        for cid in ("c1", "c2")
    ]
    conversation = [
        {"role": "assistant", "content": None, "tool_calls": bash_calls},
        {"role": "tool", "tool_call_id": "c1", "content": payload},
        {"role": "tool", "tool_call_id": "c2", "content": payload},
    ]

    crusher = ToolCrusher(ToolCrusherConfig(min_tokens_to_crush=10, max_array_items=3))
    outcome = crusher.apply(conversation, get_tokenizer())

    assert "tool_crush:2:Bash" in outcome.transforms_applied

def test_tool_name_index_skips_entries_missing_id_or_name(self):
    """Guards: tool_calls / tool_use blocks missing id or name are skipped,
    other blocks (text, etc.) are skipped, and the crushed tag still
    reflects the entries that DO have both."""
    payload = json.dumps({"items": [{"id": i, "v": "x" * 10} for i in range(40)]})
    assistant_msg = {
        "role": "assistant",
        "content": [
            # tool_use block with no id → skipped
            {"type": "tool_use", "name": "NamelessRead"},
            # tool_use block with no name → skipped
            {"type": "tool_use", "id": "u0"},
            # Non-tool_use block → skipped
            {"type": "text", "text": "thinking..."},
            # The one good entry
            {"type": "tool_use", "id": "u1", "name": "Grep", "input": {}},
        ],
        # OpenAI-style tool_calls missing id/name → skipped
        "tool_calls": [
            {"id": "", "function": {"name": "Empty"}},
            {"id": "c1", "function": {"name": ""}},
        ],
    }
    result_msg = {
        "role": "user",
        "content": [{"type": "tool_result", "tool_use_id": "u1", "content": payload}],
    }

    crusher = ToolCrusher(ToolCrusherConfig(min_tokens_to_crush=10, max_array_items=3))
    outcome = crusher.apply([assistant_msg, result_msg], get_tokenizer())

    # Only Grep (u1) had both id + name AND was actually crushed.
    assert "tool_crush:1:Grep" in outcome.transforms_applied

def test_transform_tag_falls_back_when_no_names(self):
    """Crushed tool with no resolvable name keeps legacy ``tool_crush:<n>`` shape."""
    payload = json.dumps({"items": [{"id": i, "v": "x" * 10} for i in range(40)]})
    # No assistant message → no name index entries.
    conversation = [
        {"role": "tool", "tool_call_id": "orphan", "content": payload},
    ]

    crusher = ToolCrusher(ToolCrusherConfig(min_tokens_to_crush=10, max_array_items=3))
    outcome = crusher.apply(conversation, get_tokenizer())

    assert "tool_crush:1" in outcome.transforms_applied

def test_non_tool_messages_unchanged(self):
"""Non-tool messages should not be modified."""
messages = [
Expand Down
Loading