From 32572e07984e486993623e24729a8551e2c055e5 Mon Sep 17 00:00:00 2001
From: Simon FREYBURGER
Date: Fri, 17 Apr 2026 18:33:13 +0200
Subject: [PATCH 01/11] feat: add followup-based context compaction

Ref #43
---
 followup_compaction.py            | 211 ++++++++++++++++++++++++++++++
 tests/test_followup_compaction.py | 176 +++++++++++++++++++++++++
 2 files changed, 387 insertions(+)
 create mode 100644 followup_compaction.py
 create mode 100644 tests/test_followup_compaction.py

diff --git a/followup_compaction.py b/followup_compaction.py
new file mode 100644
index 0000000..56b545c
--- /dev/null
+++ b/followup_compaction.py
@@ -0,0 +1,211 @@
+"""Follow-up compaction: stub past-turn tool_results before each API call.
+
+Non-destructive: produces a new message list, leaves `state.messages` intact
+so persistence and resume keep the full history.
+"""
+from __future__ import annotations
+
+import html
+import json
+import re
+import time
+from typing import Iterable
+
+DEFAULT_EXEMPT_TOOLS = frozenset({"Edit", "Write", "TodoWrite"})
+
+
+def compact_tool_history(
+    messages: list,
+    keep_last_n_turns: int = 0,
+    exempt_tools: Iterable[str] = DEFAULT_EXEMPT_TOOLS,
+) -> list:
+    """Return a NEW list where past-turn tool_result contents are replaced by stubs.
+
+    A "turn" begins at a role='user' message. The current turn (from the last
+    user message onward) is always kept intact.
+    """
+    exempt = frozenset(exempt_tools)
+    user_indices = [i for i, m in enumerate(messages) if m.get("role") == "user"]
+    if len(user_indices) <= keep_last_n_turns + 1:
+        return list(messages)
+
+    cutoff = user_indices[-(keep_last_n_turns + 1)]
+    tool_call_lookup = _build_tool_call_lookup(messages)
+
+    compacted = []
+    for index, message in enumerate(messages):
+        if index >= cutoff:
+            compacted.append(message)
+            continue
+        role = message.get("role")
+        if role == "assistant" and message.get("tool_calls"):
+            stubbed = dict(message)
+            stubbed["content"] = compact_assistant_xml(
+                message["content"], message.get("tool_calls")
+            )
+            compacted.append(stubbed)
+            continue
+        if role != "tool" or message.get("name") in exempt:
+            compacted.append(message)
+            continue
+        tool_call_id = message.get("tool_call_id", "")
+        name, inp = tool_call_lookup.get(
+            tool_call_id, (message.get("name", "tool"), {})
+        )
+        stubbed = dict(message)
+        stubbed["content"] = _build_stub(name, inp)
+        compacted.append(stubbed)
+    return compacted
+
+
+def _build_tool_call_lookup(messages: list) -> dict:
+    lookup: dict = {}
+    for message in messages:
+        if message.get("role") != "assistant":
+            continue
+        for tool_call in message.get("tool_calls") or []:
+            lookup[tool_call.get("id", "")] = (
+                tool_call.get("name", ""),
+                tool_call.get("input") or {},
+            )
+    return lookup
+
+
+def _escape_xml_attr(value: str) -> str:
+    return html.escape(value, quote=False).replace('"', '&quot;')
+
+
+def _build_stub(name: str, input_dict: dict) -> str:
+    brief = _input_brief(name, input_dict)
+    return f'<tool_result tool="{name}" input="{_escape_xml_attr(brief)}" elided="true"/>'
+
+
+def _input_brief(name: str, inp: dict) -> str:
+    if name == "Read":
+        path = inp.get("file_path", "?")
+        parts = [f"file_path={path}"]
+        if "offset" in inp:
+            parts.append(f"offset={inp['offset']}")
+        if "limit" in inp:
+            parts.append(f"limit={inp['limit']}")
+        return ", ".join(parts)
+    if name == "Bash":
+        cmd = (inp.get("command") or "").replace("\n", " ")
+        if len(cmd) > 100:
+            cmd = cmd[:97] + "..."
+ return f"command={cmd!r}" + if name == "Grep": + parts = [f"pattern={inp.get('pattern', '?')!r}"] + if "path" in inp: + parts.append(f"path={inp['path']}") + return ", ".join(parts) + if name == "Glob": + return f"pattern={inp.get('pattern', '?')!r}" + try: + rendered = json.dumps(inp, ensure_ascii=False) + except (TypeError, ValueError): + rendered = str(inp) + if len(rendered) > 120: + rendered = rendered[:117] + "..." + return rendered + + +def _build_tc_lookup(tool_calls: list | None) -> dict: + lookup: dict = {} + for tc in tool_calls or []: + tid = tc.get("id", "") + if tid: + lookup[tid] = (tc.get("name", "tool"), tc.get("input") or {}) + return lookup + + +def _xml_replacer(tc_lookup: dict, target_ids: set | None = None): + def _replacer(match): + name, tid = match.group(1), match.group(2) + if target_ids is not None and tid not in target_ids: + return match.group(0) + tc_name, tc_input = tc_lookup.get(tid, (name, {})) + brief = _input_brief(tc_name, tc_input) + return f'' + return _replacer + + +_TOOL_USE_RE = re.compile( + r']*>.*?', + re.DOTALL, +) + + +def compact_assistant_xml(content: str, tool_calls: list | None = None) -> str: + """Replace ALL inline XML tool_use blocks with one-line summaries.""" + if not content or " str: + """Replace only XML blocks whose id is in target_ids, leaving others intact.""" + if not content or " list: + """Apply follow-up compaction + model-driven GC, then inject working memory notes.""" + if not config.get("followup_compaction_enabled", True): + compacted = list(state.messages) + else: + keep = config.get("followup_keep_last_n_turns", 0) + exempt = config.get("followup_exempt_tools", DEFAULT_EXEMPT_TOOLS) + compacted = compact_tool_history(state.messages, keep_last_n_turns=keep, exempt_tools=exempt) + + from compaction import estimate_tokens + tokens_before = estimate_tokens(state.messages) + tokens_after = estimate_tokens(compacted) + if tokens_before != tokens_after: + state.compaction_log.append({ + "event": "followup_compact", + "timestamp": time.time(), + "turn": getattr(state, "turn_count", 0), + "tokens_est_before": tokens_before, + "tokens_est_after": tokens_after, + "tokens_est_saved": tokens_before - tokens_after, + }) + + return _apply_context_gc(compacted, state) + + +def _apply_context_gc(messages: list, state) -> list: + """Apply model-driven GC decisions and inject working memory notes.""" + try: + from context_gc import apply_gc, inject_notes, prepend_verbatim_audit + except ImportError: + return messages + gc_state = getattr(state, 'gc_state', None) + if not gc_state: + return prepend_verbatim_audit(messages) + if not gc_state.trashed_ids and not gc_state.snippets and not gc_state.notes: + return prepend_verbatim_audit(messages) + + from compaction import estimate_tokens + tokens_before = estimate_tokens(messages) + result = apply_gc(messages, gc_state) + result = inject_notes(result, gc_state.notes) + tokens_after = estimate_tokens(result) + if tokens_before != tokens_after: + state.compaction_log.append({ + "event": "context_gc", + "timestamp": time.time(), + "turn": getattr(state, "turn_count", 0), + "trashed_count": len(gc_state.trashed_ids), + "snippet_count": len(gc_state.snippets), + "notes_count": len(gc_state.notes), + "tokens_est_saved": tokens_before - tokens_after, + }) + return prepend_verbatim_audit(result) diff --git a/tests/test_followup_compaction.py b/tests/test_followup_compaction.py new file mode 100644 index 0000000..c352e9b --- /dev/null +++ b/tests/test_followup_compaction.py @@ -0,0 +1,176 @@ 
+"""Tests for followup_compaction module.""" +import pytest + +from followup_compaction import ( + compact_tool_history, _build_tool_call_lookup, _build_stub, + _input_brief, _escape_xml_attr, + compact_assistant_xml, compact_assistant_xml_selective, + DEFAULT_EXEMPT_TOOLS, +) + + +class TestCompactToolHistory: + def _make_messages(self): + return [ + {"role": "user", "content": "turn 1"}, + {"role": "assistant", "content": "ok", "tool_calls": [ + {"id": "tc1", "name": "Read", "input": {"file_path": "/a.py"}}, + ]}, + {"role": "tool", "tool_call_id": "tc1", "name": "Read", "content": "file contents..."}, + {"role": "user", "content": "turn 2"}, + {"role": "assistant", "content": "done"}, + ] + + def test_stubs_old_tool_results(self): + msgs = self._make_messages() + result = compact_tool_history(msgs) + assert "") + + def test_quote(self): + assert """ in _escape_xml_attr('"hello"') + + +class TestCompactAssistantXml: + def test_replaces_tool_use_blocks(self): + content = 'text before /a.py text after' + tool_calls = [{"id": "r1", "name": "Read", "input": {"file_path": "/a.py"}}] + result = compact_assistant_xml(content, tool_calls) + assert "x' + 'y' + ) + tool_calls = [ + {"id": "r1", "name": "Read", "input": {"file_path": "/a.py"}}, + {"id": "r2", "name": "Grep", "input": {"pattern": "x"}}, + ] + result = compact_assistant_xml_selective(content, tool_calls, {"r1"}) + assert " Date: Sat, 18 Apr 2026 09:30:21 +0200 Subject: [PATCH 02/11] fix: add ImportError guard for context_gc import (standalone compat) --- followup_compaction.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/followup_compaction.py b/followup_compaction.py index 56b545c..bb174eb 100644 --- a/followup_compaction.py +++ b/followup_compaction.py @@ -184,7 +184,10 @@ def build_messages_for_api(state, config: dict) -> list: def _apply_context_gc(messages: list, state) -> list: """Apply model-driven GC decisions and inject working memory notes.""" try: - from context_gc import apply_gc, inject_notes, prepend_verbatim_audit + try: + from context_gc import apply_gc + except ImportError: + return messages # context_gc not available yet, skip, inject_notes, prepend_verbatim_audit except ImportError: return messages gc_state = getattr(state, 'gc_state', None) From 325b34d91613f35420ad5fc9e20a75b63eac0f21 Mon Sep 17 00:00:00 2001 From: Simon FREYBURGER Date: Sun, 19 Apr 2026 14:39:48 +0200 Subject: [PATCH 03/11] refactor: drop XML-compaction dead code + fix _apply_context_gc imports Remove compact_assistant_xml, compact_assistant_xml_selective, _xml_replacer, _build_tc_lookup and _TOOL_USE_RE. These functions compact inline ... XML blocks inside assistant message content, which only exist on providers that don't natively support tool_use blocks (e.g. AWS Bedrock socle in bouzecode). Upstream cheetahclaws uses the native Anthropic content: [{"type":"tool_use", ...}] format, so these functions early-returned on every call and the compact_tool_history branch that invoked compact_assistant_xml was a no-op. Also fix _apply_context_gc which was wrapped in a double try/except where the outer pass was unreachable, and which imported only apply_gc while referencing inject_notes and prepend_verbatim_audit (NameError when gc_state had entries). Replaced with a single try that imports all three names and cleanly returns on ImportError if PR #55 isn't deployed alongside. Test file drops the TestCompactAssistantXml / TestCompactAssistantXmlSelective classes that exercised the removed functions. 
Co-Authored-By: Claude Opus 4.7 (1M context)
---
 followup_compaction.py            | 66 ++++---------------------------
 tests/test_followup_compaction.py | 38 ------------------
 2 files changed, 7 insertions(+), 97 deletions(-)

diff --git a/followup_compaction.py b/followup_compaction.py
index bb174eb..a5eb61d 100644
--- a/followup_compaction.py
+++ b/followup_compaction.py
@@ -7,7 +7,6 @@
 
 import html
 import json
-import re
 import time
 from typing import Iterable
 
@@ -38,13 +37,6 @@ def compact_tool_history(
             compacted.append(message)
             continue
         role = message.get("role")
-        if role == "assistant" and message.get("tool_calls"):
-            stubbed = dict(message)
-            stubbed["content"] = compact_assistant_xml(
-                message["content"], message.get("tool_calls")
-            )
-            compacted.append(stubbed)
-            continue
         if role != "tool" or message.get("name") in exempt:
             compacted.append(message)
             continue
@@ -110,52 +102,6 @@ def _input_brief(name: str, inp: dict) -> str:
     return rendered
 
 
-def _build_tc_lookup(tool_calls: list | None) -> dict:
-    lookup: dict = {}
-    for tc in tool_calls or []:
-        tid = tc.get("id", "")
-        if tid:
-            lookup[tid] = (tc.get("name", "tool"), tc.get("input") or {})
-    return lookup
-
-
-def _xml_replacer(tc_lookup: dict, target_ids: set | None = None):
-    def _replacer(match):
-        name, tid = match.group(1), match.group(2)
-        if target_ids is not None and tid not in target_ids:
-            return match.group(0)
-        tc_name, tc_input = tc_lookup.get(tid, (name, {}))
-        brief = _input_brief(tc_name, tc_input)
-        return f'<tool_use name="{tc_name}" id="{tid}" input="{_escape_xml_attr(brief)}" elided="true"/>'
-    return _replacer
-
-
-_TOOL_USE_RE = re.compile(
-    r'<tool_use name="([^"]+)" id="([^"]+)"[^>]*>.*?</tool_use>',
-    re.DOTALL,
-)
-
-
-def compact_assistant_xml(content: str, tool_calls: list | None = None) -> str:
-    """Replace ALL inline XML tool_use blocks with one-line summaries."""
-    if not content or "<tool_use" not in content:
-        return content
-    return _TOOL_USE_RE.sub(_xml_replacer(_build_tc_lookup(tool_calls)), content)
-
-
-def compact_assistant_xml_selective(
-    content: str, tool_calls: list | None, target_ids: set
-) -> str:
-    """Replace only XML blocks whose id is in target_ids, leaving others intact."""
-    if not content or "<tool_use" not in content:
-        return content
-    return _TOOL_USE_RE.sub(
-        _xml_replacer(_build_tc_lookup(tool_calls), target_ids), content
-    )
-
-
 def build_messages_for_api(state, config: dict) -> list:
     """Apply follow-up compaction + model-driven GC, then inject working memory notes."""
     if not config.get("followup_compaction_enabled", True):
@@ -182,12 +128,14 @@ def build_messages_for_api(state, config: dict) -> list:
 
 
 def _apply_context_gc(messages: list, state) -> list:
-    """Apply model-driven GC decisions and inject working memory notes."""
+    """Apply model-driven GC decisions and inject working memory notes.
+
+    Falls back to returning messages unchanged when the context_gc module is
+    absent (this PR can ship independently of PR #55). The import is narrow:
+    only ImportError is swallowed; any other error propagates.
+ """ try: - try: - from context_gc import apply_gc - except ImportError: - return messages # context_gc not available yet, skip, inject_notes, prepend_verbatim_audit + from context_gc import apply_gc, inject_notes, prepend_verbatim_audit except ImportError: return messages gc_state = getattr(state, 'gc_state', None) diff --git a/tests/test_followup_compaction.py b/tests/test_followup_compaction.py index c352e9b..024a803 100644 --- a/tests/test_followup_compaction.py +++ b/tests/test_followup_compaction.py @@ -4,7 +4,6 @@ from followup_compaction import ( compact_tool_history, _build_tool_call_lookup, _build_stub, _input_brief, _escape_xml_attr, - compact_assistant_xml, compact_assistant_xml_selective, DEFAULT_EXEMPT_TOOLS, ) @@ -122,43 +121,6 @@ def test_quote(self): assert """ in _escape_xml_attr('"hello"') -class TestCompactAssistantXml: - def test_replaces_tool_use_blocks(self): - content = 'text before /a.py text after' - tool_calls = [{"id": "r1", "name": "Read", "input": {"file_path": "/a.py"}}] - result = compact_assistant_xml(content, tool_calls) - assert "x' - 'y' - ) - tool_calls = [ - {"id": "r1", "name": "Read", "input": {"file_path": "/a.py"}}, - {"id": "r2", "name": "Grep", "input": {"pattern": "x"}}, - ] - result = compact_assistant_xml_selective(content, tool_calls, {"r1"}) - assert " Date: Fri, 17 Apr 2026 18:33:14 +0200 Subject: [PATCH 04/11] feat: add ContextGC tool for model-driven context management Ref #43 --- context_gc.py | 234 +++++++++++++++++++++++++++++++++++++++ tests/test_context_gc.py | 208 ++++++++++++++++++++++++++++++++++ tools/__init__.py | 29 +++++ 3 files changed, 471 insertions(+) create mode 100644 context_gc.py create mode 100644 tests/test_context_gc.py diff --git a/context_gc.py b/context_gc.py new file mode 100644 index 0000000..87abe93 --- /dev/null +++ b/context_gc.py @@ -0,0 +1,234 @@ +"""Model-driven context garbage collection for conversation history. + +Lets the LLM trash consumed tool results, keep relevant snippets, +and persist notes across turns to manage its context window. 
+""" +from __future__ import annotations + +from dataclasses import dataclass, field + + +@dataclass +class GCState: + trashed_ids: set = field(default_factory=set) + snippets: dict = field(default_factory=dict) + notes: dict = field(default_factory=dict) + compact_xml: bool = False + + +def process_gc_call(params: dict, config: dict) -> str: + gc_state: GCState = config.get("_gc_state") + if gc_state is None: + return "Error: no GC state available" + + trashed = params.get("trash") or [] + snippets = params.get("keep_snippets") or [] + notes = params.get("notes") or [] + trash_notes = params.get("trash_notes") or [] + + for tid in trashed: + gc_state.trashed_ids.add(tid) + gc_state.snippets.pop(tid, None) + + for snippet in snippets: + sid = snippet.get("id") + if sid and sid not in gc_state.trashed_ids: + gc_state.snippets[sid] = snippet + + for note in notes: + name = note.get("name") + content = note.get("content", "") + if name: + gc_state.notes[name] = content + + for name in trash_notes: + gc_state.notes.pop(name, None) + + if params.get("compact_xml"): + gc_state.compact_xml = True + + parts = [] + if trashed: + parts.append(f"trashed {len(trashed)} results") + if snippets: + parts.append(f"kept snippets for {len(snippets)} results") + if notes: + parts.append(f"{len(notes)} notes saved") + if trash_notes: + parts.append(f"{len(trash_notes)} notes removed") + if params.get("compact_xml"): + parts.append("XML compaction enabled") + parts.append(f"{len(gc_state.notes)} active notes, {len(gc_state.trashed_ids)} total trashed") + return "GC applied: " + ", ".join(parts) + + +def apply_gc(messages: list, gc_state: GCState) -> list: + if not gc_state.trashed_ids and not gc_state.snippets and not gc_state.compact_xml: + return messages + + _compact_all = None + _compact_selective = None + last_asst_idx = -1 + + if gc_state.compact_xml: + try: + from followup_compaction import compact_assistant_xml + _compact_all = compact_assistant_xml + except ImportError: + pass + for i in range(len(messages) - 1, -1, -1): + if messages[i].get("role") == "assistant": + last_asst_idx = i + break + + if gc_state.trashed_ids: + try: + from followup_compaction import compact_assistant_xml_selective + _compact_selective = compact_assistant_xml_selective + except ImportError: + pass + + result = [] + for idx, msg in enumerate(messages): + role = msg.get("role") + if role == "assistant" and msg.get("tool_calls"): + if _compact_all and idx != last_asst_idx: + stubbed = dict(msg) + stubbed["content"] = _compact_all(msg["content"], msg["tool_calls"]) + result.append(stubbed) + continue + if _compact_selective: + tc_ids = {tc.get("id") for tc in msg["tool_calls"]} + targeted = tc_ids & gc_state.trashed_ids + if targeted: + stubbed = dict(msg) + stubbed["content"] = _compact_selective( + msg["content"], msg["tool_calls"], targeted, + ) + result.append(stubbed) + continue + result.append(msg) + continue + if role != "tool": + result.append(msg) + continue + tc_id = msg.get("tool_call_id", "") + if tc_id in gc_state.trashed_ids: + stubbed = dict(msg) + name = msg.get("name", "tool") + stubbed["content"] = f"[{name} result -- trashed by model]" + result.append(stubbed) + elif tc_id in gc_state.snippets: + transformed = dict(msg) + transformed["content"] = _apply_snippet(msg["content"], gc_state.snippets[tc_id]) + result.append(transformed) + else: + result.append(msg) + return result + + +def _apply_snippet(content: str, snippet: dict) -> str: + if not content: + return content + lines = content.split("\n") + + if 
"keep_after" in snippet: + anchor = snippet["keep_after"] + idx = _find_anchor_line(lines, anchor) + if idx is None: + return content + f"\n[GC warning: anchor {anchor!r} not found, kept full result]" + kept = lines[idx:] + trimmed = len(lines) - len(kept) + return f"[{trimmed} lines trimmed, kept after {anchor!r}]\n" + "\n".join(kept) + + if "keep_before" in snippet: + anchor = snippet["keep_before"] + idx = _find_anchor_line(lines, anchor) + if idx is None: + return content + f"\n[GC warning: anchor {anchor!r} not found, kept full result]" + kept = lines[:idx] + trimmed = len(lines) - len(kept) + return "\n".join(kept) + f"\n[{trimmed} lines trimmed at {anchor!r}]" + + if "keep_between" in snippet: + anchors = snippet["keep_between"] + if len(anchors) != 2: + return content + "\n[GC warning: keep_between needs exactly 2 anchors]" + start_anchor, end_anchor = anchors + start_idx = _find_anchor_line(lines, start_anchor) + if start_idx is None: + return content + f"\n[GC warning: start anchor {start_anchor!r} not found]" + end_idx = _find_anchor_line(lines, end_anchor, start_from=start_idx) + if end_idx is None: + return content + f"\n[GC warning: end anchor {end_anchor!r} not found]" + kept = lines[start_idx:end_idx + 1] + before = start_idx + after = len(lines) - end_idx - 1 + header = f"[{before} lines trimmed before {start_anchor!r}]" + footer = f"[{after} lines trimmed after {end_anchor!r}]" + return header + "\n" + "\n".join(kept) + "\n" + footer + + return content + + +def _find_anchor_line(lines: list, text: str, start_from: int = 0) -> int | None: + for i in range(start_from, len(lines)): + if text in lines[i]: + return i + return None + + +def inject_notes(messages: list, notes: dict) -> list: + if not notes: + return messages + parts = [] + for name, content in notes.items(): + parts.append(f"## {name}\n{content}") + notes_block = "[Your working memory notes]\n" + "\n\n".join(parts) + "\n[/Notes]" + result = list(messages) + for i in range(len(result) - 1, -1, -1): + if result[i].get("role") == "user": + result[i] = dict(result[i]) + result[i]["content"] = notes_block + "\n\n" + result[i]["content"] + break + return result + + +def build_verbatim_audit_note(messages: list) -> str: + from compaction import estimate_tokens + lines = [] + for message in messages: + if message.get("role") != "tool": + continue + content = message.get("content", "") + if isinstance(content, list): + content = "".join( + block.get("text", "") if isinstance(block, dict) else str(block) + for block in content + ) + if " list: + note = build_verbatim_audit_note(messages) + if not note: + return messages + result = list(messages) + for i in range(len(result) - 1, -1, -1): + if result[i].get("role") == "user": + result[i] = dict(result[i]) + result[i]["content"] = note + "\n\n" + result[i]["content"] + break + return result diff --git a/tests/test_context_gc.py b/tests/test_context_gc.py new file mode 100644 index 0000000..068abe3 --- /dev/null +++ b/tests/test_context_gc.py @@ -0,0 +1,208 @@ +"""Tests for context_gc module.""" +import pytest + +from context_gc import ( + GCState, process_gc_call, apply_gc, _apply_snippet, + _find_anchor_line, inject_notes, build_verbatim_audit_note, + prepend_verbatim_audit, +) + + +class TestGCState: + def test_defaults(self): + gs = GCState() + assert gs.trashed_ids == set() + assert gs.snippets == {} + assert gs.notes == {} + assert gs.compact_xml is False + + +class TestProcessGCCall: + def _make_config(self): + return {"_gc_state": GCState()} + + def 
+        result = process_gc_call({}, {})
+        assert "Error" in result
+
+    def test_trash(self):
+        cfg = self._make_config()
+        result = process_gc_call({"trash": ["r1", "r2"]}, cfg)
+        assert "trashed 2 results" in result
+        assert "r1" in cfg["_gc_state"].trashed_ids
+        assert "r2" in cfg["_gc_state"].trashed_ids
+
+    def test_notes(self):
+        cfg = self._make_config()
+        result = process_gc_call(
+            {"notes": [{"name": "key", "content": "value"}]}, cfg
+        )
+        assert "1 notes saved" in result
+        assert cfg["_gc_state"].notes["key"] == "value"
+
+    def test_trash_notes(self):
+        cfg = self._make_config()
+        cfg["_gc_state"].notes["old"] = "data"
+        result = process_gc_call({"trash_notes": ["old"]}, cfg)
+        assert "1 notes removed" in result
+        assert "old" not in cfg["_gc_state"].notes
+
+    def test_keep_snippets(self):
+        cfg = self._make_config()
+        result = process_gc_call(
+            {"keep_snippets": [{"id": "r1", "keep_after": "def main"}]}, cfg
+        )
+        assert "kept snippets for 1 results" in result
+        assert "r1" in cfg["_gc_state"].snippets
+
+    def test_snippet_ignored_if_trashed(self):
+        cfg = self._make_config()
+        cfg["_gc_state"].trashed_ids.add("r1")
+        process_gc_call(
+            {"keep_snippets": [{"id": "r1", "keep_after": "x"}]}, cfg
+        )
+        assert "r1" not in cfg["_gc_state"].snippets
+
+    def test_compact_xml(self):
+        cfg = self._make_config()
+        result = process_gc_call({"compact_xml": True}, cfg)
+        assert "XML compaction enabled" in result
+        assert cfg["_gc_state"].compact_xml is True
+
+
+class TestApplyGC:
+    def test_no_changes(self):
+        gs = GCState()
+        msgs = [{"role": "user", "content": "hi"}]
+        assert apply_gc(msgs, gs) is msgs
+
+    def test_trash_tool_result(self):
+        gs = GCState()
+        gs.trashed_ids.add("tc1")
+        msgs = [
+            {"role": "tool", "tool_call_id": "tc1", "name": "Read", "content": "big data..."},
+            {"role": "tool", "tool_call_id": "tc2", "name": "Grep", "content": "kept"},
+        ]
+        result = apply_gc(msgs, gs)
+        assert "trashed by model" in result[0]["content"]
+        assert result[1]["content"] == "kept"
+
+    def test_snippet_applied(self):
+        gs = GCState()
+        gs.snippets["tc1"] = {"id": "tc1", "keep_after": "def main"}
+        content = "import os\n\ndef main():\n    pass\n"
+        msgs = [{"role": "tool", "tool_call_id": "tc1", "name": "Read", "content": content}]
+        result = apply_gc(msgs, gs)
+        assert "def main" in result[0]["content"]
+        assert "import os" not in result[0]["content"]
+
+    def test_non_tool_messages_pass_through(self):
+        gs = GCState()
+        gs.trashed_ids.add("x")
+        msgs = [
+            {"role": "user", "content": "hello"},
+            {"role": "assistant", "content": "world"},
+        ]
+        result = apply_gc(msgs, gs)
+        assert len(result) == 2
+        assert result[0]["content"] == "hello"
+
+
+class TestApplySnippet:
+    def test_keep_after(self):
+        content = "line1\nline2\ndef main():\n    pass"
+        result = _apply_snippet(content, {"keep_after": "def main"})
+        assert "def main" in result
+        assert "line1" not in result
+        assert "2 lines trimmed" in result
+
+    def test_keep_before(self):
+        content = "line1\nline2\nclass Foo:\n    pass"
+        result = _apply_snippet(content, {"keep_before": "class Foo"})
+        assert "line1" in result
+        assert "class Foo:\n    pass" not in result
+        assert "2 lines trimmed" in result
+
+    def test_keep_between(self):
+        content = "a\nb\nSTART\nc\nd\nEND\ne\nf"
+        result = _apply_snippet(content, {"keep_between": ["START", "END"]})
+        assert "START" in result
+        assert "END" in result
+        assert "a\n" not in result
+
+    def test_anchor_not_found(self):
+        content = "some text"
+        result = _apply_snippet(content, {"keep_after": "MISSING"})
"MISSING"}) + assert "GC warning" in result + assert "some text" in result + + def test_empty_content(self): + assert _apply_snippet("", {"keep_after": "x"}) == "" + + def test_keep_between_bad_anchors(self): + result = _apply_snippet("text", {"keep_between": ["a"]}) + assert "needs exactly 2 anchors" in result + + +class TestFindAnchorLine: + def test_found(self): + assert _find_anchor_line(["a", "b", "c"], "b") == 1 + + def test_not_found(self): + assert _find_anchor_line(["a", "b"], "z") is None + + def test_start_from(self): + assert _find_anchor_line(["a", "b", "a"], "a", start_from=1) == 2 + + +class TestInjectNotes: + def test_empty_notes(self): + msgs = [{"role": "user", "content": "hi"}] + assert inject_notes(msgs, {}) is msgs + + def test_inject(self): + msgs = [{"role": "user", "content": "hello"}] + result = inject_notes(msgs, {"key": "value"}) + assert "[Your working memory notes]" in result[0]["content"] + assert "## key\nvalue" in result[0]["content"] + assert "hello" in result[0]["content"] + + def test_injects_in_last_user_msg(self): + msgs = [ + {"role": "user", "content": "first"}, + {"role": "assistant", "content": "reply"}, + {"role": "user", "content": "second"}, + ] + result = inject_notes(msgs, {"n": "v"}) + assert "[Your working memory notes]" in result[2]["content"] + assert result[0]["content"] == "first" + + +class TestVerbatimAudit: + def test_empty(self): + assert build_verbatim_audit_note([]) == "" + + def test_skips_trashed(self): + msgs = [{"role": "tool", "tool_call_id": "t1", "name": "Read", + "content": "[Read result -- trashed by model]"}] + assert build_verbatim_audit_note(msgs) == "" + + def test_skips_elided(self): + msgs = [{"role": "tool", "tool_call_id": "t1", "name": "Read", + "content": ''}] + assert build_verbatim_audit_note(msgs) == "" + + def test_includes_verbatim(self): + msgs = [{"role": "tool", "tool_call_id": "r1", "name": "Read", + "content": "file content here"}] + result = build_verbatim_audit_note(msgs) + assert "r1 (Read)" in result + assert "tk" in result + + def test_prepend(self): + msgs = [ + {"role": "user", "content": "hi"}, + {"role": "tool", "tool_call_id": "r1", "name": "Read", "content": "data"}, + ] + result = prepend_verbatim_audit(msgs) + assert "[Verbatim" in result[0]["content"] diff --git a/tools/__init__.py b/tools/__init__.py index 8731a8c..851e702 100644 --- a/tools/__init__.py +++ b/tools/__init__.py @@ -333,6 +333,25 @@ "required": ["seconds"], }, }, + { + "name": "ContextGC", + "description": ( + "Garbage-collect your context to free space. Trash tool results you no longer " + "need, keep only relevant snippets from large results, and save key information " + "in notes that persist across turns." 
+        ),
+        "input_schema": {
+            "type": "object",
+            "properties": {
+                "trash": {"type": "array", "items": {"type": "string"}, "description": "tool_call_ids to fully discard"},
+                "keep_snippets": {"type": "array", "description": "Partial keeps with text anchors"},
+                "notes": {"type": "array", "description": "Named scratchpad entries: {name, content}"},
+                "trash_notes": {"type": "array", "items": {"type": "string"}, "description": "Note names to discard"},
+                "compact_xml": {"type": "boolean", "description": "Strip verbose XML from old assistant outputs"},
+            },
+            "required": [],
+        },
+    },
 ]
 
 
@@ -479,6 +498,16 @@ def _register_builtins() -> None:
             read_only=False, concurrent_safe=True,
         ),
     ]
+
+    # ContextGC tool
+    from context_gc import process_gc_call
+    _tool_defs.append(ToolDef(
+        name="ContextGC",
+        schema=_schemas["ContextGC"],
+        func=lambda p, c: process_gc_call(p, c),
+        read_only=False, concurrent_safe=False,
+    ))
+
     for td in _tool_defs:
         register_tool(td)

From 44aa5c2292b40710dbc9d96a11376232a769ef4d Mon Sep 17 00:00:00 2001
From: Simon FREYBURGER
Date: Sat, 18 Apr 2026 09:30:35 +0200
Subject: [PATCH 05/11] fix: add ImportError guard for followup_compaction
 import (standalone compat)

---
 context_gc.py | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/context_gc.py b/context_gc.py
index 87abe93..fc14d4c 100644
--- a/context_gc.py
+++ b/context_gc.py
@@ -72,7 +72,10 @@ def apply_gc(messages: list, gc_state: GCState) -> list:
 
     if gc_state.compact_xml:
         try:
-            from followup_compaction import compact_assistant_xml
+            try:
+                from followup_compaction import compact_assistant_xml
+            except ImportError:
+                compact_assistant_xml = None  # followup_compaction not available yet
             _compact_all = compact_assistant_xml
         except ImportError:
             pass
@@ -83,7 +86,10 @@ def apply_gc(messages: list, gc_state: GCState) -> list:
 
     if gc_state.trashed_ids:
         try:
-            from followup_compaction import compact_assistant_xml_selective
+            try:
+                from followup_compaction import compact_assistant_xml
+            except ImportError:
+                compact_assistant_xml = None  # followup_compaction not available yet
             _compact_selective = compact_assistant_xml_selective
         except ImportError:
             pass

From 93ddba00180e3c91e4 Mon Sep 17 00:00:00 2001
From: Simon FREYBURGER
Date: Sun, 19 Apr 2026 14:36:41 +0200
Subject: [PATCH 06/11] feat: persist gc_state across session save/load

Add gc_state (trashed_ids, snippets, notes) as a real field on AgentState,
serialize it in _build_session_data, and rehydrate it via a new helper
_restore_state_from_data that is shared by cmd_load / cmd_resume /
cmd_cloudsave load.

Without this, any /save followed by /load silently drops trashed_ids: the
tool_results previously elided by ContextGC re-materialize in the next
turn's context window, leaking tens of thousands of tokens on long
sessions.

Tests cover save/load roundtrip and independence of gc_state instances
across AgentState instances.
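
For illustration, the session JSON written by _build_session_data now
looks like this (values are made up; only the gc_state key is new):

    {
      "messages": [...],
      "turn_count": 7,
      "total_input_tokens": 1234,
      "total_output_tokens": 567,
      "gc_state": {
        "trashed_ids": ["toolu_01", "toolu_02"],
        "snippets": {"toolu_03": {"id": "toolu_03", "keep_after": "def main"}},
        "notes": {"task": "refactor compaction"}
      }
    }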
Co-Authored-By: Claude Opus 4.7 (1M context)
---
 agent.py                           |  4 ++
 commands/session.py                | 51 ++++++++++++++-----
 tests/test_gc_state_persistence.py | 80 ++++++++++++++++++++++++++++++
 3 files changed, 123 insertions(+), 12 deletions(-)
 create mode 100644 tests/test_gc_state_persistence.py

diff --git a/agent.py b/agent.py
index dba7fe2..6d08505 100644
--- a/agent.py
+++ b/agent.py
@@ -12,6 +12,7 @@
 import tools as _tools_init  # ensure built-in tools are registered on import
 from providers import stream, AssistantTurn, TextChunk, ThinkingChunk, detect_provider
 from compaction import maybe_compact, estimate_tokens, get_context_limit, compact_messages
+from context_gc import GCState
 import logging_utils as _log
 import quota as _quota
 from circuit_breaker import CircuitOpenError as _CircuitOpenError
@@ -32,6 +33,9 @@ class AgentState:
     total_input_tokens: int = 0
     total_output_tokens: int = 0
     turn_count: int = 0
+    # Persisted so trashed_ids, snippets and notes survive /save and /load.
+    # Without this, restoring a session leaks back tool_results the model had trashed.
+    gc_state: GCState = field(default_factory=GCState)
 
 
 @dataclass
diff --git a/commands/session.py b/commands/session.py
index 78b17c5..38c353e 100644
--- a/commands/session.py
+++ b/commands/session.py
@@ -55,9 +55,45 @@ def _build_session_data(state, session_id: str | None = None) -> dict:
         "turn_count": state.turn_count,
         "total_input_tokens": state.total_input_tokens,
         "total_output_tokens": state.total_output_tokens,
+        "gc_state": _serialize_gc_state(getattr(state, "gc_state", None)),
     }
 
 
+def _serialize_gc_state(gc_state) -> dict:
+    """JSON-safe view of ContextGC state (trashed_ids, snippets, notes).
+
+    Must be stable across saves so trashed_ids surviving a /load cannot leak
+    back into the model's context window (the "gc_state leak" class).
+    """
+    if gc_state is None:
+        return {"trashed_ids": [], "snippets": {}, "notes": {}}
+    return {
+        "trashed_ids": sorted(gc_state.trashed_ids),
+        "snippets": dict(gc_state.snippets),
+        "notes": dict(gc_state.notes),
+    }
+
+
+def _restore_state_from_data(state, data: dict) -> None:
+    """Apply a loaded session dict onto a state in-place.
+
+    Single point of truth for /load, /resume and /cloudsave load. Covers the
+    full AgentState surface including gc_state — forgetting any of these is
+    how the session-save/restore roundtrip drifts from in-memory state.
+ """ + from context_gc import GCState + state.messages = data.get("messages", []) + state.turn_count = data.get("turn_count", 0) + state.total_input_tokens = data.get("total_input_tokens", 0) + state.total_output_tokens = data.get("total_output_tokens", 0) + gc = data.get("gc_state") or {} + state.gc_state = GCState( + trashed_ids=set(gc.get("trashed_ids") or []), + snippets=dict(gc.get("snippets") or {}), + notes=dict(gc.get("notes") or {}), + ) + + # ── /save ────────────────────────────────────────────────────────────────── def cmd_save(args: str, state, config) -> bool: @@ -312,10 +348,7 @@ def cmd_load(args: str, state, config) -> bool: except Exception as e: err(f"Cannot read session file: {e}") return True - state.messages = data.get("messages", []) - state.turn_count = data.get("turn_count", 0) - state.total_input_tokens = data.get("total_input_tokens", 0) - state.total_output_tokens = data.get("total_output_tokens", 0) + _restore_state_from_data(state, data) ok(f"Session loaded from {path} ({len(state.messages)} messages)") return True @@ -353,10 +386,7 @@ def cmd_resume(args: str, state, config) -> bool: except Exception as e: err(f"Cannot read session file: {e}") return True - state.messages = data.get("messages", []) - state.turn_count = data.get("turn_count", 0) - state.total_input_tokens = data.get("total_input_tokens", 0) - state.total_output_tokens = data.get("total_output_tokens", 0) + _restore_state_from_data(state, data) ok(f"Session loaded from {path} ({len(state.messages)} messages)") return True @@ -522,10 +552,7 @@ def cmd_cloudsave(args: str, state, config) -> bool: if err_msg: err(err_msg) return True - state.messages = data.get("messages", []) - state.turn_count = data.get("turn_count", 0) - state.total_input_tokens = data.get("total_input_tokens", 0) - state.total_output_tokens = data.get("total_output_tokens", 0) + _restore_state_from_data(state, data) ok(f"Session loaded from Gist ({len(state.messages)} messages).") return True diff --git a/tests/test_gc_state_persistence.py b/tests/test_gc_state_persistence.py new file mode 100644 index 0000000..958bf09 --- /dev/null +++ b/tests/test_gc_state_persistence.py @@ -0,0 +1,80 @@ +"""gc_state must be a real field on AgentState and survive save/reload. + +Guard against the leak class where ContextGC-trashed tool_call_ids silently +re-materialize after /save + /load because they were only held in a per-turn +config dict, not on AgentState itself. 
+""" +from __future__ import annotations + +import json + +from agent import AgentState +from context_gc import GCState +from commands.session import _build_session_data, _restore_state_from_data + + +def test_agent_state_has_gc_state_by_default(): + state = AgentState() + assert isinstance(state.gc_state, GCState) + assert state.gc_state.trashed_ids == set() + assert state.gc_state.notes == {} + + +def test_two_agent_states_have_independent_gc_state(): + a = AgentState() + b = AgentState() + a.gc_state.trashed_ids.add("toolcall_1") + assert "toolcall_1" not in b.gc_state.trashed_ids + + +def test_session_save_includes_gc_state_as_sortable_json(): + state = AgentState() + state.gc_state.trashed_ids = {"id_b", "id_a", "id_c"} + state.gc_state.notes = {"task": "do the thing"} + + data = _build_session_data(state) + serialized = json.dumps(data) + assert "gc_state" in data + assert data["gc_state"]["trashed_ids"] == ["id_a", "id_b", "id_c"] + assert data["gc_state"]["notes"] == {"task": "do the thing"} + assert '"trashed_ids":' in serialized + + +def test_session_load_restores_gc_state(): + fresh = AgentState() + _restore_state_from_data(fresh, { + "messages": [], + "gc_state": { + "trashed_ids": ["a", "b"], + "notes": {"k": "v"}, + "snippets": {}, + }, + }) + assert fresh.gc_state.trashed_ids == {"a", "b"} + assert fresh.gc_state.notes == {"k": "v"} + + +def test_session_load_missing_gc_state_returns_fresh_empty(): + fresh = AgentState() + _restore_state_from_data(fresh, {"messages": []}) + assert fresh.gc_state.trashed_ids == set() + assert fresh.gc_state.notes == {} + + +def test_save_then_load_roundtrip_preserves_trashed_ids(): + """End-to-end: trash ids, serialize, rehydrate — ids must still be trashed. + + This is the exact leak the bug class introduces: if roundtrip drops + trashed_ids, previously-elided tool_results come back into context and + inflate the prompt by whatever they were trimmed from. + """ + before = AgentState() + before.gc_state.trashed_ids = {"tool_a", "tool_b"} + before.gc_state.snippets = {"tool_c": {"keep_after": "### Result"}} + + data = json.loads(json.dumps(_build_session_data(before), default=str)) + after = AgentState() + _restore_state_from_data(after, data) + + assert after.gc_state.trashed_ids == {"tool_a", "tool_b"} + assert after.gc_state.snippets == {"tool_c": {"keep_after": "### Result"}} From 694d887e029be41421db6755591326533b58ced0 Mon Sep 17 00:00:00 2001 From: Simon FREYBURGER Date: Sun, 19 Apr 2026 14:41:54 +0200 Subject: [PATCH 07/11] refactor: drop compact_xml + split apply_gc into three focused helpers Remove the compact_xml field from GCState, the compact_xml parameter from the ContextGC tool schema, and the XML-compaction branch of apply_gc. The branch used to dynamically import compact_assistant_xml / compact_assistant_xml_selective from followup_compaction, but those only match ... strings inside assistant text content -- a shape that only appears on providers without native tool_use support (e.g. AWS Bedrock socle in bouzecode). Upstream cheetahclaws emits native Anthropic content blocks, so the XML branch was an unreachable no-op. The branch also had a latent NameError (compact_assistant_xml_selective was imported under the wrong name), which is why 2 existing tests were red against this branch. apply_gc is now a 3-line list comprehension dispatching to _apply_gc_to_message, which in turn delegates to _stub_trashed_tool_result and _apply_snippet_to_message. 
Each helper fits on one screen, names its intent, and no longer hides
behavior behind a dead early-return chain.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 context_gc.py            | 107 +++++++++++++--------------------------
 tests/test_context_gc.py |   7 ---
 tools/__init__.py        |   1 -
 3 files changed, 34 insertions(+), 81 deletions(-)

diff --git a/context_gc.py b/context_gc.py
index fc14d4c..6475cdd 100644
--- a/context_gc.py
+++ b/context_gc.py
@@ -13,7 +13,6 @@ class GCState:
     trashed_ids: set = field(default_factory=set)
     snippets: dict = field(default_factory=dict)
     notes: dict = field(default_factory=dict)
-    compact_xml: bool = False
 
 
 def process_gc_call(params: dict, config: dict) -> str:
@@ -44,9 +43,6 @@ def process_gc_call(params: dict, config: dict) -> str:
     for name in trash_notes:
         gc_state.notes.pop(name, None)
 
-    if params.get("compact_xml"):
-        gc_state.compact_xml = True
-
     parts = []
     if trashed:
         parts.append(f"trashed {len(trashed)} results")
@@ -56,81 +52,46 @@ def process_gc_call(params: dict, config: dict) -> str:
         parts.append(f"{len(notes)} notes saved")
     if trash_notes:
         parts.append(f"{len(trash_notes)} notes removed")
-    if params.get("compact_xml"):
-        parts.append("XML compaction enabled")
     parts.append(f"{len(gc_state.notes)} active notes, {len(gc_state.trashed_ids)} total trashed")
     return "GC applied: " + ", ".join(parts)
 
 
 def apply_gc(messages: list, gc_state: GCState) -> list:
-    if not gc_state.trashed_ids and not gc_state.snippets and not gc_state.compact_xml:
-        return messages
+    """Return a new message list with trashed tool_results stubbed and kept snippets trimmed.
 
-    _compact_all = None
-    _compact_selective = None
-    last_asst_idx = -1
-
-    if gc_state.compact_xml:
-        try:
-            try:
-                from followup_compaction import compact_assistant_xml
-            except ImportError:
-                compact_assistant_xml = None  # followup_compaction not available yet
-            _compact_all = compact_assistant_xml
-        except ImportError:
-            pass
-        for i in range(len(messages) - 1, -1, -1):
-            if messages[i].get("role") == "assistant":
-                last_asst_idx = i
-                break
-
-    if gc_state.trashed_ids:
-        try:
-            try:
-                from followup_compaction import compact_assistant_xml
-            except ImportError:
-                compact_assistant_xml = None  # followup_compaction not available yet
-            _compact_selective = compact_assistant_xml_selective
-        except ImportError:
-            pass
-
-    result = []
-    for idx, msg in enumerate(messages):
-        role = msg.get("role")
-        if role == "assistant" and msg.get("tool_calls"):
-            if _compact_all and idx != last_asst_idx:
-                stubbed = dict(msg)
-                stubbed["content"] = _compact_all(msg["content"], msg["tool_calls"])
-                result.append(stubbed)
-                continue
-            if _compact_selective:
-                tc_ids = {tc.get("id") for tc in msg["tool_calls"]}
-                targeted = tc_ids & gc_state.trashed_ids
-                if targeted:
-                    stubbed = dict(msg)
-                    stubbed["content"] = _compact_selective(
-                        msg["content"], msg["tool_calls"], targeted,
-                    )
-                    result.append(stubbed)
-                    continue
-            result.append(msg)
-            continue
-        if role != "tool":
-            result.append(msg)
-            continue
-        tc_id = msg.get("tool_call_id", "")
-        if tc_id in gc_state.trashed_ids:
-            stubbed = dict(msg)
-            name = msg.get("name", "tool")
-            stubbed["content"] = f"[{name} result -- trashed by model]"
-            result.append(stubbed)
-        elif tc_id in gc_state.snippets:
-            transformed = dict(msg)
-            transformed["content"] = _apply_snippet(msg["content"], gc_state.snippets[tc_id])
-            result.append(transformed)
-        else:
-            result.append(msg)
-    return result
+    Non-destructive: original messages are preserved in state.messages so /save + /load
+    can restore the full conversation. Only the message list sent to the API is reshaped.
+    """
+    if not gc_state.trashed_ids and not gc_state.snippets:
+        return messages
+    return [_apply_gc_to_message(msg, gc_state) for msg in messages]
+
+
+def _apply_gc_to_message(msg: dict, gc_state: GCState) -> dict:
+    """Apply GC rules to a single message; returns original msg untouched if no rule matches."""
+    if msg.get("role") != "tool":
+        return msg
+    tool_call_id = msg.get("tool_call_id", "")
+    if tool_call_id in gc_state.trashed_ids:
+        return _stub_trashed_tool_result(msg)
+    if tool_call_id in gc_state.snippets:
+        return _apply_snippet_to_message(msg, gc_state.snippets[tool_call_id])
+    return msg
+
+
+def _stub_trashed_tool_result(msg: dict) -> dict:
+    """Replace a tool_result's content with a short stub the model can recognise."""
+    stubbed = dict(msg)
+    name = msg.get("name", "tool")
+    stubbed["content"] = f"[{name} result -- trashed by model]"
+    return stubbed
+
+
+def _apply_snippet_to_message(msg: dict, snippet: dict) -> dict:
+    """Apply a keep_{after,before,between} snippet rule to a tool_result message."""
+    transformed = dict(msg)
+    transformed["content"] = _apply_snippet(msg.get("content", ""), snippet)
+    return transformed
 
 
 def _apply_snippet(content: str, snippet: dict) -> str:
diff --git a/tests/test_context_gc.py b/tests/test_context_gc.py
index 068abe3..60d05f2 100644
--- a/tests/test_context_gc.py
+++ b/tests/test_context_gc.py
@@ -14,7 +14,6 @@ def test_defaults(self):
         assert gs.trashed_ids == set()
         assert gs.snippets == {}
         assert gs.notes == {}
-        assert gs.compact_xml is False
 
 
 class TestProcessGCCall:
@@ -63,12 +62,6 @@ def test_snippet_ignored_if_trashed(self):
         )
         assert "r1" not in cfg["_gc_state"].snippets
 
-    def test_compact_xml(self):
-        cfg = self._make_config()
-        result = process_gc_call({"compact_xml": True}, cfg)
-        assert "XML compaction enabled" in result
-        assert cfg["_gc_state"].compact_xml is True
-
 
 class TestApplyGC:
     def test_no_changes(self):
diff --git a/tools/__init__.py b/tools/__init__.py
index 851e702..50508f1 100644
--- a/tools/__init__.py
+++ b/tools/__init__.py
@@ -347,7 +347,6 @@
                 "keep_snippets": {"type": "array", "description": "Partial keeps with text anchors"},
                 "notes": {"type": "array", "description": "Named scratchpad entries: {name, content}"},
                 "trash_notes": {"type": "array", "items": {"type": "string"}, "description": "Note names to discard"},
-                "compact_xml": {"type": "boolean", "description": "Strip verbose XML from old assistant outputs"},
             },
             "required": [],
         },

From 4974da1300c05735a78a3c7bb394ffdafbcad318 Mon Sep 17 00:00:00 2001
From: Simon FREYBURGER
Date: Sun, 19 Apr 2026 14:43:40 +0200
Subject: [PATCH 08/11] feat: disabled_tools config gate for backwards-compat

Introduce a generic config['disabled_tools'] list honoured by the tool
registry in two places:

- get_tool_schemas(disabled=...) filters disabled names out of the schema
  list sent to the LLM; the model never learns the tool exists.
- execute_tool(...) defense-in-depth: any tool_call whose name is disabled
  returns an explicit error tool_result instead of running.

agent.py passes config['disabled_tools'] to get_tool_schemas per turn.
Callers that set disabled_tools=['ContextGC'] now get pre-#55 behaviour
with the rest of this PR in place -- which is what makes the ContextGC
tool truly opt-out rather than an implicit behaviour change for every
existing integration.
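
A minimal opt-out sketch (base_config is a stand-in for whatever config an
integration already builds):

    config = {
        **base_config,
        "disabled_tools": ["ContextGC"],  # hide the schema from the LLM
    }
    # get_tool_schemas(disabled=config["disabled_tools"]) omits ContextGC,
    # and execute_tool() returns an error tool_result if a stale prompt
    # still tries to call it.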
Co-Authored-By: Claude Opus 4.7 (1M context)
---
 agent.py                    |  2 +-
 tests/test_tool_registry.py | 13 +++++++++++++
 tool_registry.py            | 18 ++++++++++++++----
 3 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/agent.py b/agent.py
index 6d08505..0e00965 100644
--- a/agent.py
+++ b/agent.py
@@ -124,7 +124,7 @@ def run(
             model=config["model"],
             system=system_prompt,
             messages=state.messages,
-            tool_schemas=get_tool_schemas(),
+            tool_schemas=get_tool_schemas(disabled=config.get("disabled_tools") or ()),
             config=config,
         ):
             if isinstance(event, (TextChunk, ThinkingChunk)):
diff --git a/tests/test_tool_registry.py b/tests/test_tool_registry.py
index 2a7a8d4..e798e9e 100644
--- a/tests/test_tool_registry.py
+++ b/tests/test_tool_registry.py
@@ -89,6 +89,19 @@ def test_get_tool_schemas():
     assert schemas[0]["name"] == "echo"
 
 
+def test_get_tool_schemas_honours_disabled_list():
+    register_tool(_make_echo_tool("kept"))
+    register_tool(_make_echo_tool("hidden"))
+    names = [s["name"] for s in get_tool_schemas(disabled=["hidden"])]
+    assert names == ["kept"]
+
+
+def test_execute_tool_refuses_disabled_tool():
+    register_tool(_make_echo_tool("gated"))
+    result = execute_tool("gated", {"text": "x"}, config={"disabled_tools": ["gated"]})
+    assert "disabled" in result.lower()
+
+
 # ------------------------------------------------------------------
 # execute_tool
 # ------------------------------------------------------------------
diff --git a/tool_registry.py b/tool_registry.py
index f0a66c2..1438715 100644
--- a/tool_registry.py
+++ b/tool_registry.py
@@ -8,7 +8,7 @@
 import hashlib
 import json
 from dataclasses import dataclass
-from typing import Any, Callable, Dict, List, Optional
+from typing import Any, Callable, Dict, Iterable, List, Optional
 
 
 @dataclass
@@ -69,9 +69,15 @@ def get_all_tools() -> List[ToolDef]:
     return list(_registry.values())
 
 
-def get_tool_schemas() -> List[Dict[str, Any]]:
-    """Return the schemas of all registered tools (for API tool parameter)."""
-    return [t.schema for t in _registry.values()]
+def get_tool_schemas(disabled: Iterable[str] = ()) -> List[Dict[str, Any]]:
+    """Return the schemas of all registered tools (for API tool parameter).
+
+    Tools whose name appears in ``disabled`` are omitted: the LLM never sees
+    them, so it cannot call them. Use this to opt a new tool (e.g. ContextGC)
+    out of a session for backwards-compatibility without touching the registry.
+    """
+    skip = frozenset(disabled or ())
+    return [t.schema for t in _registry.values() if t.name not in skip]
 
 
 def execute_tool(
@@ -91,6 +97,10 @@ def execute_tool(
     Returns:
         Tool result string, possibly truncated.
     """
+    disabled = frozenset(config.get("disabled_tools") or ())
+    if name in disabled:
+        return f"Error: tool '{name}' is disabled in this session (see config['disabled_tools'])."
+
    tool = get_tool(name)
     if tool is None:
         return f"Error: tool '{name}' not found."

From fb85220a8fd43a7cd58095597965da0f548040b0 Mon Sep 17 00:00:00 2001
From: Simon FREYBURGER
Date: Sun, 19 Apr 2026 14:47:13 +0200
Subject: [PATCH 09/11] fix: wire state.gc_state into config so ContextGC can
 reach it

agent.run builds a fresh per-turn config dict at the top of the function.
It adds _depth and _system_prompt so tools like Agent can read them, but
forgot to add _gc_state. As a result every ContextGC invocation returned
"Error: no GC state available" and trashed_ids was never mutated in
production.

Add "_gc_state": state.gc_state to the merge.
Because state.gc_state is the same object across turns and is persisted in
/save, ContextGC can now read and mutate it, and its effects carry over a
/load.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 agent.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/agent.py b/agent.py
index 0e00965..6283cfd 100644
--- a/agent.py
+++ b/agent.py
@@ -89,8 +89,11 @@ def run(
         user_msg["images"] = [pending_img]
     state.messages.append(user_msg)
 
-    # Inject runtime metadata into config so tools (e.g. Agent) can access it
-    config = {**config, "_depth": depth, "_system_prompt": system_prompt}
+    # Inject runtime metadata into config so tools (e.g. Agent, ContextGC) can access it.
+    # ContextGC reads and mutates config["_gc_state"]; without this binding, every call
+    # returns "Error: no GC state available" and no trashed_id is ever recorded.
+    config = {**config, "_depth": depth, "_system_prompt": system_prompt,
+              "_gc_state": state.gc_state}
     session_id = config.get("_session_id", "default")
 
     # Wire up structured logging from config (idempotent, cheap)

From b49064d298411c8694c5fab12b62904ec088abef Mon Sep 17 00:00:00 2001
From: Simon FREYBURGER
Date: Sun, 19 Apr 2026 14:47:20 +0200
Subject: [PATCH 10/11] test: ContextGC end-to-end via real agent.run with
 mocked LLM
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Three scenarios, each driving agent.run through a multi-turn conversation
where only providers.stream is replaced by a scripted generator. All tools
(echo + ContextGC) execute for real through the registry.

- test_llm_trashes_tool_result_via_contextgc_end_to_end: LLM issues echo,
  then ContextGC(trash=[echo_id]); asserts state.gc_state.trashed_ids.
- test_gc_state_survives_save_and_reload_via_session_helpers: same setup +
  _build_session_data → JSON → _restore_state_from_data roundtrip, asserts
  trashed_ids still present after restore.
- test_disabled_tools_hides_contextgc_schema_from_llm: confirms that
  config['disabled_tools']=['ContextGC'] removes the schema from the list
  sent to the stream, proving backwards-compatibility without touching the
  registry.

These cover the integration points unit tests can't see: tool
registration, config injection of _gc_state in agent.run, and the
schema-filter path.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 tests/test_context_gc_e2e.py | 153 +++++++++++++++++++++++++++++++++++
 1 file changed, 153 insertions(+)
 create mode 100644 tests/test_context_gc_e2e.py

diff --git a/tests/test_context_gc_e2e.py b/tests/test_context_gc_e2e.py
new file mode 100644
index 0000000..b74d2b8
--- /dev/null
+++ b/tests/test_context_gc_e2e.py
@@ -0,0 +1,153 @@
+"""End-to-end: drive a real agent.run() conversation where the LLM calls
+ContextGC, and verify gc_state ends up correctly populated + survives a
+session save/load roundtrip.
+
+Only the LLM provider is mocked (via monkeypatching agent.stream). The tool
+registry, session serializer and ContextGC dispatch all run for real.
+"""
+from __future__ import annotations
+
+import json
+from pathlib import Path
+
+import pytest
+
+import tools as _tools_init  # noqa: F401 - force built-in tool registration
+from agent import AgentState, run
+from providers import AssistantTurn, TextChunk
+from tool_registry import ToolDef, register_tool
+from commands.session import _build_session_data, _restore_state_from_data
+
+
+def _scripted_stream(turns):
+    """Yield pre-scripted AssistantTurn objects one per call to stream(...).
+
+    Signature matches providers.stream(**kwargs). We ignore all kwargs.
+    """
+    cursor = iter(turns)
+
+    def fake_stream(**_kwargs):
+        spec = next(cursor)
+        if spec.get("text"):
+            yield TextChunk(spec["text"])
+        yield AssistantTurn(
+            text=spec.get("text", ""),
+            tool_calls=spec.get("tool_calls") or [],
+            in_tokens=1,
+            out_tokens=1,
+        )
+
+    return fake_stream
+
+
+@pytest.fixture
+def echo_tool():
+    """Register a simple echo tool that returns its input verbatim.
+
+    Non-invasive: leaves the rest of the registry intact (built-ins + plugins
+    loaded at module import) so unrelated tests sharing the process still see
+    their tools. Only the echo entry is removed on teardown.
+    """
+    from tool_registry import _registry  # private, but fine for test isolation
+    had_echo_before = "echo" in _registry
+    register_tool(ToolDef(
+        name="echo",
+        schema={
+            "name": "echo",
+            "description": "echo",
+            "input_schema": {
+                "type": "object",
+                "properties": {"text": {"type": "string"}},
+            },
+        },
+        func=lambda params, _cfg: f"echoed: {params.get('text', '')}",
+        read_only=True, concurrent_safe=True,
+    ))
+    yield
+    if not had_echo_before:
+        _registry.pop("echo", None)
+
+
+def test_llm_trashes_tool_result_via_contextgc_end_to_end(monkeypatch, echo_tool):
+    """LLM calls echo, then ContextGC(trash=[echo_id]); gc_state is mutated."""
+    turns = [
+        # Turn 1 (first stream call): LLM issues the echo tool call.
+        {"tool_calls": [
+            {"id": "echo_42", "name": "echo", "input": {"text": "hi"}},
+        ]},
+        # Turn 2: LLM follows up with ContextGC to trash echo_42.
+        {"tool_calls": [
+            {"id": "gc_1", "name": "ContextGC", "input": {"trash": ["echo_42"]}},
+        ]},
+        # Turn 3: LLM emits plain text; no tool_calls → loop exits.
+        {"text": "all set"},
+    ]
+    monkeypatch.setattr("agent.stream", _scripted_stream(turns))
+
+    state = AgentState()
+    config = {"model": "test", "permission_mode": "accept-all", "_session_id": "gc_e2e"}
+
+    list(run("please echo and clean up", state, config, "system prompt"))
+
+    assert state.gc_state.trashed_ids == {"echo_42"}
+    assert "[ContextGC result]" not in state.gc_state.trashed_ids
+    # Neither the ContextGC tool result nor the echo result are deleted from
+    # state.messages -- only the OUTGOING messages on the next turn are reshaped.
+    tool_results = [m for m in state.messages if m.get("role") == "tool"]
+    assert len(tool_results) == 2
+
+
+def test_gc_state_survives_save_and_reload_via_session_helpers(monkeypatch, echo_tool, tmp_path):
+    """Roundtrip through _build_session_data / _restore_state_from_data."""
+    turns = [
+        {"tool_calls": [
+            {"id": "echo_1", "name": "echo", "input": {"text": "x"}},
+        ]},
+        {"tool_calls": [
+            {"id": "gc_1", "name": "ContextGC", "input": {"trash": ["echo_1"]}},
+        ]},
+        {"text": "done"},
+    ]
+    monkeypatch.setattr("agent.stream", _scripted_stream(turns))
+
+    state = AgentState()
+    list(run("go", state, {"model": "test", "permission_mode": "accept-all",
+                           "_session_id": "rt"}, "sys"))
+    assert state.gc_state.trashed_ids == {"echo_1"}
+
+    # Serialize to disk (through JSON to exercise the real save path).
+    session_path: Path = tmp_path / "session.json"
+    session_path.write_text(
+        json.dumps(_build_session_data(state), default=str), encoding="utf-8"
+    )
+
+    # Restore into a brand-new state — trashed_ids must come back intact.
+    reloaded = AgentState()
+    _restore_state_from_data(
+        reloaded, json.loads(session_path.read_text(encoding="utf-8"))
+    )
+    assert reloaded.gc_state.trashed_ids == {"echo_1"}
+
+
+def test_disabled_tools_hides_contextgc_schema_from_llm(monkeypatch, echo_tool):
+    """With config['disabled_tools']=['ContextGC'] the LLM never sees the schema."""
+    captured_schemas = []
+
+    def spy_stream(**kwargs):
+        captured_schemas.append([s["name"] for s in kwargs.get("tool_schemas") or []])
+        yield AssistantTurn(text="hello", tool_calls=[], in_tokens=1, out_tokens=1)
+
+    monkeypatch.setattr("agent.stream", spy_stream)
+
+    state = AgentState()
+    list(run("hi", state, {
+        "model": "test",
+        "permission_mode": "accept-all",
+        "_session_id": "gated",
+        "disabled_tools": ["ContextGC"],
+    }, "sys"))
+
+    assert captured_schemas, "stream() must have been called at least once"
+    for schemas in captured_schemas:
+        assert "ContextGC" not in schemas
+        assert "echo" in schemas  # non-disabled tool still present

From 2b433474321235f014a92259f0029565b8bd6596 Mon Sep 17 00:00:00 2001
From: Simon FREYBURGER
Date: Tue, 21 Apr 2026 09:49:37 +0200
Subject: [PATCH 11/11] fix: complete context_gc rewrite with
 NoteSave/NoteRead, compact_xml, methodology protection, stub detection,
 audit improvements

---
 agent.py                 |   4 +-
 context_gc.py            | 310 +++++++++++++++++++++++++++++++++++----
 tests/test_context_gc.py | 163 ++++++++++++++++++--
 tool_registry.py         |   6 +
 tools/__init__.py        | 110 +++++++++++++-
 5 files changed, 543 insertions(+), 50 deletions(-)

diff --git a/agent.py b/agent.py
index 6283cfd..fd31162 100644
--- a/agent.py
+++ b/agent.py
@@ -36,6 +36,8 @@ class AgentState:
     # Persisted so trashed_ids, snippets and notes survive /save and /load.
     # Without this, restoring a session leaks back tool_results the model had trashed.
     gc_state: GCState = field(default_factory=GCState)
+    # Timeline of note changes (for debugging / replay)
+    notes_timeline: list = field(default_factory=list)
 
 
 @dataclass
@@ -93,7 +95,7 @@ def run(
     # ContextGC reads and mutates config["_gc_state"]; without this binding, every call
     # returns "Error: no GC state available" and no trashed_id is ever recorded.
     config = {**config, "_depth": depth, "_system_prompt": system_prompt,
-              "_gc_state": state.gc_state}
+              "_gc_state": state.gc_state, "_state": state}
     session_id = config.get("_session_id", "default")
 
     # Wire up structured logging from config (idempotent, cheap)
diff --git a/context_gc.py b/context_gc.py
index 6475cdd..69d3b02 100644
--- a/context_gc.py
+++ b/context_gc.py
@@ -2,18 +2,71 @@
 
 Lets the LLM trash consumed tool results, keep relevant snippets,
 and persist notes across turns to manage its context window.
+
+Flat-file port of bouzecode's context_gc/ package — combines:
+    state (GCState, process_gc_call, note_save, note_read)
+    apply (apply_gc, snippet handling)
+    notes (inject_notes)
+    audit (build_verbatim_audit_note, prepend_verbatim_audit)
+    stubs (strip_trashed_stubs, _is_stub, _is_auto_trashed_stub)
 """
 from __future__ import annotations
 
+import re
+import time
 from dataclasses import dataclass, field
 
 
+# ── Constants ──────────────────────────────────────────────────────────────
+
+METHODOLOGY_NOTE = "methodology"
+
+
+# ── Stub detection ─────────────────────────────────────────────────────────
+
+_ELIDED_RE = re.compile(r'<tool_result[^>]*elided="true"\s*/>\s*')
+
+# Only matches stubs produced by apply_gc (model-driven trash).
+# Does NOT match breadcrumbs from compact_tool_history —
+# those must survive so the model retains a trace of prior tool calls.
+_TRASHED_STUB_RE = re.compile(r'^\[.{1,60} -- (?:trashed by model|auto-trashed)\]$') +_AUTO_TRASHED_RE = re.compile(r'^\[.{1,60} -- auto-trashed\]$') + + +def _is_stub(content: str) -> bool: + """Return True for any GC stub (model-trashed OR auto-trashed). + + Used by audit to skip all stubs in the verbatim audit note. + """ + if not content or len(content) > 200: + return False + stripped = content.strip() + if not stripped: + return False + return bool(_TRASHED_STUB_RE.match(stripped)) + + +def _is_auto_trashed_stub(content: str) -> bool: + """Return True only for auto-trashed stubs (ContextGC's own results).""" + if not content or len(content) > 200: + return False + stripped = content.strip() + if not stripped: + return False + return bool(_AUTO_TRASHED_RE.match(stripped)) + + +# ── GCState ──────────────────────────────────────────────────────────────── + @dataclass class GCState: trashed_ids: set = field(default_factory=set) snippets: dict = field(default_factory=dict) notes: dict = field(default_factory=dict) + compact_xml: bool = False + +# ── Process ContextGC tool call ──────────────────────────────────────────── def process_gc_call(params: dict, config: dict) -> str: gc_state: GCState = config.get("_gc_state") @@ -25,6 +78,8 @@ def process_gc_call(params: dict, config: dict) -> str: notes = params.get("notes") or [] trash_notes = params.get("trash_notes") or [] + notes_before = dict(gc_state.notes) + for tid in trashed: gc_state.trashed_ids.add(tid) gc_state.snippets.pop(tid, None) @@ -40,9 +95,30 @@ def process_gc_call(params: dict, config: dict) -> str: if name: gc_state.notes[name] = content + methodology_protected = False for name in trash_notes: + if name == METHODOLOGY_NOTE: + methodology_protected = True + continue gc_state.notes.pop(name, None) + if params.get("compact_xml"): + gc_state.compact_xml = True + + # Track notes timeline + added = [k for k in gc_state.notes if k not in notes_before] + updated = [k for k in gc_state.notes if k in notes_before and gc_state.notes[k] != notes_before[k]] + removed = [k for k in notes_before if k not in gc_state.notes] + if added or updated or removed: + state = config.get("_state") + if state is not None and hasattr(state, "notes_timeline"): + state.notes_timeline.append({ + "turn": getattr(state, "turn_count", 0), + "timestamp": time.time(), + "notes": dict(gc_state.notes), + "delta": {"added": added, "updated": updated, "removed": removed}, + }) + parts = [] if trashed: parts.append(f"trashed {len(trashed)} results") @@ -51,47 +127,132 @@ def process_gc_call(params: dict, config: dict) -> str: if notes: parts.append(f"{len(notes)} notes saved") if trash_notes: - parts.append(f"{len(trash_notes)} notes removed") + trashed_count = len(trash_notes) - (1 if methodology_protected else 0) + if trashed_count: + parts.append(f"{trashed_count} notes removed") + if methodology_protected: + parts.append(f"note '{METHODOLOGY_NOTE}' protected from trash") + if params.get("compact_xml"): + parts.append("XML compaction enabled") parts.append(f"{len(gc_state.notes)} active notes, {len(gc_state.trashed_ids)} total trashed") return "GC applied: " + ", ".join(parts) -def apply_gc(messages: list, gc_state: GCState) -> list: - """Return a new message list with trashed tool_results stubbed and kept snippets trimmed. +# ── NoteSave / NoteRead ─────────────────────────────────────────────────── - Non-destructive: original messages are preserved in state.messages so /save + /load - can restore the full conversation. 
Only the message list sent to the API is reshaped. - """ - if not gc_state.trashed_ids and not gc_state.snippets: - return messages - return [_apply_gc_to_message(msg, gc_state) for msg in messages] +def note_save(params: dict, config: dict) -> str: + gc_state: GCState = config.get("_gc_state") + if gc_state is None: + return "Error: no GC state available" + + name = params.get("name", "") + content = params.get("content", "") + if not name: + return "Error: 'name' is required" + + notes_before = dict(gc_state.notes) + gc_state.notes[name] = content + + is_new = name not in notes_before + changed = not is_new and notes_before[name] != content + if is_new or changed: + state = config.get("_state") + if state is not None and hasattr(state, "notes_timeline"): + state.notes_timeline.append({ + "turn": getattr(state, "turn_count", 0), + "timestamp": time.time(), + "notes": dict(gc_state.notes), + "delta": { + "added": [name] if is_new else [], + "updated": [name] if changed else [], + "removed": [], + }, + }) + + action = "created" if is_new else ("updated" if changed else "unchanged") + return f"Note '{name}' {action}. {len(gc_state.notes)} active notes." + + +def note_read(params: dict, config: dict) -> str: + gc_state: GCState = config.get("_gc_state") + if gc_state is None: + return "Error: no GC state available" + name = params.get("name") + if name: + content = gc_state.notes.get(name) + if content is None: + available = ", ".join(sorted(gc_state.notes)) or "(none)" + return f"Note '{name}' not found. Active notes: {available}" + return f"## {name}\n{content}" -def _apply_gc_to_message(msg: dict, gc_state: GCState) -> dict: - """Apply GC rules to a single message; returns original msg untouched if no rule matches.""" - if msg.get("role") != "tool": - return msg - tool_call_id = msg.get("tool_call_id", "") - if tool_call_id in gc_state.trashed_ids: - return _stub_trashed_tool_result(msg) - if tool_call_id in gc_state.snippets: - return _apply_snippet_to_message(msg, gc_state.snippets[tool_call_id]) - return msg + if not gc_state.notes: + return "No active notes." 
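    # Rendering sketch for the all-notes branch below, in the format the
    # tests assert: notes {"a": "1", "b": "2"} -> "## a\n1\n\n## b\n2".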
+ parts = [] + for n, c in gc_state.notes.items(): + parts.append(f"## {n}\n{c}") + return "\n\n".join(parts) -def _stub_trashed_tool_result(msg: dict) -> dict: - """Replace a tool_result's content with a short stub the model can recognise.""" - stubbed = dict(msg) - name = msg.get("name", "tool") - stubbed["content"] = f"[{name} result -- trashed by model]" - return stubbed +# ── Apply GC (transform messages before API call) ───────────────────────── +def apply_gc(messages: list, gc_state: GCState) -> list: + if not gc_state.trashed_ids and not gc_state.snippets and not gc_state.compact_xml: + return messages -def _apply_snippet_to_message(msg: dict, snippet: dict) -> dict: - """Apply a keep_{after,before,between} snippet rule to a tool_result message.""" - transformed = dict(msg) - transformed["content"] = _apply_snippet(msg.get("content", ""), snippet) - return transformed + _compact_all = None + _compact_selective = None + last_asst_idx = -1 + + if gc_state.compact_xml: + from followup_compaction import compact_assistant_xml + _compact_all = compact_assistant_xml + for i in range(len(messages) - 1, -1, -1): + if messages[i].get("role") == "assistant": + last_asst_idx = i + break + + if gc_state.trashed_ids: + from followup_compaction import compact_assistant_xml_selective + _compact_selective = compact_assistant_xml_selective + + result = [] + for idx, msg in enumerate(messages): + role = msg.get("role") + if role == "assistant" and msg.get("tool_calls"): + if _compact_all and idx != last_asst_idx: + stubbed = dict(msg) + stubbed["content"] = _compact_all(msg["content"], msg["tool_calls"]) + result.append(stubbed) + continue + if _compact_selective: + tc_ids = {tc.get("id") for tc in msg["tool_calls"]} + targeted = tc_ids & gc_state.trashed_ids + if targeted: + stubbed = dict(msg) + stubbed["content"] = _compact_selective( + msg["content"], msg["tool_calls"], targeted, + ) + result.append(stubbed) + continue + result.append(msg) + continue + if role != "tool": + result.append(msg) + continue + tc_id = msg.get("tool_call_id", "") + if tc_id in gc_state.trashed_ids: + stubbed = dict(msg) + name = msg.get("name", "tool") + stubbed["content"] = f"[{name} result -- trashed by model]" + result.append(stubbed) + elif tc_id in gc_state.snippets: + transformed = dict(msg) + transformed["content"] = _apply_snippet(msg["content"], gc_state.snippets[tc_id]) + result.append(transformed) + else: + result.append(msg) + return result def _apply_snippet(content: str, snippet: dict) -> str: @@ -145,6 +306,8 @@ def _find_anchor_line(lines: list, text: str, start_from: int = 0) -> int | None return None +# ── Notes injection ─────────────────────────────────────────────────────── + def inject_notes(messages: list, notes: dict) -> list: if not notes: return messages @@ -161,8 +324,49 @@ def inject_notes(messages: list, notes: dict) -> list: return result +# ── Verbatim audit ──────────────────────────────────────────────────────── + +_ARGS_PREFERRED_KEY = { + "Read": "file_path", "Edit": "file_path", "Write": "file_path", + "NotebookEdit": "notebook_path", + "Glob": "pattern", "Grep": "pattern", + "Bash": "command", + "WebFetch": "url", "WebSearch": "query", +} + + +def _summarize_args(tool_name: str, input_dict: dict, max_len: int = 60) -> str: + if not input_dict: + return "" + val = input_dict.get(_ARGS_PREFERRED_KEY.get(tool_name, "")) + if val is None: + for v in input_dict.values(): + if isinstance(v, str) and v: + val = v + break + if val is None: + return "" + val = str(val).replace("\n", " ") + if 
len(val) > max_len: + val = val[: max_len - 3] + "..." + return val + + def build_verbatim_audit_note(messages: list) -> str: + """List every tool_result still kept verbatim with its token size. + + Each entry includes the tool's key arg (file_path, pattern, command...) + so the model can correlate notes with results already in context. + """ from compaction import estimate_tokens + args_by_id: dict[str, dict] = {} + for message in messages: + if message.get("role") != "assistant": + continue + for tc in message.get("tool_calls") or []: + tc_id = tc.get("id") + if tc_id: + args_by_id[tc_id] = tc.get("input") or {} lines = [] for message in messages: if message.get("role") != "tool": @@ -173,12 +377,14 @@ def build_verbatim_audit_note(messages: list) -> str: block.get("text", "") if isinstance(block, dict) else str(block) for block in content ) - if " str: def prepend_verbatim_audit(messages: list) -> list: + """Prepend the verbatim audit note to the last user message.""" note = build_verbatim_audit_note(messages) if not note: return messages @@ -199,3 +406,42 @@ def prepend_verbatim_audit(messages: list) -> list: result[i]["content"] = note + "\n\n" + result[i]["content"] break return result + + +# ── Strip auto-trashed stubs ───────────────────────────────────────────── + +def strip_trashed_stubs(messages: list) -> list: + """Remove auto-trashed tool messages and their tool_call entries entirely.""" + stubbed_ids = set() + for msg in messages: + if msg.get("role") == "tool": + content = msg.get("content", "") + if _is_auto_trashed_stub(content): + tc_id = msg.get("tool_call_id", "") + if tc_id: + stubbed_ids.add(tc_id) + if not stubbed_ids: + return messages + result = [] + for msg in messages: + role = msg.get("role") + if role == "tool" and msg.get("tool_call_id", "") in stubbed_ids: + continue + if role == "assistant" and msg.get("tool_calls"): + original_tcs = msg["tool_calls"] + remaining = [tc for tc in original_tcs if tc.get("id") not in stubbed_ids] + if len(remaining) == len(original_tcs): + result.append(msg) + continue + cleaned = dict(msg) + content = cleaned.get("content", "") or "" + if not remaining: + content = _ELIDED_RE.sub("", content).strip() + cleaned.pop("tool_calls", None) + else: + cleaned["tool_calls"] = remaining + cleaned["content"] = content + result.append(cleaned) + continue + result.append(msg) + return result diff --git a/tests/test_context_gc.py b/tests/test_context_gc.py index 60d05f2..9ea2d2d 100644 --- a/tests/test_context_gc.py +++ b/tests/test_context_gc.py @@ -2,9 +2,11 @@ import pytest from context_gc import ( - GCState, process_gc_call, apply_gc, _apply_snippet, - _find_anchor_line, inject_notes, build_verbatim_audit_note, - prepend_verbatim_audit, + GCState, process_gc_call, note_save, note_read, + apply_gc, _apply_snippet, _find_anchor_line, + inject_notes, build_verbatim_audit_note, prepend_verbatim_audit, + _is_stub, _is_auto_trashed_stub, strip_trashed_stubs, + METHODOLOGY_NOTE, _summarize_args, ) @@ -14,6 +16,7 @@ def test_defaults(self): assert gs.trashed_ids == set() assert gs.snippets == {} assert gs.notes == {} + assert gs.compact_xml is False class TestProcessGCCall: @@ -46,6 +49,13 @@ def test_trash_notes(self): assert "1 notes removed" in result assert "old" not in cfg["_gc_state"].notes + def test_methodology_protected(self): + cfg = self._make_config() + cfg["_gc_state"].notes[METHODOLOGY_NOTE] = "important" + result = process_gc_call({"trash_notes": [METHODOLOGY_NOTE]}, cfg) + assert "protected from trash" in result + assert 
METHODOLOGY_NOTE in cfg["_gc_state"].notes + def test_keep_snippets(self): cfg = self._make_config() result = process_gc_call( @@ -62,6 +72,75 @@ def test_snippet_ignored_if_trashed(self): ) assert "r1" not in cfg["_gc_state"].snippets + def test_compact_xml(self): + cfg = self._make_config() + result = process_gc_call({"compact_xml": True}, cfg) + assert "XML compaction enabled" in result + assert cfg["_gc_state"].compact_xml is True + + +class TestNoteSave: + def _make_config(self): + return {"_gc_state": GCState()} + + def test_no_gc_state(self): + assert "Error" in note_save({}, {}) + + def test_missing_name(self): + cfg = self._make_config() + assert "Error" in note_save({"content": "x"}, cfg) + + def test_create(self): + cfg = self._make_config() + result = note_save({"name": "k", "content": "v"}, cfg) + assert "created" in result + assert cfg["_gc_state"].notes["k"] == "v" + + def test_update(self): + cfg = self._make_config() + cfg["_gc_state"].notes["k"] = "old" + result = note_save({"name": "k", "content": "new"}, cfg) + assert "updated" in result + assert cfg["_gc_state"].notes["k"] == "new" + + def test_unchanged(self): + cfg = self._make_config() + cfg["_gc_state"].notes["k"] = "same" + result = note_save({"name": "k", "content": "same"}, cfg) + assert "unchanged" in result + + +class TestNoteRead: + def _make_config(self): + return {"_gc_state": GCState()} + + def test_no_gc_state(self): + assert "Error" in note_read({}, {}) + + def test_read_specific(self): + cfg = self._make_config() + cfg["_gc_state"].notes["k"] = "v" + result = note_read({"name": "k"}, cfg) + assert "## k\nv" in result + + def test_read_not_found(self): + cfg = self._make_config() + result = note_read({"name": "missing"}, cfg) + assert "not found" in result + + def test_read_all(self): + cfg = self._make_config() + cfg["_gc_state"].notes["a"] = "1" + cfg["_gc_state"].notes["b"] = "2" + result = note_read({}, cfg) + assert "## a\n1" in result + assert "## b\n2" in result + + def test_read_all_empty(self): + cfg = self._make_config() + result = note_read({}, cfg) + assert "No active notes" in result + class TestApplyGC: def test_no_changes(self): @@ -171,6 +250,65 @@ def test_injects_in_last_user_msg(self): assert result[0]["content"] == "first" +class TestStubs: + def test_is_stub_trashed(self): + assert _is_stub("[Read result -- trashed by model]") is True + + def test_is_stub_auto_trashed(self): + assert _is_stub("[ContextGC result -- auto-trashed]") is True + + def test_is_stub_normal_content(self): + assert _is_stub("some normal content") is False + + def test_is_stub_too_long(self): + assert _is_stub("x" * 201) is False + + def test_is_auto_trashed(self): + assert _is_auto_trashed_stub("[ContextGC result -- auto-trashed]") is True + assert _is_auto_trashed_stub("[Read result -- trashed by model]") is False + + def test_strip_trashed_stubs(self): + msgs = [ + {"role": "assistant", "content": "text", "tool_calls": [ + {"id": "gc1", "name": "ContextGC", "input": {}}, + {"id": "r1", "name": "Read", "input": {}}, + ]}, + {"role": "tool", "tool_call_id": "gc1", "name": "ContextGC", + "content": "[ContextGC result -- auto-trashed]"}, + {"role": "tool", "tool_call_id": "r1", "name": "Read", + "content": "file content"}, + ] + result = strip_trashed_stubs(msgs) + assert len(result) == 2 # assistant + Read result + # gc1 tool_call removed from assistant + assert len(result[0]["tool_calls"]) == 1 + assert result[0]["tool_calls"][0]["id"] == "r1" + + def test_strip_no_stubs(self): + msgs = [{"role": "user", 
"content": "hi"}] + assert strip_trashed_stubs(msgs) is msgs + + +class TestSummarizeArgs: + def test_read(self): + assert _summarize_args("Read", {"file_path": "/a/b.py"}) == "/a/b.py" + + def test_bash(self): + assert "echo" in _summarize_args("Bash", {"command": "echo hi"}) + + def test_truncate(self): + result = _summarize_args("Read", {"file_path": "x" * 100}, max_len=20) + assert len(result) == 20 + assert result.endswith("...") + + def test_empty(self): + assert _summarize_args("Read", {}) == "" + + def test_fallback(self): + result = _summarize_args("Custom", {"arg": "val"}) + assert result == "val" + + class TestVerbatimAudit: def test_empty(self): assert build_verbatim_audit_note([]) == "" @@ -180,16 +318,23 @@ def test_skips_trashed(self): "content": "[Read result -- trashed by model]"}] assert build_verbatim_audit_note(msgs) == "" - def test_skips_elided(self): - msgs = [{"role": "tool", "tool_call_id": "t1", "name": "Read", - "content": ''}] + def test_skips_auto_trashed(self): + msgs = [{"role": "tool", "tool_call_id": "t1", "name": "ContextGC", + "content": "[ContextGC result -- auto-trashed]"}] assert build_verbatim_audit_note(msgs) == "" def test_includes_verbatim(self): - msgs = [{"role": "tool", "tool_call_id": "r1", "name": "Read", - "content": "file content here"}] + msgs = [ + {"role": "assistant", "content": "", "tool_calls": [ + {"id": "r1", "name": "Read", "input": {"file_path": "test.py"}} + ]}, + {"role": "tool", "tool_call_id": "r1", "name": "Read", + "content": "file content here"}, + ] result = build_verbatim_audit_note(msgs) - assert "r1 (Read)" in result + assert "r1" in result + assert "Read" in result + assert "test.py" in result assert "tk" in result def test_prepend(self): diff --git a/tool_registry.py b/tool_registry.py index 1438715..2d0132f 100644 --- a/tool_registry.py +++ b/tool_registry.py @@ -64,6 +64,12 @@ def get_tool(name: str) -> Optional[ToolDef]: return _registry.get(name) +def is_concurrent_safe(name: str) -> bool: + """Return True if the named tool is safe to run in parallel.""" + tool = _registry.get(name) + return tool.concurrent_safe if tool else False + + def get_all_tools() -> List[ToolDef]: """Return all registered tools (insertion order).""" return list(_registry.values()) diff --git a/tools/__init__.py b/tools/__init__.py index 50508f1..cee1e55 100644 --- a/tools/__init__.py +++ b/tools/__init__.py @@ -333,20 +333,99 @@ "required": ["seconds"], }, }, + { + "name": "NoteSave", + "description": ( + "Save or update a working-memory note. Notes persist across turns and are " + "injected into your context automatically. Use for plans, key findings, " + "extracted facts, and methodology tracking." + ), + "input_schema": { + "type": "object", + "properties": { + "name": {"type": "string", "description": "Unique note name (overwrites if exists)"}, + "content": {"type": "string", "description": "Note content (markdown supported)"}, + }, + "required": ["name", "content"], + }, + }, + { + "name": "NoteRead", + "description": ( + "Read one or all working-memory notes. " + "Omit 'name' to list all active notes with their content." + ), + "input_schema": { + "type": "object", + "properties": { + "name": {"type": "string", "description": "Note name to read. Omit to read all."}, + }, + "required": [], + }, + }, { "name": "ContextGC", "description": ( - "Garbage-collect your context to free space. 
Trash tool results you no longer " - "need, keep only relevant snippets from large results, and save key information " - "in notes that persist across turns." + "Garbage-collect your context to free space. MANDATORY: call this at the end of " + "every turn with tool calls. Trash tool results you no longer need, keep only " + "relevant snippets from large results, and save key information in notes that " + "persist across turns. Use compact_xml=true to strip verbose XML from your own " + "old assistant outputs." ), "input_schema": { "type": "object", "properties": { - "trash": {"type": "array", "items": {"type": "string"}, "description": "tool_call_ids to fully discard"}, - "keep_snippets": {"type": "array", "description": "Partial keeps with text anchors"}, - "notes": {"type": "array", "description": "Named scratchpad entries: {name, content}"}, - "trash_notes": {"type": "array", "items": {"type": "string"}, "description": "Note names to discard"}, + "trash": { + "type": "array", + "items": {"type": "string"}, + "description": ( + "tool_call_ids to fully discard. Works on ANY tool result: " + "Read, Grep, Bash, Skill, GetFolderDescription, WebFetch, etc." + ), + }, + "keep_snippets": { + "type": "array", + "items": { + "type": "object", + "properties": { + "id": {"type": "string", "description": "tool_call_id of the result to trim"}, + "keep_after": {"type": "string", "description": "Keep from line containing this text to end"}, + "keep_before": {"type": "string", "description": "Keep from start to line before this text"}, + "keep_between": { + "type": "array", + "items": {"type": "string"}, + "description": "Keep between two text anchors [start_text, end_text]", + }, + }, + "required": ["id"], + }, + "description": "Partial keeps: trim results using text anchors", + }, + "notes": { + "type": "array", + "items": { + "type": "object", + "properties": { + "name": {"type": "string", "description": "Unique note name"}, + "content": {"type": "string", "description": "Note content"}, + }, + "required": ["name", "content"], + }, + "description": "Named scratchpad entries that persist across turns", + }, + "trash_notes": { + "type": "array", + "items": {"type": "string"}, + "description": "Note names to discard", + }, + "compact_xml": { + "type": "boolean", + "description": ( + "Strip verbose XML tool_use blocks from old assistant messages, " + "replacing each with a one-line summary. Keeps prose intact. " + "Once enabled, stays on for the rest of the session." + ), + }, }, "required": [], }, @@ -498,13 +577,28 @@ def _register_builtins() -> None: ), ] + # NoteSave / NoteRead tools + from context_gc import note_save, note_read + _tool_defs.append(ToolDef( + name="NoteSave", + schema=_schemas["NoteSave"], + func=lambda p, c: note_save(p, c), + read_only=False, concurrent_safe=True, + )) + _tool_defs.append(ToolDef( + name="NoteRead", + schema=_schemas["NoteRead"], + func=lambda p, c: note_read(p, c), + read_only=True, concurrent_safe=True, + )) + # ContextGC tool from context_gc import process_gc_call _tool_defs.append(ToolDef( name="ContextGC", schema=_schemas["ContextGC"], func=lambda p, c: process_gc_call(p, c), - read_only=False, concurrent_safe=False, + read_only=True, concurrent_safe=True, )) for td in _tool_defs:
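
A hedged end-to-end sketch (editorial, not part of the patch) of how the pieces
above compose on one ContextGC round. The message shapes follow the tests; the
messages themselves and the "read_1" id are illustrative:

    from context_gc import GCState, apply_gc, inject_notes, process_gc_call

    gc_state = GCState()
    config = {"_gc_state": gc_state}  # agent.run binds this before each call

    # The model's ContextGC call: trash a consumed Read result, save a note.
    print(process_gc_call(
        {"trash": ["read_1"],
         "notes": [{"name": "plan", "content": "summarise src/, then write docs"}]},
        config,
    ))
    # -> "GC applied: trashed 1 results, 1 notes saved,
    #     1 active notes, 1 total trashed"

    messages = [
        {"role": "user", "content": "summarise src/"},
        {"role": "assistant", "content": "", "tool_calls": [
            {"id": "read_1", "name": "Read", "input": {"file_path": "src/main.py"}},
        ]},
        {"role": "tool", "tool_call_id": "read_1", "name": "Read",
         "content": "...many tokens of file content..."},
        {"role": "user", "content": "now write the summary"},
    ]

    # Only the OUTGOING list is reshaped; state.messages keeps full history.
    outgoing = inject_notes(apply_gc(messages, gc_state), gc_state.notes)
    assert outgoing[2]["content"] == "[Read result -- trashed by model]"
    assert messages[2]["content"] != outgoing[2]["content"]  # original intact

The last user message in `outgoing` additionally carries the injected "plan"
note, in whatever header format inject_notes emits.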