From 32572e07984e486993623e24729a8551e2c055e5 Mon Sep 17 00:00:00 2001 From: Simon FREYBURGER Date: Fri, 17 Apr 2026 18:33:13 +0200 Subject: [PATCH 1/4] feat: add followup-based context compaction Ref #43 --- followup_compaction.py | 211 ++++++++++++++++++++++++++++++ tests/test_followup_compaction.py | 176 +++++++++++++++++++++++++ 2 files changed, 387 insertions(+) create mode 100644 followup_compaction.py create mode 100644 tests/test_followup_compaction.py diff --git a/followup_compaction.py b/followup_compaction.py new file mode 100644 index 0000000..56b545c --- /dev/null +++ b/followup_compaction.py @@ -0,0 +1,211 @@ +"""Follow-up compaction: stub past-turn tool_results before each API call. + +Non-destructive: produces a new message list, leaves `state.messages` intact +so persistence and resume keep the full history. +""" +from __future__ import annotations + +import html +import json +import re +import time +from typing import Iterable + +DEFAULT_EXEMPT_TOOLS = frozenset({"Edit", "Write", "TodoWrite"}) + + +def compact_tool_history( + messages: list, + keep_last_n_turns: int = 0, + exempt_tools: Iterable[str] = DEFAULT_EXEMPT_TOOLS, +) -> list: + """Return a NEW list where past-turn tool_result contents are replaced by stubs. + + A "turn" begins at a role='user' message. The current turn (from the last + user message onward) is always kept intact. + """ + exempt = frozenset(exempt_tools) + user_indices = [i for i, m in enumerate(messages) if m.get("role") == "user"] + if len(user_indices) <= keep_last_n_turns + 1: + return list(messages) + + cutoff = user_indices[-(keep_last_n_turns + 1)] + tool_call_lookup = _build_tool_call_lookup(messages) + + compacted = [] + for index, message in enumerate(messages): + if index >= cutoff: + compacted.append(message) + continue + role = message.get("role") + if role == "assistant" and message.get("tool_calls"): + stubbed = dict(message) + stubbed["content"] = compact_assistant_xml( + message["content"], message.get("tool_calls") + ) + compacted.append(stubbed) + continue + if role != "tool" or message.get("name") in exempt: + compacted.append(message) + continue + tool_call_id = message.get("tool_call_id", "") + name, inp = tool_call_lookup.get( + tool_call_id, (message.get("name", "tool"), {}) + ) + stubbed = dict(message) + stubbed["content"] = _build_stub(name, inp) + compacted.append(stubbed) + return compacted + + +def _build_tool_call_lookup(messages: list) -> dict: + lookup: dict = {} + for message in messages: + if message.get("role") != "assistant": + continue + for tool_call in message.get("tool_calls") or []: + lookup[tool_call.get("id", "")] = ( + tool_call.get("name", ""), + tool_call.get("input") or {}, + ) + return lookup + + +def _escape_xml_attr(value: str) -> str: + return html.escape(value, quote=False).replace('"', '"') + + +def _build_stub(name: str, input_dict: dict) -> str: + brief = _input_brief(name, input_dict) + return f'' + + +def _input_brief(name: str, inp: dict) -> str: + if name == "Read": + path = inp.get("file_path", "?") + parts = [f"file_path={path}"] + if "offset" in inp: + parts.append(f"offset={inp['offset']}") + if "limit" in inp: + parts.append(f"limit={inp['limit']}") + return ", ".join(parts) + if name == "Bash": + cmd = (inp.get("command") or "").replace("\n", " ") + if len(cmd) > 100: + cmd = cmd[:97] + "..." + return f"command={cmd!r}" + if name == "Grep": + parts = [f"pattern={inp.get('pattern', '?')!r}"] + if "path" in inp: + parts.append(f"path={inp['path']}") + return ", ".join(parts) + if name == "Glob": + return f"pattern={inp.get('pattern', '?')!r}" + try: + rendered = json.dumps(inp, ensure_ascii=False) + except (TypeError, ValueError): + rendered = str(inp) + if len(rendered) > 120: + rendered = rendered[:117] + "..." + return rendered + + +def _build_tc_lookup(tool_calls: list | None) -> dict: + lookup: dict = {} + for tc in tool_calls or []: + tid = tc.get("id", "") + if tid: + lookup[tid] = (tc.get("name", "tool"), tc.get("input") or {}) + return lookup + + +def _xml_replacer(tc_lookup: dict, target_ids: set | None = None): + def _replacer(match): + name, tid = match.group(1), match.group(2) + if target_ids is not None and tid not in target_ids: + return match.group(0) + tc_name, tc_input = tc_lookup.get(tid, (name, {})) + brief = _input_brief(tc_name, tc_input) + return f'' + return _replacer + + +_TOOL_USE_RE = re.compile( + r']*>.*?', + re.DOTALL, +) + + +def compact_assistant_xml(content: str, tool_calls: list | None = None) -> str: + """Replace ALL inline XML tool_use blocks with one-line summaries.""" + if not content or " str: + """Replace only XML blocks whose id is in target_ids, leaving others intact.""" + if not content or " list: + """Apply follow-up compaction + model-driven GC, then inject working memory notes.""" + if not config.get("followup_compaction_enabled", True): + compacted = list(state.messages) + else: + keep = config.get("followup_keep_last_n_turns", 0) + exempt = config.get("followup_exempt_tools", DEFAULT_EXEMPT_TOOLS) + compacted = compact_tool_history(state.messages, keep_last_n_turns=keep, exempt_tools=exempt) + + from compaction import estimate_tokens + tokens_before = estimate_tokens(state.messages) + tokens_after = estimate_tokens(compacted) + if tokens_before != tokens_after: + state.compaction_log.append({ + "event": "followup_compact", + "timestamp": time.time(), + "turn": getattr(state, "turn_count", 0), + "tokens_est_before": tokens_before, + "tokens_est_after": tokens_after, + "tokens_est_saved": tokens_before - tokens_after, + }) + + return _apply_context_gc(compacted, state) + + +def _apply_context_gc(messages: list, state) -> list: + """Apply model-driven GC decisions and inject working memory notes.""" + try: + from context_gc import apply_gc, inject_notes, prepend_verbatim_audit + except ImportError: + return messages + gc_state = getattr(state, 'gc_state', None) + if not gc_state: + return prepend_verbatim_audit(messages) + if not gc_state.trashed_ids and not gc_state.snippets and not gc_state.notes: + return prepend_verbatim_audit(messages) + + from compaction import estimate_tokens + tokens_before = estimate_tokens(messages) + result = apply_gc(messages, gc_state) + result = inject_notes(result, gc_state.notes) + tokens_after = estimate_tokens(result) + if tokens_before != tokens_after: + state.compaction_log.append({ + "event": "context_gc", + "timestamp": time.time(), + "turn": getattr(state, "turn_count", 0), + "trashed_count": len(gc_state.trashed_ids), + "snippet_count": len(gc_state.snippets), + "notes_count": len(gc_state.notes), + "tokens_est_saved": tokens_before - tokens_after, + }) + return prepend_verbatim_audit(result) diff --git a/tests/test_followup_compaction.py b/tests/test_followup_compaction.py new file mode 100644 index 0000000..c352e9b --- /dev/null +++ b/tests/test_followup_compaction.py @@ -0,0 +1,176 @@ +"""Tests for followup_compaction module.""" +import pytest + +from followup_compaction import ( + compact_tool_history, _build_tool_call_lookup, _build_stub, + _input_brief, _escape_xml_attr, + compact_assistant_xml, compact_assistant_xml_selective, + DEFAULT_EXEMPT_TOOLS, +) + + +class TestCompactToolHistory: + def _make_messages(self): + return [ + {"role": "user", "content": "turn 1"}, + {"role": "assistant", "content": "ok", "tool_calls": [ + {"id": "tc1", "name": "Read", "input": {"file_path": "/a.py"}}, + ]}, + {"role": "tool", "tool_call_id": "tc1", "name": "Read", "content": "file contents..."}, + {"role": "user", "content": "turn 2"}, + {"role": "assistant", "content": "done"}, + ] + + def test_stubs_old_tool_results(self): + msgs = self._make_messages() + result = compact_tool_history(msgs) + assert "") + + def test_quote(self): + assert """ in _escape_xml_attr('"hello"') + + +class TestCompactAssistantXml: + def test_replaces_tool_use_blocks(self): + content = 'text before /a.py text after' + tool_calls = [{"id": "r1", "name": "Read", "input": {"file_path": "/a.py"}}] + result = compact_assistant_xml(content, tool_calls) + assert "x' + 'y' + ) + tool_calls = [ + {"id": "r1", "name": "Read", "input": {"file_path": "/a.py"}}, + {"id": "r2", "name": "Grep", "input": {"pattern": "x"}}, + ] + result = compact_assistant_xml_selective(content, tool_calls, {"r1"}) + assert " Date: Sat, 18 Apr 2026 09:30:21 +0200 Subject: [PATCH 2/4] fix: add ImportError guard for context_gc import (standalone compat) --- followup_compaction.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/followup_compaction.py b/followup_compaction.py index 56b545c..bb174eb 100644 --- a/followup_compaction.py +++ b/followup_compaction.py @@ -184,7 +184,10 @@ def build_messages_for_api(state, config: dict) -> list: def _apply_context_gc(messages: list, state) -> list: """Apply model-driven GC decisions and inject working memory notes.""" try: - from context_gc import apply_gc, inject_notes, prepend_verbatim_audit + try: + from context_gc import apply_gc + except ImportError: + return messages # context_gc not available yet, skip, inject_notes, prepend_verbatim_audit except ImportError: return messages gc_state = getattr(state, 'gc_state', None) From 325b34d91613f35420ad5fc9e20a75b63eac0f21 Mon Sep 17 00:00:00 2001 From: Simon FREYBURGER Date: Sun, 19 Apr 2026 14:39:48 +0200 Subject: [PATCH 3/4] refactor: drop XML-compaction dead code + fix _apply_context_gc imports Remove compact_assistant_xml, compact_assistant_xml_selective, _xml_replacer, _build_tc_lookup and _TOOL_USE_RE. These functions compact inline ... XML blocks inside assistant message content, which only exist on providers that don't natively support tool_use blocks (e.g. AWS Bedrock socle in bouzecode). Upstream cheetahclaws uses the native Anthropic content: [{"type":"tool_use", ...}] format, so these functions early-returned on every call and the compact_tool_history branch that invoked compact_assistant_xml was a no-op. Also fix _apply_context_gc which was wrapped in a double try/except where the outer pass was unreachable, and which imported only apply_gc while referencing inject_notes and prepend_verbatim_audit (NameError when gc_state had entries). Replaced with a single try that imports all three names and cleanly returns on ImportError if PR #55 isn't deployed alongside. Test file drops the TestCompactAssistantXml / TestCompactAssistantXmlSelective classes that exercised the removed functions. Co-Authored-By: Claude Opus 4.7 (1M context) --- followup_compaction.py | 66 ++++--------------------------- tests/test_followup_compaction.py | 38 ------------------ 2 files changed, 7 insertions(+), 97 deletions(-) diff --git a/followup_compaction.py b/followup_compaction.py index bb174eb..a5eb61d 100644 --- a/followup_compaction.py +++ b/followup_compaction.py @@ -7,7 +7,6 @@ import html import json -import re import time from typing import Iterable @@ -38,13 +37,6 @@ def compact_tool_history( compacted.append(message) continue role = message.get("role") - if role == "assistant" and message.get("tool_calls"): - stubbed = dict(message) - stubbed["content"] = compact_assistant_xml( - message["content"], message.get("tool_calls") - ) - compacted.append(stubbed) - continue if role != "tool" or message.get("name") in exempt: compacted.append(message) continue @@ -110,52 +102,6 @@ def _input_brief(name: str, inp: dict) -> str: return rendered -def _build_tc_lookup(tool_calls: list | None) -> dict: - lookup: dict = {} - for tc in tool_calls or []: - tid = tc.get("id", "") - if tid: - lookup[tid] = (tc.get("name", "tool"), tc.get("input") or {}) - return lookup - - -def _xml_replacer(tc_lookup: dict, target_ids: set | None = None): - def _replacer(match): - name, tid = match.group(1), match.group(2) - if target_ids is not None and tid not in target_ids: - return match.group(0) - tc_name, tc_input = tc_lookup.get(tid, (name, {})) - brief = _input_brief(tc_name, tc_input) - return f'' - return _replacer - - -_TOOL_USE_RE = re.compile( - r']*>.*?', - re.DOTALL, -) - - -def compact_assistant_xml(content: str, tool_calls: list | None = None) -> str: - """Replace ALL inline XML tool_use blocks with one-line summaries.""" - if not content or " str: - """Replace only XML blocks whose id is in target_ids, leaving others intact.""" - if not content or " list: """Apply follow-up compaction + model-driven GC, then inject working memory notes.""" if not config.get("followup_compaction_enabled", True): @@ -182,12 +128,14 @@ def build_messages_for_api(state, config: dict) -> list: def _apply_context_gc(messages: list, state) -> list: - """Apply model-driven GC decisions and inject working memory notes.""" + """Apply model-driven GC decisions and inject working memory notes. + + Falls back to returning messages unchanged when the context_gc module is + absent (this PR can ship independently of PR #55). The import is narrow: + only ImportError is swallowed; any other error propagates. + """ try: - try: - from context_gc import apply_gc - except ImportError: - return messages # context_gc not available yet, skip, inject_notes, prepend_verbatim_audit + from context_gc import apply_gc, inject_notes, prepend_verbatim_audit except ImportError: return messages gc_state = getattr(state, 'gc_state', None) diff --git a/tests/test_followup_compaction.py b/tests/test_followup_compaction.py index c352e9b..024a803 100644 --- a/tests/test_followup_compaction.py +++ b/tests/test_followup_compaction.py @@ -4,7 +4,6 @@ from followup_compaction import ( compact_tool_history, _build_tool_call_lookup, _build_stub, _input_brief, _escape_xml_attr, - compact_assistant_xml, compact_assistant_xml_selective, DEFAULT_EXEMPT_TOOLS, ) @@ -122,43 +121,6 @@ def test_quote(self): assert """ in _escape_xml_attr('"hello"') -class TestCompactAssistantXml: - def test_replaces_tool_use_blocks(self): - content = 'text before /a.py text after' - tool_calls = [{"id": "r1", "name": "Read", "input": {"file_path": "/a.py"}}] - result = compact_assistant_xml(content, tool_calls) - assert "x' - 'y' - ) - tool_calls = [ - {"id": "r1", "name": "Read", "input": {"file_path": "/a.py"}}, - {"id": "r2", "name": "Grep", "input": {"pattern": "x"}}, - ] - result = compact_assistant_xml_selective(content, tool_calls, {"r1"}) - assert " Date: Tue, 21 Apr 2026 09:52:44 +0200 Subject: [PATCH 4/4] feat: rewrite followup compaction - destruction-based, XML compaction, thinking strip, cache breakpoint --- followup_compaction.py | 366 ++++++++++++++-------- tests/test_followup_compaction.py | 501 ++++++++++++++++++++++++------ 2 files changed, 640 insertions(+), 227 deletions(-) diff --git a/followup_compaction.py b/followup_compaction.py index a5eb61d..5c225fc 100644 --- a/followup_compaction.py +++ b/followup_compaction.py @@ -1,162 +1,278 @@ -"""Follow-up compaction: stub past-turn tool_results before each API call. +"""Follow-up compaction: destroy past-turn tool content before each API call. -Non-destructive: produces a new message list, leaves `state.messages` intact -so persistence and resume keep the full history. +At each user turn boundary, ALL tool messages and assistant tool_calls from +prior turns are completely removed (no stubs). The current turn is always +kept intact. + +Non-destructive to state.messages -- produces a new list so persistence and +resume keep the full history. """ from __future__ import annotations import html -import json +import re import time -from typing import Iterable -DEFAULT_EXEMPT_TOOLS = frozenset({"Edit", "Write", "TodoWrite"}) + +_THINKING_BLOCK_RE = re.compile(r'.*?\s*', re.DOTALL) + + +_ARGS_PREFERRED_KEY = { + "Read": "file_path", "Edit": "file_path", "Write": "file_path", + "NotebookEdit": "notebook_path", + "Glob": "pattern", "Grep": "pattern", + "Bash": "command", + "WebFetch": "url", "WebSearch": "query", +} + + +def _escape_xml_attr(s: str) -> str: + return html.escape(str(s), quote=True) + + +def _input_brief(tool_name: str, input_dict: dict, max_len: int = 60) -> str: + if not input_dict: + return "" + val = input_dict.get(_ARGS_PREFERRED_KEY.get(tool_name, "")) + if val is None: + for v in input_dict.values(): + if isinstance(v, str) and v: + val = v + break + if val is None: + return "" + val = str(val).replace("\n", " ") + if len(val) > max_len: + val = val[: max_len - 3] + "..." + return val + + +def _build_tc_lookup(tool_calls: list | None) -> dict: + lookup: dict = {} + for tc in tool_calls or []: + tid = tc.get("id", "") + if tid: + lookup[tid] = (tc.get("name", "tool"), tc.get("input") or {}) + return lookup + + +def _xml_replacer(tc_lookup: dict, target_ids: set | None = None): + def _replacer(match): + name, tid = match.group(1), match.group(2) + if target_ids is not None and tid not in target_ids: + return match.group(0) + tc_name, tc_input = tc_lookup.get(tid, (name, {})) + brief = _input_brief(tc_name, tc_input) + return f'' + return _replacer -def compact_tool_history( - messages: list, - keep_last_n_turns: int = 0, - exempt_tools: Iterable[str] = DEFAULT_EXEMPT_TOOLS, -) -> list: - """Return a NEW list where past-turn tool_result contents are replaced by stubs. +_TOOL_USE_RE = re.compile( + r']*>.*?', + re.DOTALL, +) - A "turn" begins at a role='user' message. The current turn (from the last - user message onward) is always kept intact. + +def compact_assistant_xml(content: str, tool_calls: list | None = None) -> str: + """Replace ALL inline XML tool_use blocks with one-line summaries.""" + if not content or " str: + """Replace only XML blocks whose id is in target_ids, leaving others intact.""" + if not content or " bool: + if user_idx == 0: + return True + prev = messages[user_idx - 1] + return prev.get("role") == "assistant" and not prev.get("tool_calls") + + +def compact_tool_history(messages: list, keep_last_n_turns: int = 0) -> list: + """Completely remove prior-turn tool content. + + At user turn boundaries, ALL tool messages and assistant tool_calls from + prior turns are destroyed (no stubs). Assistant messages that become empty + after stripping are also removed. + + The current turn (last ``keep_last_n_turns + 1`` user messages onward) is + kept intact. """ - exempt = frozenset(exempt_tools) user_indices = [i for i, m in enumerate(messages) if m.get("role") == "user"] - if len(user_indices) <= keep_last_n_turns + 1: + if not user_indices: return list(messages) - cutoff = user_indices[-(keep_last_n_turns + 1)] - tool_call_lookup = _build_tool_call_lookup(messages) + valid_boundaries = [i for i in user_indices + if _is_completed_boundary(messages, i)] + + total_keep = keep_last_n_turns + 1 + if total_keep >= len(valid_boundaries): + return list(messages) - compacted = [] - for index, message in enumerate(messages): - if index >= cutoff: - compacted.append(message) + current_turn_start = valid_boundaries[-total_keep] + + result = [] + for i, msg in enumerate(messages): + if i >= current_turn_start: + result.append(msg) continue - role = message.get("role") - if role != "tool" or message.get("name") in exempt: - compacted.append(message) + + role = msg.get("role") + + if role == "tool": continue - tool_call_id = message.get("tool_call_id", "") - name, inp = tool_call_lookup.get( - tool_call_id, (message.get("name", "tool"), {}) - ) - stubbed = dict(message) - stubbed["content"] = _build_stub(name, inp) - compacted.append(stubbed) - return compacted + if role == "user": + result.append(msg) + continue -def _build_tool_call_lookup(messages: list) -> dict: - lookup: dict = {} - for message in messages: - if message.get("role") != "assistant": + if role == "assistant": + tool_calls = msg.get("tool_calls") + content = msg.get("content", "") or "" + + if tool_calls: + content = compact_assistant_xml(content, tool_calls) + cleaned = dict(msg) + cleaned.pop("tool_calls", None) + cleaned["content"] = content + if not content.strip(): + continue + result.append(cleaned) + else: + if content.strip(): + result.append(msg) continue - for tool_call in message.get("tool_calls") or []: - lookup[tool_call.get("id", "")] = ( - tool_call.get("name", ""), - tool_call.get("input") or {}, - ) - return lookup + result.append(msg) -def _escape_xml_attr(value: str) -> str: - return html.escape(value, quote=False).replace('"', '"') - - -def _build_stub(name: str, input_dict: dict) -> str: - brief = _input_brief(name, input_dict) - return f'' - - -def _input_brief(name: str, inp: dict) -> str: - if name == "Read": - path = inp.get("file_path", "?") - parts = [f"file_path={path}"] - if "offset" in inp: - parts.append(f"offset={inp['offset']}") - if "limit" in inp: - parts.append(f"limit={inp['limit']}") - return ", ".join(parts) - if name == "Bash": - cmd = (inp.get("command") or "").replace("\n", " ") - if len(cmd) > 100: - cmd = cmd[:97] + "..." - return f"command={cmd!r}" - if name == "Grep": - parts = [f"pattern={inp.get('pattern', '?')!r}"] - if "path" in inp: - parts.append(f"path={inp['path']}") - return ", ".join(parts) - if name == "Glob": - return f"pattern={inp.get('pattern', '?')!r}" - try: - rendered = json.dumps(inp, ensure_ascii=False) - except (TypeError, ValueError): - rendered = str(inp) - if len(rendered) > 120: - rendered = rendered[:117] + "..." - return rendered + return result -def build_messages_for_api(state, config: dict) -> list: - """Apply follow-up compaction + model-driven GC, then inject working memory notes.""" - if not config.get("followup_compaction_enabled", True): - compacted = list(state.messages) - else: - keep = config.get("followup_keep_last_n_turns", 0) - exempt = config.get("followup_exempt_tools", DEFAULT_EXEMPT_TOOLS) - compacted = compact_tool_history(state.messages, keep_last_n_turns=keep, exempt_tools=exempt) +def _mark_compaction_boundary(messages: list) -> None: + """Mark the last message before the current user turn with _cache_breakpoint. - from compaction import estimate_tokens - tokens_before = estimate_tokens(state.messages) - tokens_after = estimate_tokens(compacted) - if tokens_before != tokens_after: - state.compaction_log.append({ - "event": "followup_compact", - "timestamp": time.time(), - "turn": getattr(state, "turn_count", 0), - "tokens_est_before": tokens_before, - "tokens_est_after": tokens_after, - "tokens_est_saved": tokens_before - tokens_after, - }) + This tells messages_to_anthropic where to place cache_control so the + compacted prefix is cached and current-loop messages stay fresh. + """ + user_indices = [i for i, m in enumerate(messages) if m.get("role") == "user"] + if len(user_indices) < 2: + return + valid_boundaries = [] + for idx in user_indices: + if idx == 0: + valid_boundaries.append(idx) + else: + prev = messages[idx - 1] + role = prev.get("role") + if role == "assistant" and not prev.get("tool_calls"): + valid_boundaries.append(idx) + elif role == "user": + valid_boundaries.append(idx) + if len(valid_boundaries) < 2: + return + current_start = valid_boundaries[-1] + if current_start > 0: + messages[current_start - 1]["_cache_breakpoint"] = True - return _apply_context_gc(compacted, state) +def _strip_thinking_from_messages(messages: list) -> list: + """Remove ... blocks from assistant message content. + + Non-destructive: returns a new list with new dicts where needed. + Handles both string and list-of-blocks content formats. + """ + result = [] + for msg in messages: + if msg.get("role") != "assistant": + result.append(msg) + continue + content = msg.get("content", "") + if isinstance(content, str) and "" in content: + cleaned = _THINKING_BLOCK_RE.sub("", content) + result.append({**msg, "content": cleaned or "."}) + elif isinstance(content, list): + new_blocks = [] + changed = False + for block in content: + if isinstance(block, dict) and block.get("type") == "text" and "" in block.get("text", ""): + cleaned = _THINKING_BLOCK_RE.sub("", block["text"]) + new_blocks.append({**block, "text": cleaned or "."}) + changed = True + else: + new_blocks.append(block) + result.append({**msg, "content": new_blocks} if changed else msg) + else: + result.append(msg) + return result -def _apply_context_gc(messages: list, state) -> list: - """Apply model-driven GC decisions and inject working memory notes. - Falls back to returning messages unchanged when the context_gc module is - absent (this PR can ship independently of PR #55). The import is narrow: - only ImportError is swallowed; any other error propagates. +def build_messages_for_api(state, config: dict) -> list: + """Compact prior-turn tool content at user boundaries, then apply ContextGC. + + compact_tool_history runs on every build so the post-compaction prefix is + byte-stable across every call in a turn, not just the one that immediately + follows a user message. The function is idempotent: it always leaves the + last user turn intact and only touches prior-turn tool content. """ + compacted = compact_tool_history(list(state.messages)) + result = _apply_context_gc(compacted, state) try: - from context_gc import apply_gc, inject_notes, prepend_verbatim_audit + from context_gc import strip_trashed_stubs + result = strip_trashed_stubs(result) + except ImportError: + pass + result = _strip_thinking_from_messages(result) + _mark_compaction_boundary(result) + return result + + +def _apply_context_gc(messages: list, state) -> list: + """Apply model-driven GC decisions. Notes and audit info are injected + into the last user message in dispatch.py, keeping them out of system + blocks for Anthropic cache stability.""" + try: + from context_gc import apply_gc except ImportError: return messages gc_state = getattr(state, 'gc_state', None) if not gc_state: - return prepend_verbatim_audit(messages) - if not gc_state.trashed_ids and not gc_state.snippets and not gc_state.notes: - return prepend_verbatim_audit(messages) + return messages + if not gc_state.trashed_ids and not gc_state.snippets: + return messages + + try: + from compaction import estimate_tokens + tokens_before = estimate_tokens(messages) + except ImportError: + tokens_before = None - from compaction import estimate_tokens - tokens_before = estimate_tokens(messages) result = apply_gc(messages, gc_state) - result = inject_notes(result, gc_state.notes) - tokens_after = estimate_tokens(result) - if tokens_before != tokens_after: - state.compaction_log.append({ - "event": "context_gc", - "timestamp": time.time(), - "turn": getattr(state, "turn_count", 0), - "trashed_count": len(gc_state.trashed_ids), - "snippet_count": len(gc_state.snippets), - "notes_count": len(gc_state.notes), - "tokens_est_saved": tokens_before - tokens_after, - }) - return prepend_verbatim_audit(result) + + if tokens_before is not None: + try: + tokens_after = estimate_tokens(result) + if tokens_before != tokens_after and hasattr(state, 'compaction_log'): + state.compaction_log.append({ + "event": "context_gc", + "timestamp": time.time(), + "turn": getattr(state, "turn_count", 0), + "trashed_count": len(gc_state.trashed_ids), + "snippet_count": len(gc_state.snippets), + "notes_count": len(gc_state.notes), + "tokens_est_saved": tokens_before - tokens_after, + }) + except ImportError: + pass + return result diff --git a/tests/test_followup_compaction.py b/tests/test_followup_compaction.py index 024a803..7c9cc4c 100644 --- a/tests/test_followup_compaction.py +++ b/tests/test_followup_compaction.py @@ -1,138 +1,435 @@ -"""Tests for followup_compaction module.""" -import pytest +"""Tests for followup_compaction.py.""" +from __future__ import annotations + +import os +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) from followup_compaction import ( - compact_tool_history, _build_tool_call_lookup, _build_stub, - _input_brief, _escape_xml_attr, - DEFAULT_EXEMPT_TOOLS, + compact_tool_history, + compact_assistant_xml, + compact_assistant_xml_selective, + build_messages_for_api, ) +def _turn(user_text: str, tool_calls: list, tool_results: list) -> list: + """Build [user, assistant+tool_calls, tool_result, tool_result, ...].""" + msgs = [{"role": "user", "content": user_text}] + msgs.append({"role": "assistant", "content": "ok", "tool_calls": tool_calls}) + msgs.extend(tool_results) + return msgs + + +HEAVY = "X" * 5000 +_TERM_ASST = {"role": "assistant", "content": "done"} + + class TestCompactToolHistory: - def _make_messages(self): - return [ - {"role": "user", "content": "turn 1"}, - {"role": "assistant", "content": "ok", "tool_calls": [ - {"id": "tc1", "name": "Read", "input": {"file_path": "/a.py"}}, + def test_removes_old_tool_messages(self): + history = ( + _turn( + "first question", + [{"id": "t1", "name": "Read", "input": {"file_path": "a.py"}}], + [{"role": "tool", "tool_call_id": "t1", "name": "Read", "content": HEAVY}], + ) + + [{"role": "assistant", "content": "done", "tool_calls": []}] + + _turn( + "follow up", + [{"id": "t2", "name": "Read", "input": {"file_path": "b.py"}}], + [{"role": "tool", "tool_call_id": "t2", "name": "Read", "content": HEAVY}], + ) + ) + out = compact_tool_history(history, keep_last_n_turns=0) + prior_tools = [m for m in out if m.get("role") == "tool" and m.get("tool_call_id") == "t1"] + assert len(prior_tools) == 0 + current_tools = [m for m in out if m.get("role") == "tool" and m.get("tool_call_id") == "t2"] + assert len(current_tools) == 1 + assert current_tools[0]["content"] == HEAVY + + def test_strips_tool_calls_from_prior_assistant(self): + history = ( + _turn( + "first", + [{"id": "t1", "name": "Read", "input": {"file_path": "a.py"}}], + [{"role": "tool", "tool_call_id": "t1", "name": "Read", "content": HEAVY}], + ) + + [_TERM_ASST] + + [{"role": "user", "content": "follow"}] + ) + out = compact_tool_history(history, keep_last_n_turns=0) + prior_assistants = [m for m in out if m.get("role") == "assistant"] + for a in prior_assistants: + assert "tool_calls" not in a + + def test_current_turn_intact(self): + history = ( + _turn( + "first", + [{"id": "t1", "name": "Read", "input": {"file_path": "a.py"}}], + [{"role": "tool", "tool_call_id": "t1", "name": "Read", "content": HEAVY}], + ) + + [_TERM_ASST] + + _turn( + "current", + [{"id": "t2", "name": "Bash", "input": {"command": "ls"}}], + [{"role": "tool", "tool_call_id": "t2", "name": "Bash", "content": HEAVY}], + ) + ) + out = compact_tool_history(history, keep_last_n_turns=0) + assert out[-1]["content"] == HEAVY + + def test_keep_last_n_turns_1(self): + history = ( + _turn( + "first", + [{"id": "t1", "name": "Read", "input": {"file_path": "a.py"}}], + [{"role": "tool", "tool_call_id": "t1", "name": "Read", "content": HEAVY}], + ) + + [_TERM_ASST] + + _turn( + "second", + [{"id": "t2", "name": "Read", "input": {"file_path": "b.py"}}], + [{"role": "tool", "tool_call_id": "t2", "name": "Read", "content": HEAVY}], + ) + + [_TERM_ASST] + + _turn( + "third (current)", + [{"id": "t3", "name": "Read", "input": {"file_path": "c.py"}}], + [{"role": "tool", "tool_call_id": "t3", "name": "Read", "content": HEAVY}], + ) + ) + out = compact_tool_history(history, keep_last_n_turns=1) + by_id = {m["tool_call_id"]: m for m in out if m.get("role") == "tool"} + assert "t1" not in by_id + assert by_id["t2"]["content"] == HEAVY + assert by_id["t3"]["content"] == HEAVY + + def test_removes_empty_assistant_after_stripping(self): + history = [ + {"role": "user", "content": "first"}, + {"role": "assistant", "content": "", "tool_calls": [ + {"id": "t1", "name": "Read", "input": {"file_path": "a.py"}} ]}, - {"role": "tool", "tool_call_id": "tc1", "name": "Read", "content": "file contents..."}, - {"role": "user", "content": "turn 2"}, + {"role": "tool", "tool_call_id": "t1", "name": "Read", "content": HEAVY}, + _TERM_ASST, + {"role": "user", "content": "follow"}, + ] + out = compact_tool_history(history, keep_last_n_turns=0) + assert len(out) == 3 + assert out[0]["content"] == "first" + assert out[1]["content"] == "done" + assert out[2]["content"] == "follow" + + def test_non_destructive(self): + original = _turn( + "q", + [{"id": "t1", "name": "Read", "input": {"file_path": "a.py"}}], + [{"role": "tool", "tool_call_id": "t1", "name": "Read", "content": HEAVY}], + ) + [_TERM_ASST, {"role": "user", "content": "follow"}] + snapshot_content = original[2]["content"] + compact_tool_history(original, keep_last_n_turns=0) + assert original[2]["content"] == snapshot_content + assert original[2]["content"] == HEAVY + + def test_no_compaction_when_only_one_turn(self): + history = _turn( + "only", + [{"id": "t1", "name": "Read", "input": {"file_path": "a.py"}}], + [{"role": "tool", "tool_call_id": "t1", "name": "Read", "content": HEAVY}], + ) + out = compact_tool_history(history, keep_last_n_turns=0) + assert out[2]["content"] == HEAVY + + +class _FakeState: + def __init__(self, messages): + self.messages = messages + self.compaction_log = [] + self.turn_count = 1 + + +class TestBuildMessagesForApi: + def test_mid_loop_compacts_prior_turns_keeps_current(self): + """Mid-loop (last msg is tool): prior turn gets compacted, current turn intact.""" + history = [ + {"role": "user", "content": "q"}, + {"role": "assistant", "content": "ok", "tool_calls": [{"id": "t1", "name": "Read", "input": {"file_path": "a.py"}}]}, + {"role": "tool", "tool_call_id": "t1", "name": "Read", "content": HEAVY}, {"role": "assistant", "content": "done"}, + {"role": "user", "content": "follow"}, + {"role": "assistant", "content": "ok", "tool_calls": [{"id": "t2", "name": "Read", "input": {"file_path": "b.py"}}]}, + {"role": "tool", "tool_call_id": "t2", "name": "Read", "content": HEAVY}, ] + state = _FakeState(history) + result = build_messages_for_api(state, {}) + tool_msgs = [m for m in result if m.get("role") == "tool"] + assert len(tool_msgs) == 1, "t1 (prior turn) must be compacted, t2 (current turn) kept" + assert HEAVY in tool_msgs[0]["content"] + assert tool_msgs[0]["tool_call_id"] == "t2" - def test_stubs_old_tool_results(self): - msgs = self._make_messages() - result = compact_tool_history(msgs) - assert "foo.py' + '\n\nMore text.' + ) + tool_calls = [{"id": "r1", "name": "Read", "input": {"file_path": "foo.py"}}] + result = compact_assistant_xml(content, tool_calls) + assert "Analysis here." in result + assert "More text." in result + assert 'a.py' + '\ntext\n' + 'ls' + ) + tool_calls = [ + {"id": "r1", "name": "Read", "input": {"file_path": "a.py"}}, + {"id": "b1", "name": "Bash", "input": {"command": "ls"}}, ] - result = compact_tool_history(msgs, keep_last_n_turns=1) - assert result[3]["content"] == "data2" + result = compact_assistant_xml(content, tool_calls) + assert '' + '' + '' + '' + ) + tool_calls = [{"id": "w1", "name": "Write", "input": {"file_path": "test.py"}}] + result = compact_assistant_xml(content, tool_calls) + assert 'a.py' + '\ntext\n' + 'b.py' + 'big code' + ) + tool_calls = [ + {"id": "r1", "name": "Read", "input": {"file_path": "a.py"}}, + {"id": "w1", "name": "Write", "input": {"file_path": "b.py"}}, + ] + result = compact_assistant_xml_selective(content, tool_calls, {"w1"}) + assert '' in result + assert 'a.py' + ) + history = [ + {"role": "user", "content": "first question"}, + {"role": "assistant", "content": xml_content, "tool_calls": [ + {"id": "r1", "name": "Read", "input": {"file_path": "a.py"}} + ]}, + {"role": "tool", "tool_call_id": "r1", "name": "Read", "content": HEAVY}, + _TERM_ASST, + {"role": "user", "content": "follow up"}, + {"role": "assistant", "content": "ok", "tool_calls": [ + {"id": "r2", "name": "Read", "input": {"file_path": "b.py"}} + ]}, + {"role": "tool", "tool_call_id": "r2", "name": "Read", "content": HEAVY}, + ] + out = compact_tool_history(history, keep_last_n_turns=0) + prior_asst = out[1] + assert '") +class TestInterruptedTurnPreservation: + """After Ctrl+C, the interrupted turn's tool results must survive compaction.""" - def test_quote(self): - assert """ in _escape_xml_attr('"hello"') + def test_interrupted_turn_not_compacted(self): + """No terminal assistant before user2 -> not a valid boundary -> no compaction.""" + history = [ + {"role": "user", "content": "first"}, + {"role": "assistant", "content": "ok", "tool_calls": [ + {"id": "t1", "name": "Read", "input": {"file_path": "a.py"}} + ]}, + {"role": "tool", "tool_call_id": "t1", "name": "Read", "content": HEAVY}, + {"role": "user", "content": "after interrupt"}, + ] + out = compact_tool_history(history, keep_last_n_turns=0) + tool_msgs = [m for m in out if m.get("role") == "tool"] + assert len(tool_msgs) == 1 + assert tool_msgs[0]["content"] == HEAVY + def test_completed_then_interrupted_preserves_interrupted(self): + """Completed turn compacted, interrupted turn preserved.""" + history = [ + {"role": "user", "content": "turn1"}, + {"role": "assistant", "content": "ok", "tool_calls": [ + {"id": "t1", "name": "Read", "input": {"file_path": "a.py"}} + ]}, + {"role": "tool", "tool_call_id": "t1", "name": "Read", "content": HEAVY}, + {"role": "assistant", "content": "done"}, + {"role": "user", "content": "turn2"}, + {"role": "assistant", "content": "ok", "tool_calls": [ + {"id": "t2", "name": "Bash", "input": {"command": "ls"}} + ]}, + {"role": "tool", "tool_call_id": "t2", "name": "Bash", "content": HEAVY}, + {"role": "user", "content": "after interrupt"}, + ] + out = compact_tool_history(history, keep_last_n_turns=0) + by_id = {m["tool_call_id"]: m for m in out if m.get("role") == "tool"} + assert "t1" not in by_id, "completed turn's tools should be compacted" + assert "t2" in by_id, "interrupted turn's tools must be preserved" + assert by_id["t2"]["content"] == HEAVY -class TestBuildToolCallLookup: - def test_builds_lookup(self): - msgs = [ - {"role": "assistant", "tool_calls": [ - {"id": "tc1", "name": "Read", "input": {"file_path": "/x"}}, - {"id": "tc2", "name": "Bash", "input": {"command": "ls"}}, + def test_multiple_interrupted_turns_all_preserved(self): + """Two consecutive interrupted turns: neither is compacted.""" + history = [ + {"role": "user", "content": "turn1"}, + {"role": "assistant", "content": "ok", "tool_calls": [ + {"id": "t1", "name": "Read", "input": {"file_path": "a.py"}} + ]}, + {"role": "tool", "tool_call_id": "t1", "name": "Read", "content": HEAVY}, + {"role": "user", "content": "turn2 after interrupt"}, + {"role": "assistant", "content": "ok", "tool_calls": [ + {"id": "t2", "name": "Read", "input": {"file_path": "b.py"}} ]}, + {"role": "tool", "tool_call_id": "t2", "name": "Read", "content": HEAVY}, + {"role": "user", "content": "turn3 after interrupt"}, ] - lookup = _build_tool_call_lookup(msgs) - assert lookup["tc1"] == ("Read", {"file_path": "/x"}) - assert lookup["tc2"] == ("Bash", {"command": "ls"}) + out = compact_tool_history(history, keep_last_n_turns=0) + tool_msgs = [m for m in out if m.get("role") == "tool"] + assert len(tool_msgs) == 2, "both interrupted turns' tools must be preserved" - def test_skips_non_assistant(self): - msgs = [{"role": "user", "content": "hi"}] - assert _build_tool_call_lookup(msgs) == {} + def test_build_messages_preserves_interrupted(self): + """build_messages_for_api preserves interrupted turn tool results.""" + history = [ + {"role": "user", "content": "first"}, + {"role": "assistant", "content": "ok", "tool_calls": [ + {"id": "t1", "name": "Read", "input": {"file_path": "a.py"}} + ]}, + {"role": "tool", "tool_call_id": "t1", "name": "Read", "content": HEAVY}, + {"role": "user", "content": "after ctrl+c"}, + ] + state = _FakeState(history) + result = build_messages_for_api(state, {}) + tool_msgs = [m for m in result if m.get("role") == "tool"] + assert len(tool_msgs) == 1 + assert tool_msgs[0]["content"] == HEAVY