Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions src/deepscientist/bash_exec/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@

DEFAULT_STOP_GRACE_SECONDS = 5
TERMINAL_IO_POLL_SECONDS = 0.02
MAX_BUFFERED_LINE_CHARS = 128_000
MAX_TERMINAL_LOG_LINE_CHARS = 32_768
TERMINAL_LOG_PREVIEW_EDGE_CHARS = 4_096


def _append_jsonl(path: Path, payload: dict[str, Any]) -> None:
Expand Down Expand Up @@ -209,6 +212,7 @@ def _drain_buffer(
*,
flush_partial: bool = False,
carriage_mode: str = "marker",
max_buffer_chars: int = MAX_BUFFERED_LINE_CHARS,
) -> str:
while True:
index_r = buffer.find("\r")
Expand All @@ -230,12 +234,34 @@ def _drain_buffer(
segment = buffer[:index_n]
buffer = buffer[index_n + 1 :]
append_line(segment)
while max_buffer_chars > 0 and len(buffer) > max_buffer_chars:
append_line(buffer[:max_buffer_chars], stream="partial")
buffer = buffer[max_buffer_chars:]
if flush_partial and buffer:
append_line(buffer, stream="partial")
return ""
return buffer


def _render_terminal_log_line(
    line: str,
    *,
    max_chars: int = MAX_TERMINAL_LOG_LINE_CHARS,
    edge_chars: int = TERMINAL_LOG_PREVIEW_EDGE_CHARS,
) -> str:
    """Return *line* unchanged unless it exceeds *max_chars*.

    Oversized lines are rendered as a head/tail preview around an omission
    marker; each edge keeps at most *edge_chars* characters and never more
    than half of *max_chars*.  When no edge fits, only a placeholder that
    reports the original length is returned.
    """
    if max_chars <= 0 or len(line) <= max_chars:
        return line
    edge = min(edge_chars, max_chars // 2)
    if edge <= 0:
        # Budget too small for a preview — describe the line instead.
        return f"[truncated oversized output line: original_length={len(line)} chars]"
    hidden = len(line) - 2 * edge
    head, tail = line[:edge], line[-edge:]
    return (
        f"{head}"
        f"[... omitted {hidden} chars from oversized output line; full content remains in log.jsonl ...]"
        f"{tail}"
    )


def _parse_terminal_prompt_marker(line: str) -> dict[str, str] | None:
if not line.startswith(BASH_TERMINAL_PROMPT_PREFIX):
return None
Expand Down Expand Up @@ -354,13 +380,14 @@ def append_line(line: str, *, stream: str = "stdout") -> None:
return
seq += 1
timestamp = utc_now()
terminal_line = _render_terminal_log_line(line)
with terminal_log_path.open("a", encoding="utf-8") as handle:
if stream == "partial":
handle.write(line)
handle.write(terminal_line)
elif stream == "carriage":
handle.write(f"\r{line}")
handle.write(f"\r{terminal_line}")
else:
handle.write(f"{line}\n")
handle.write(f"{terminal_line}\n")
_append_jsonl(
log_path,
{
Expand Down
118 changes: 117 additions & 1 deletion src/deepscientist/bash_exec/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
BASH_WATCHDOG_AFTER_SECONDS = 1800
SUMMARY_RECENT_SESSION_LIMIT = 256
SUMMARY_RUNNING_SESSION_LIMIT = 64
COMPLETED_RUNTIME_LOG_COMPACT_BYTES = 1_000_000
COMPLETED_RUNTIME_LOG_WINDOW_BYTES = 256_000
INPUT_ESCAPE_SEQUENCE_RE = re.compile(r"\x1b\[[0-9;?]*[ -/]*[@-~]|\x1b[@-_]")


Expand Down Expand Up @@ -174,6 +176,83 @@ def _compact_command(command: object, *, max_length: int = 140) -> str:
return normalized[: max(0, max_length - 3)].rstrip() + "..."


def _unique_sibling_path(path: Path) -> Path:
if not path.exists():
return path
stem = path.stem
suffix = path.suffix
for index in range(1, 10_000):
candidate = path.with_name(f"{stem}.{index}{suffix}")
if not candidate.exists():
return candidate
raise RuntimeError(f"unable_to_allocate_backup_path:{path}")


def _relative_to_root(path: Path, root: Path) -> str:
try:
return path.relative_to(root).as_posix()
except ValueError:
return str(path)


def _read_head_tail_bytes(path: Path, *, edge_bytes: int) -> tuple[bytes, bytes, int]:
size = path.stat().st_size
resolved_edge = max(0, min(edge_bytes, max(0, size // 2)))
with path.open("rb") as handle:
head = handle.read(resolved_edge)
if resolved_edge > 0:
handle.seek(max(0, size - resolved_edge))
tail = handle.read(resolved_edge)
else:
tail = b""
return head, tail, size


def _compact_completed_text_log(
    path: Path,
    *,
    quest_root: Path,
    max_bytes: int = COMPLETED_RUNTIME_LOG_COMPACT_BYTES,
    edge_bytes: int = COMPLETED_RUNTIME_LOG_WINDOW_BYTES,
) -> dict[str, Any] | None:
    """Replace an oversized text log with a head/tail preview plus a marker.

    The original file is first moved aside to a unique ``<stem>.full<suffix>``
    backup, then a compact preview is written back at *path*.  Returns a
    record describing the compaction, or ``None`` when the file is absent,
    unreadable, or already within *max_bytes*.
    """
    if not (path.exists() and path.is_file()):
        return None
    try:
        size_hint = path.stat().st_size
    except OSError:
        return None
    if size_hint <= max(1, max_bytes):
        return None

    backup_path = _unique_sibling_path(path.with_name(f"{path.stem}.full{path.suffix}"))
    # Move the full log aside first so the preview is rebuilt from the backup
    # and readers never observe a half-written compact file at the original path.
    shutil.move(str(path), str(backup_path))
    head, tail, total_bytes = _read_head_tail_bytes(backup_path, edge_bytes=edge_bytes)
    dropped = max(0, total_bytes - len(head) - len(tail))
    backup_ref = _relative_to_root(backup_path, quest_root)
    compact_ref = _relative_to_root(path, quest_root)
    marker = (
        "\n"
        f"[compacted completed runtime log: omitted {dropped} bytes; "
        f"full log backup={backup_ref}]\n"
    )
    preview = "".join(
        (
            head.decode("utf-8", errors="replace"),
            marker,
            tail.decode("utf-8", errors="replace"),
        )
    )
    ensure_dir(path.parent)
    path.write_text(preview, encoding="utf-8")
    return {
        "mode": "text_head_tail_bytes",
        "compact_path": compact_ref,
        "backup_path": backup_ref,
        "original_bytes": total_bytes,
        "compact_bytes": path.stat().st_size,
        "omitted_bytes": dropped,
        "compacted_at": utc_now(),
    }


class BashExecService:
def __init__(self, home: Path) -> None:
self.home = home
Expand Down Expand Up @@ -669,13 +748,47 @@ def _write_meta(self, quest_root: Path, bash_id: str, meta: dict[str, Any]) -> d
)
return self._write_summary(quest_root, summary)

def compact_completed_session_logs(
    self,
    quest_root: Path,
    bash_id: str,
    *,
    max_text_bytes: int = COMPLETED_RUNTIME_LOG_COMPACT_BYTES,
) -> dict[str, Any]:
    """Compact oversized logs for a finished session and record it in meta.

    Returns the (possibly pre-existing) compaction record, or an empty dict
    when the session is unknown or not yet in a terminal status.  Meta is
    only rewritten when the record actually changed.
    """
    meta = read_json(self.meta_path(quest_root, bash_id), {})
    if not (isinstance(meta, dict) and meta):
        return {}
    if _coerce_session_status(meta.get("status")) not in TERMINAL_STATUSES:
        return {}

    previous = meta.get("runtime_log_compaction")
    record: dict[str, Any] = dict(previous) if isinstance(previous, dict) else {}
    terminal_result = _compact_completed_text_log(
        self.terminal_log_path(quest_root, bash_id),
        quest_root=quest_root,
        max_bytes=max_text_bytes,
    )
    if terminal_result is not None:
        record["terminal_log"] = terminal_result

    # Skip the meta rewrite when nothing was compacted or nothing changed.
    if not record or record == previous:
        return record

    meta["runtime_log_compaction"] = record
    meta["updated_at"] = utc_now()
    self._write_meta(quest_root, bash_id, meta)
    return record

def reconcile_session(self, quest_root: Path, bash_id: str) -> dict[str, Any]:
meta_path = self.meta_path(quest_root, bash_id)
meta = read_json(meta_path, {})
if not meta:
raise FileNotFoundError(f"Unknown bash session `{bash_id}`.")
status = _coerce_session_status(meta.get("status"))
if status in TERMINAL_STATUSES:
self.compact_completed_session_logs(quest_root, bash_id)
meta = read_json(meta_path, meta) or meta
return self._session_payload(quest_root, meta)
kind = _normalize_string(meta.get("kind")).lower()
if kind == "terminal":
Expand All @@ -695,9 +808,12 @@ def reconcile_session(self, quest_root: Path, bash_id: str) -> dict[str, Any]:
return self._session_payload(quest_root, meta)
stop_reason = _normalize_string(meta.get("stop_reason"))
meta["status"] = "terminated" if stop_reason else "failed"
meta.setdefault("finished_at", utc_now())
if not _normalize_string(meta.get("finished_at")):
meta["finished_at"] = utc_now()
meta["updated_at"] = utc_now()
self._write_meta(quest_root, bash_id, meta)
self.compact_completed_session_logs(quest_root, bash_id)
meta = read_json(meta_path, meta) or meta
return self._session_payload(quest_root, meta)

def get_session(self, quest_root: Path, bash_id: str) -> dict[str, Any]:
Expand Down
14 changes: 14 additions & 0 deletions src/deepscientist/quest/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7614,6 +7614,20 @@ def _skip_explorer_relative(relative: str) -> bool:
return True
if relative.startswith(".ds/worktrees/"):
return True
heavy_runtime_roots = {
".ds/bash_exec",
".ds/runs",
".ds/codex_history",
".ds/codex_homes",
".ds/claude-home",
".ds/opencode-home",
".ds/evidence_packets",
".ds/slim_backups",
".ds/cold_archive",
}
normalized = relative.strip("/")
if any(normalized == root or normalized.startswith(f"{root}/") for root in heavy_runtime_roots):
return True
parts = PurePosixPath(relative).parts
return "__pycache__" in parts or ".pytest_cache" in parts

Expand Down
98 changes: 98 additions & 0 deletions tests/test_bash_exec_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from __future__ import annotations

from pathlib import Path

from deepscientist.bash_exec import BashExecService
from deepscientist.bash_exec.monitor import _drain_buffer, _render_terminal_log_line
from deepscientist.home import ensure_home_layout
from deepscientist.shared import read_json, write_json


def test_drain_buffer_flushes_oversized_unterminated_exec_lines() -> None:
    """An unterminated line longer than the cap is flushed in bounded partial chunks."""
    payload = "x" * 200_000
    captured: list[tuple[str, str]] = []

    def record(line: str, *, stream: str = "stdout") -> None:
        captured.append((stream, line))

    leftover = _drain_buffer(payload, record, flush_partial=False)

    partials = [text for stream, text in captured if stream == "partial"]
    assert partials
    # No data lost: flushed chunks plus the remainder reassemble the input.
    assert "".join(partials) + leftover == payload
    assert max(len(text) for text in partials) <= 128_000
    assert len(leftover) <= 128_000


def test_render_terminal_log_line_truncates_huge_single_line_output() -> None:
    """Oversized single lines keep both edges and gain an omission marker."""
    original = "prefix:" + ("x" * 100_000) + ":suffix"

    rendered = _render_terminal_log_line(original)

    # Both ends of the original line survive the truncation.
    assert rendered.startswith("prefix:")
    assert rendered.endswith(":suffix")
    assert "full content remains in log.jsonl" in rendered
    assert len(rendered) < len(original)


def test_reconcile_session_sets_finished_at_when_existing_value_is_empty(temp_home: Path) -> None:
    """Reconciling a stale running session backfills a blank ``finished_at``."""
    ensure_home_layout(temp_home)
    service = BashExecService(temp_home)
    quest_root = temp_home / "quests" / "q-bash"
    bash_id = "bash-stale"
    service.session_dir(quest_root, bash_id).mkdir(parents=True, exist_ok=True)
    # Seed a session whose monitor/process PIDs no longer exist and whose
    # finished_at is explicitly null rather than merely missing.
    write_json(
        service.meta_path(quest_root, bash_id),
        {
            "bash_id": "bash-stale",
            "quest_id": "q-bash",
            "status": "running",
            "kind": "exec",
            "command": "printf stale",
            "started_at": "2026-04-10T00:00:00+00:00",
            "finished_at": None,
            "process_pid": 999_999_991,
            "monitor_pid": 999_999_992,
        },
    )

    session = service.reconcile_session(quest_root, bash_id)

    assert session["status"] == "failed"
    assert session["finished_at"]
    assert read_json(service.meta_path(quest_root, bash_id), {})["finished_at"]


def test_completed_session_compacts_huge_terminal_log_with_backup_reference(temp_home: Path) -> None:
    """A finished session's oversized terminal log is compacted with a backup reference."""
    ensure_home_layout(temp_home)
    service = BashExecService(temp_home)
    quest_root = temp_home / "quests" / "q-bash"
    bash_id = "bash-huge-log"
    service.session_dir(quest_root, bash_id).mkdir(parents=True, exist_ok=True)
    write_json(
        service.meta_path(quest_root, bash_id),
        {
            "bash_id": "bash-huge-log",
            "quest_id": "q-bash",
            "status": "completed",
            "kind": "exec",
            "command": "cat huge.log",
            "started_at": "2026-04-10T00:00:00+00:00",
            "finished_at": "2026-04-10T00:00:01+00:00",
        },
    )
    oversized_line = "prefix:" + ("x" * 1_300_000) + ":suffix\n"
    service.terminal_log_path(quest_root, bash_id).write_text(oversized_line, encoding="utf-8")

    session = service.reconcile_session(quest_root, bash_id)

    record = session["runtime_log_compaction"]["terminal_log"]
    compact_path = quest_root / record["compact_path"]
    backup_path = quest_root / record["backup_path"]
    assert compact_path == service.terminal_log_path(quest_root, bash_id)
    assert compact_path.exists()
    assert backup_path.exists()
    # The backup preserves the full original log byte-for-byte.
    assert backup_path.read_text(encoding="utf-8") == oversized_line
    assert "compacted completed runtime log" in compact_path.read_text(encoding="utf-8")
    assert compact_path.stat().st_size < 600_000
25 changes: 25 additions & 0 deletions tests/test_memory_and_artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -4160,6 +4160,31 @@ def test_explorer_search_finds_paths_and_normalizes_legacy_glob_wrappers(temp_ho
assert any(item["path"] == "experiments/analysis/new-slice/scripts/run_probe.py" for item in wrapped_result["items"])


def test_explorer_search_skips_heavy_runtime_mirrors_by_default(temp_home: Path) -> None:
    """Explorer search surfaces user content but skips heavy runtime mirrors."""
    ensure_home_layout(temp_home)
    ConfigManager(temp_home).ensure_files()
    quest_service = QuestService(temp_home, skill_installer=SkillInstaller(repo_root(), temp_home))
    quest = quest_service.create("explorer search runtime hygiene quest")
    quest_root = Path(quest["quest_root"])

    # Plant the same needle in user content and in two runtime mirrors.
    fixtures = {
        quest_root / "notes" / "runtime-note.md": "needle-runtime-hygiene in user content\n",
        quest_root / ".ds" / "bash_exec" / "bash-001" / "terminal.log": "needle-runtime-hygiene in heavy runtime mirror\n",
        quest_root / ".ds" / "runs" / "run-001" / "prompt.md": "needle-runtime-hygiene in runner prompt mirror\n",
    }
    for target, body in fixtures.items():
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(body, encoding="utf-8")

    result = quest_service.search_files(quest["quest_id"], "needle-runtime-hygiene", limit=10)

    found = {item["path"] for item in result["items"]}
    assert "notes/runtime-note.md" in found
    assert ".ds/bash_exec/bash-001/terminal.log" not in found
    assert ".ds/runs/run-001/prompt.md" not in found


def test_explorer_opens_image_files_as_assets(temp_home: Path) -> None:
ensure_home_layout(temp_home)
ConfigManager(temp_home).ensure_files()
Expand Down