From ef782008658072a3e9ea0e93090a2addfd81e823 Mon Sep 17 00:00:00 2001
From: Feng GAO
Date: Tue, 28 Apr 2026 17:48:16 +0800
Subject: [PATCH] runtime: bound bash logs and skip runtime mirrors

---
 src/deepscientist/bash_exec/monitor.py |  33 ++++++-
 src/deepscientist/bash_exec/service.py | 118 ++++++++++++++++++++++++-
 src/deepscientist/quest/service.py     |  14 +++
 tests/test_bash_exec_monitor.py        |  98 ++++++++++++++++++++
 tests/test_memory_and_artifact.py      |  25 ++++++
 5 files changed, 284 insertions(+), 4 deletions(-)
 create mode 100644 tests/test_bash_exec_monitor.py

diff --git a/src/deepscientist/bash_exec/monitor.py b/src/deepscientist/bash_exec/monitor.py
index f38844e7..7dc5dd89 100644
--- a/src/deepscientist/bash_exec/monitor.py
+++ b/src/deepscientist/bash_exec/monitor.py
@@ -33,6 +33,9 @@
 DEFAULT_STOP_GRACE_SECONDS = 5
 TERMINAL_IO_POLL_SECONDS = 0.02
 
+MAX_BUFFERED_LINE_CHARS = 128_000
+MAX_TERMINAL_LOG_LINE_CHARS = 32_768
+TERMINAL_LOG_PREVIEW_EDGE_CHARS = 4_096
 
 
 def _append_jsonl(path: Path, payload: dict[str, Any]) -> None:
@@ -209,6 +212,7 @@ def _drain_buffer(
     *,
     flush_partial: bool = False,
     carriage_mode: str = "marker",
+    max_buffer_chars: int = MAX_BUFFERED_LINE_CHARS,
 ) -> str:
     while True:
         index_r = buffer.find("\r")
@@ -230,12 +234,34 @@
         segment = buffer[:index_n]
         buffer = buffer[index_n + 1 :]
         append_line(segment)
+    while max_buffer_chars > 0 and len(buffer) > max_buffer_chars:
+        append_line(buffer[:max_buffer_chars], stream="partial")
+        buffer = buffer[max_buffer_chars:]
     if flush_partial and buffer:
         append_line(buffer, stream="partial")
         return ""
     return buffer
 
 
+def _render_terminal_log_line(
+    line: str,
+    *,
+    max_chars: int = MAX_TERMINAL_LOG_LINE_CHARS,
+    edge_chars: int = TERMINAL_LOG_PREVIEW_EDGE_CHARS,
+) -> str:
+    if max_chars <= 0 or len(line) <= max_chars:
+        return line
+    preview_chars = max(0, min(edge_chars, max_chars // 2))
+    if preview_chars <= 0:
+        return f"[truncated oversized output line: original_length={len(line)} chars]"
+    omitted = len(line) - (preview_chars * 2)
+    return (
+        f"{line[:preview_chars]}"
+        f"[... omitted {omitted} chars from oversized output line; full content remains in log.jsonl ...]"
+        f"{line[-preview_chars:]}"
+    )
+
+
 def _parse_terminal_prompt_marker(line: str) -> dict[str, str] | None:
     if not line.startswith(BASH_TERMINAL_PROMPT_PREFIX):
         return None
@@ -354,13 +380,14 @@ def append_line(line: str, *, stream: str = "stdout") -> None:
             return
         seq += 1
         timestamp = utc_now()
+        terminal_line = _render_terminal_log_line(line)
         with terminal_log_path.open("a", encoding="utf-8") as handle:
             if stream == "partial":
-                handle.write(line)
+                handle.write(terminal_line)
             elif stream == "carriage":
-                handle.write(f"\r{line}")
+                handle.write(f"\r{terminal_line}")
             else:
-                handle.write(f"{line}\n")
+                handle.write(f"{terminal_line}\n")
         _append_jsonl(
             log_path,
             {
diff --git a/src/deepscientist/bash_exec/service.py b/src/deepscientist/bash_exec/service.py
index 637ee9c5..883d6d38 100644
--- a/src/deepscientist/bash_exec/service.py
+++ b/src/deepscientist/bash_exec/service.py
@@ -35,6 +35,8 @@
 BASH_WATCHDOG_AFTER_SECONDS = 1800
 SUMMARY_RECENT_SESSION_LIMIT = 256
 SUMMARY_RUNNING_SESSION_LIMIT = 64
+COMPLETED_RUNTIME_LOG_COMPACT_BYTES = 1_000_000
+COMPLETED_RUNTIME_LOG_WINDOW_BYTES = 256_000
 
 INPUT_ESCAPE_SEQUENCE_RE = re.compile(r"\x1b\[[0-9;?]*[ -/]*[@-~]|\x1b[@-_]")
 
@@ -174,6 +176,83 @@ def _compact_command(command: object, *, max_length: int = 140) -> str:
     return normalized[: max(0, max_length - 3)].rstrip() + "..."
 
 
+def _unique_sibling_path(path: Path) -> Path:
+    if not path.exists():
+        return path
+    stem = path.stem
+    suffix = path.suffix
+    for index in range(1, 10_000):
+        candidate = path.with_name(f"{stem}.{index}{suffix}")
+        if not candidate.exists():
+            return candidate
+    raise RuntimeError(f"unable_to_allocate_backup_path:{path}")
+
+
+def _relative_to_root(path: Path, root: Path) -> str:
+    try:
+        return path.relative_to(root).as_posix()
+    except ValueError:
+        return str(path)
+
+
+def _read_head_tail_bytes(path: Path, *, edge_bytes: int) -> tuple[bytes, bytes, int]:
+    size = path.stat().st_size
+    resolved_edge = max(0, min(edge_bytes, max(0, size // 2)))
+    with path.open("rb") as handle:
+        head = handle.read(resolved_edge)
+        if resolved_edge > 0:
+            handle.seek(max(0, size - resolved_edge))
+            tail = handle.read(resolved_edge)
+        else:
+            tail = b""
+    return head, tail, size
+
+
+def _compact_completed_text_log(
+    path: Path,
+    *,
+    quest_root: Path,
+    max_bytes: int = COMPLETED_RUNTIME_LOG_COMPACT_BYTES,
+    edge_bytes: int = COMPLETED_RUNTIME_LOG_WINDOW_BYTES,
+) -> dict[str, Any] | None:
+    if not path.exists() or not path.is_file():
+        return None
+    try:
+        original_bytes = path.stat().st_size
+    except OSError:
+        return None
+    if original_bytes <= max(1, max_bytes):
+        return None
+
+    backup_path = _unique_sibling_path(path.with_name(f"{path.stem}.full{path.suffix}"))
+    shutil.move(str(path), str(backup_path))
+    head, tail, original_bytes = _read_head_tail_bytes(backup_path, edge_bytes=edge_bytes)
+    omitted = max(0, original_bytes - len(head) - len(tail))
+    backup_ref = _relative_to_root(backup_path, quest_root)
+    compact_ref = _relative_to_root(path, quest_root)
+    marker = (
+        "\n"
+        f"[compacted completed runtime log: omitted {omitted} bytes; "
+        f"full log backup={backup_ref}]\n"
+    )
+    compact_text = (
+        head.decode("utf-8", errors="replace")
+        + marker
+        + tail.decode("utf-8", errors="replace")
+    )
+    ensure_dir(path.parent)
+    path.write_text(compact_text, encoding="utf-8")
+    return {
+        "mode": "text_head_tail_bytes",
+        "compact_path": compact_ref,
+        "backup_path": backup_ref,
+        "original_bytes": original_bytes,
+        "compact_bytes": path.stat().st_size,
+        "omitted_bytes": omitted,
+        "compacted_at": utc_now(),
+    }
+
+
 class BashExecService:
     def __init__(self, home: Path) -> None:
         self.home = home
@@ -669,6 +748,38 @@ def _write_meta(self, quest_root: Path, bash_id: str, meta: dict[str, Any]) -> d
         )
         return self._write_summary(quest_root, summary)
 
+    def compact_completed_session_logs(
+        self,
+        quest_root: Path,
+        bash_id: str,
+        *,
+        max_text_bytes: int = COMPLETED_RUNTIME_LOG_COMPACT_BYTES,
+    ) -> dict[str, Any]:
+        meta_path = self.meta_path(quest_root, bash_id)
+        meta = read_json(meta_path, {})
+        if not isinstance(meta, dict) or not meta:
+            return {}
+        if _coerce_session_status(meta.get("status")) not in TERMINAL_STATUSES:
+            return {}
+
+        existing = meta.get("runtime_log_compaction")
+        compaction: dict[str, Any] = dict(existing) if isinstance(existing, dict) else {}
+        terminal_compaction = _compact_completed_text_log(
+            self.terminal_log_path(quest_root, bash_id),
+            quest_root=quest_root,
+            max_bytes=max_text_bytes,
+        )
+        if terminal_compaction is not None:
+            compaction["terminal_log"] = terminal_compaction
+
+        if not compaction or compaction == existing:
+            return compaction
+
+        meta["runtime_log_compaction"] = compaction
+        meta["updated_at"] = utc_now()
+        self._write_meta(quest_root, bash_id, meta)
+        return compaction
+
     def reconcile_session(self, quest_root: Path, bash_id: str) -> dict[str, Any]:
         meta_path = self.meta_path(quest_root, bash_id)
         meta = read_json(meta_path, {})
@@ -676,6 +787,8 @@
         raise FileNotFoundError(f"Unknown bash session `{bash_id}`.")
         status = _coerce_session_status(meta.get("status"))
         if status in TERMINAL_STATUSES:
+            self.compact_completed_session_logs(quest_root, bash_id)
+            meta = read_json(meta_path, meta) or meta
             return self._session_payload(quest_root, meta)
         kind = _normalize_string(meta.get("kind")).lower()
         if kind == "terminal":
@@ -695,9 +808,12 @@
             return self._session_payload(quest_root, meta)
         stop_reason = _normalize_string(meta.get("stop_reason"))
         meta["status"] = "terminated" if stop_reason else "failed"
-        meta.setdefault("finished_at", utc_now())
+        if not _normalize_string(meta.get("finished_at")):
+            meta["finished_at"] = utc_now()
         meta["updated_at"] = utc_now()
         self._write_meta(quest_root, bash_id, meta)
+        self.compact_completed_session_logs(quest_root, bash_id)
+        meta = read_json(meta_path, meta) or meta
         return self._session_payload(quest_root, meta)
 
     def get_session(self, quest_root: Path, bash_id: str) -> dict[str, Any]:
diff --git a/src/deepscientist/quest/service.py b/src/deepscientist/quest/service.py
index 390ac88e..24b118a2 100644
--- a/src/deepscientist/quest/service.py
+++ b/src/deepscientist/quest/service.py
@@ -7614,6 +7614,20 @@ def _skip_explorer_relative(relative: str) -> bool:
         return True
     if relative.startswith(".ds/worktrees/"):
         return True
+    heavy_runtime_roots = {
+        ".ds/bash_exec",
+        ".ds/runs",
+        ".ds/codex_history",
+        ".ds/codex_homes",
+        ".ds/claude-home",
+        ".ds/opencode-home",
+        ".ds/evidence_packets",
+        ".ds/slim_backups",
+        ".ds/cold_archive",
+    }
+    normalized = relative.strip("/")
+    if any(normalized == root or normalized.startswith(f"{root}/") for root in heavy_runtime_roots):
+        return True
     parts = PurePosixPath(relative).parts
     return "__pycache__" in parts or ".pytest_cache" in parts
 
diff --git a/tests/test_bash_exec_monitor.py b/tests/test_bash_exec_monitor.py
new file mode 100644
index 00000000..ca1ef47f
--- /dev/null
+++ b/tests/test_bash_exec_monitor.py
@@ -0,0 +1,98 @@
+from __future__ import annotations
+
+from pathlib import Path
+
+from deepscientist.bash_exec import BashExecService
+from deepscientist.bash_exec.monitor import _drain_buffer, _render_terminal_log_line
+from deepscientist.home import ensure_home_layout
+from deepscientist.shared import read_json, write_json
+
+
+def test_drain_buffer_flushes_oversized_unterminated_exec_lines() -> None:
+    oversized = "x" * 200_000
+    emitted: list[tuple[str, str]] = []
+
+    def append_line(line: str, *, stream: str = "stdout") -> None:
+        emitted.append((stream, line))
+
+    remainder = _drain_buffer(oversized, append_line, flush_partial=False)
+
+    partial_chunks = [line for stream, line in emitted if stream == "partial"]
+    assert partial_chunks
+    assert "".join(partial_chunks) + remainder == oversized
+    assert max(len(chunk) for chunk in partial_chunks) <= 128_000
+    assert len(remainder) <= 128_000
+
+
+def test_render_terminal_log_line_truncates_huge_single_line_output() -> None:
+    line = "prefix:" + ("x" * 100_000) + ":suffix"
+
+    rendered = _render_terminal_log_line(line)
+
+    assert rendered.startswith("prefix:")
+    assert rendered.endswith(":suffix")
+    assert "full content remains in log.jsonl" in rendered
+    assert len(rendered) < len(line)
+
+
+def test_reconcile_session_sets_finished_at_when_existing_value_is_empty(temp_home: Path) -> None:
+    ensure_home_layout(temp_home)
+    quest_root = temp_home / "quests" / "q-bash"
+    service = BashExecService(temp_home)
+    session_dir = service.session_dir(quest_root, "bash-stale")
+    session_dir.mkdir(parents=True, exist_ok=True)
+    write_json(
+        service.meta_path(quest_root, "bash-stale"),
+        {
+            "bash_id": "bash-stale",
+            "quest_id": "q-bash",
+            "status": "running",
+            "kind": "exec",
+            "command": "printf stale",
+            "started_at": "2026-04-10T00:00:00+00:00",
+            "finished_at": None,
+            "process_pid": 999_999_991,
+            "monitor_pid": 999_999_992,
+        },
+    )
+
+    session = service.reconcile_session(quest_root, "bash-stale")
+
+    assert session["status"] == "failed"
+    assert session["finished_at"]
+    assert read_json(service.meta_path(quest_root, "bash-stale"), {})["finished_at"]
+
+
+def test_completed_session_compacts_huge_terminal_log_with_backup_reference(temp_home: Path) -> None:
+    ensure_home_layout(temp_home)
+    quest_root = temp_home / "quests" / "q-bash"
+    service = BashExecService(temp_home)
+    session_dir = service.session_dir(quest_root, "bash-huge-log")
+    session_dir.mkdir(parents=True, exist_ok=True)
+    write_json(
+        service.meta_path(quest_root, "bash-huge-log"),
+        {
+            "bash_id": "bash-huge-log",
+            "quest_id": "q-bash",
+            "status": "completed",
+            "kind": "exec",
+            "command": "cat huge.log",
+            "started_at": "2026-04-10T00:00:00+00:00",
+            "finished_at": "2026-04-10T00:00:01+00:00",
+        },
+    )
+    huge_line = "prefix:" + ("x" * 1_300_000) + ":suffix\n"
+    service.terminal_log_path(quest_root, "bash-huge-log").write_text(huge_line, encoding="utf-8")
+
+    session = service.reconcile_session(quest_root, "bash-huge-log")
+
+    compaction = session["runtime_log_compaction"]["terminal_log"]
+    compact_path = quest_root / compaction["compact_path"]
+    backup_path = quest_root / compaction["backup_path"]
+    assert compact_path == service.terminal_log_path(quest_root, "bash-huge-log")
+    assert compact_path.exists()
+    assert backup_path.exists()
+    assert backup_path.read_text(encoding="utf-8") == huge_line
+    compacted_text = compact_path.read_text(encoding="utf-8")
+    assert "compacted completed runtime log" in compacted_text
+    assert compact_path.stat().st_size < 600_000
diff --git a/tests/test_memory_and_artifact.py b/tests/test_memory_and_artifact.py
index 8e29f31f..d86a3596 100644
--- a/tests/test_memory_and_artifact.py
+++ b/tests/test_memory_and_artifact.py
@@ -4160,6 +4160,31 @@ def test_explorer_search_finds_paths_and_normalizes_legacy_glob_wrappers(temp_ho
     assert any(item["path"] == "experiments/analysis/new-slice/scripts/run_probe.py" for item in wrapped_result["items"])
 
 
+def test_explorer_search_skips_heavy_runtime_mirrors_by_default(temp_home: Path) -> None:
+    ensure_home_layout(temp_home)
+    ConfigManager(temp_home).ensure_files()
+    quest_service = QuestService(temp_home, skill_installer=SkillInstaller(repo_root(), temp_home))
+    quest = quest_service.create("explorer search runtime hygiene quest")
+    quest_root = Path(quest["quest_root"])
+
+    note_path = quest_root / "notes" / "runtime-note.md"
+    note_path.parent.mkdir(parents=True, exist_ok=True)
+    note_path.write_text("needle-runtime-hygiene in user content\n", encoding="utf-8")
+    terminal_log = quest_root / ".ds" / "bash_exec" / "bash-001" / "terminal.log"
+    terminal_log.parent.mkdir(parents=True, exist_ok=True)
+    terminal_log.write_text("needle-runtime-hygiene in heavy runtime mirror\n", encoding="utf-8")
+    run_prompt = quest_root / ".ds" / "runs" / "run-001" / "prompt.md"
+    run_prompt.parent.mkdir(parents=True, exist_ok=True)
+    run_prompt.write_text("needle-runtime-hygiene in runner prompt mirror\n", encoding="utf-8")
+
+    result = quest_service.search_files(quest["quest_id"], "needle-runtime-hygiene", limit=10)
+
+    paths = {item["path"] for item in result["items"]}
+    assert "notes/runtime-note.md" in paths
+    assert ".ds/bash_exec/bash-001/terminal.log" not in paths
+    assert ".ds/runs/run-001/prompt.md" not in paths
+
+
 def test_explorer_opens_image_files_as_assets(temp_home: Path) -> None:
     ensure_home_layout(temp_home)
     ConfigManager(temp_home).ensure_files()