diff --git a/backend/web/models/requests.py b/backend/web/models/requests.py index e54e6aa45..74d01b2f3 100644 --- a/backend/web/models/requests.py +++ b/backend/web/models/requests.py @@ -7,7 +7,7 @@ class CreateThreadRequest(BaseModel): sandbox: str = "local" cwd: str | None = None model: str | None = None - agent: str | None = None + mode: str = "normal" class RunRequest(BaseModel): diff --git a/backend/web/models/thread_config.py b/backend/web/models/thread_config.py index 340e4fb2f..a45f2ead0 100644 --- a/backend/web/models/thread_config.py +++ b/backend/web/models/thread_config.py @@ -11,4 +11,5 @@ class ThreadConfig(BaseModel): model: str | None = None queue_mode: str = "steer" # Deprecated: kept for SQLite schema compat, not read/written observation_provider: str | None = None # "langfuse" | "langsmith" | None - agent: str | None = None # Member name for this thread + thread_mode: str = "normal" # "normal" | "evaluation" + keep_full_trace: bool = False diff --git a/backend/web/monitor.py b/backend/web/monitor.py index cedf32041..03bf30d43 100644 --- a/backend/web/monitor.py +++ b/backend/web/monitor.py @@ -5,19 +5,29 @@ No business logic in frontend. """ +import asyncio import json +import os +import re import sqlite3 +import time +import uuid from datetime import datetime +from pathlib import Path +from subprocess import PIPE -from fastapi import APIRouter, Depends, HTTPException +from fastapi import APIRouter, Depends, HTTPException, Query, Request +from pydantic import BaseModel, Field +from backend.web.core.config import DB_PATH from sandbox.db import DEFAULT_DB_PATH router = APIRouter(prefix="/api/monitor") def get_db(): - db = sqlite3.connect(str(DEFAULT_DB_PATH)) + # @@@fastapi-threadpool-sqlite - sync endpoints may execute in worker threads; disable same-thread guard for shared request-scoped connection. 
+ db = sqlite3.connect(str(DEFAULT_DB_PATH), check_same_thread=False) db.row_factory = sqlite3.Row try: yield db @@ -25,6 +35,771 @@ def get_db(): db.close() +class EvaluationCreateRequest(BaseModel): + dataset: str = "SWE-bench/SWE-bench_Lite" + split: str = "test" + start: int = 0 + count: int = Field(default=5, ge=1, le=50) + prompt_profile: str = "heuristic" + model_name: str | None = None + timeout_sec: int = Field(default=180, ge=30, le=3600) + eval_timeout_sec: int = Field(default=10800, ge=300, le=86400) + git_timeout_sec: int = Field(default=90, ge=15, le=600) + recursion_limit: int = Field(default=256, ge=1, le=512) + sandbox: str = "local" + cwd: str = "/home/ubuntu/specops0/Projects/leonai-main" + arm: str = "monitor" + output_dir: str = "artifacts/swebench" + run_eval: bool = True + thread_prefix: str = "swebench" + + +def _ensure_evaluation_tables() -> None: + if not DB_PATH.exists(): + DB_PATH.parent.mkdir(parents=True, exist_ok=True) + with sqlite3.connect(str(DB_PATH)) as conn: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS evaluation_jobs ( + evaluation_id TEXT PRIMARY KEY, + dataset TEXT NOT NULL, + split TEXT NOT NULL, + start_idx INTEGER NOT NULL, + slice_count INTEGER NOT NULL, + prompt_profile TEXT NOT NULL, + timeout_sec INTEGER NOT NULL, + recursion_limit INTEGER NOT NULL, + sandbox TEXT NOT NULL, + cwd TEXT, + arm TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'running', + notes TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """ + ) + conn.execute( + """ + CREATE TABLE IF NOT EXISTS evaluation_job_threads ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + evaluation_id TEXT NOT NULL, + thread_id TEXT NOT NULL, + run_id TEXT, + start_idx INTEGER NOT NULL, + item_index INTEGER NOT NULL, + created_at TEXT NOT NULL, + UNIQUE(evaluation_id, thread_id), + FOREIGN KEY (evaluation_id) REFERENCES evaluation_jobs(evaluation_id) + ) + """ + ) + conn.execute( + """ + CREATE INDEX IF NOT EXISTS idx_evaluation_job_threads_eval + ON 
evaluation_job_threads(evaluation_id, item_index) + """ + ) + conn.commit() + + +def _ensure_eval_task_map(app: object) -> dict[str, asyncio.Task]: + tasks = getattr(app.state, "evaluation_tasks", None) + if tasks is None: + tasks = {} + app.state.evaluation_tasks = tasks + return tasks + + +def _resolve_output_dir(cwd: str, output_dir: str) -> Path: + root = Path(output_dir).expanduser() + if not root.is_absolute(): + root = (Path(cwd).expanduser().resolve() / root).resolve() + return root + + +def _build_run_slice_command(payload: EvaluationCreateRequest, evaluation_id: str) -> list[str]: + cmd = [ + "uv", + "run", + "python", + "eval/swebench/run_slice.py", + "--dataset", + payload.dataset, + "--split", + payload.split, + "--start", + str(payload.start), + "--count", + str(payload.count), + "--run-id", + evaluation_id, + "--arm", + payload.arm, + "--prompt-profile", + payload.prompt_profile, + "--timeout-sec", + str(payload.timeout_sec), + "--eval-timeout-sec", + str(payload.eval_timeout_sec), + "--git-timeout-sec", + str(payload.git_timeout_sec), + "--recursion-limit", + str(payload.recursion_limit), + "--output-dir", + payload.output_dir, + "--thread-prefix", + payload.thread_prefix, + ] + if not payload.run_eval: + cmd.append("--no-eval") + if payload.model_name: + cmd.extend(["--model-name", payload.model_name]) + return cmd + + +def _update_evaluation_job_status(evaluation_id: str, status: str, notes: str) -> None: + now = datetime.now().isoformat() + with sqlite3.connect(str(DB_PATH)) as conn: + conn.execute( + "UPDATE evaluation_jobs SET status = ?, notes = ?, updated_at = ? 
WHERE evaluation_id = ?", + (status, notes, now, evaluation_id), + ) + conn.commit() + + +def _ingest_evaluation_threads( + *, + evaluation_id: str, + thread_prefix: str, + start_idx: int, + run_dir: Path, +) -> int: + ids_path = run_dir / "instance_ids.txt" + if not ids_path.exists(): + return 0 + instance_ids = [line.strip() for line in ids_path.read_text(encoding="utf-8").splitlines() if line.strip()] + now = datetime.now().isoformat() + with sqlite3.connect(str(DB_PATH)) as conn: + conn.execute("DELETE FROM evaluation_job_threads WHERE evaluation_id = ?", (evaluation_id,)) + for idx, instance_id in enumerate(instance_ids): + thread_id = f"{thread_prefix}-{evaluation_id}-{instance_id}" + run = _load_run_stats(thread_id, None) + conn.execute( + """ + INSERT INTO evaluation_job_threads ( + evaluation_id, thread_id, run_id, start_idx, item_index, created_at + ) + VALUES (?, ?, ?, ?, ?, ?) + """, + ( + evaluation_id, + thread_id, + run.get("run_id"), + start_idx + idx, + idx, + now, + ), + ) + conn.commit() + return len(instance_ids) + + +async def _run_evaluation_job(evaluation_id: str, payload: EvaluationCreateRequest) -> None: + cwd = str(Path(payload.cwd).expanduser().resolve()) + output_root = _resolve_output_dir(cwd, payload.output_dir) + run_dir = output_root / evaluation_id + run_dir.mkdir(parents=True, exist_ok=True) + stdout_path = run_dir / "monitor_stdout.log" + stderr_path = run_dir / "monitor_stderr.log" + command = _build_run_slice_command(payload, evaluation_id) + # @@@monitor-eval-sandbox-env - pass sandbox selection via env so run_slice -> LeonAgent resolves non-local provider, and isolate sandbox state per evaluation run. + env = dict(os.environ) + env["LEON_SANDBOX"] = payload.sandbox + env["LEON_SANDBOX_DB_PATH"] = str(run_dir / "sandbox.db") + try: + # @@@monitor-eval-direct-runner - evaluate by invoking SWE runner directly, not by sending a control prompt to another agent. 
+ with stdout_path.open("wb") as stdout_fh, stderr_path.open("wb") as stderr_fh: + proc = await asyncio.create_subprocess_exec( + *command, + cwd=cwd, + stdout=stdout_fh, + stderr=stderr_fh, + env=env, + start_new_session=True, + ) + _update_evaluation_job_status( + evaluation_id, + "running", + ( + f"runner=direct pid={proc.pid} sandbox={payload.sandbox} run_dir={run_dir} " + f"stdout_log={stdout_path} stderr_log={stderr_path}" + ), + ) + # @@@monitor-eval-hard-timeout-budget - wall-time must include both solve budget and harness scoring budget for batch runs. + solve_budget_sec = payload.timeout_sec * payload.count + eval_budget_sec = payload.eval_timeout_sec if payload.run_eval else 0 + hard_timeout_sec = solve_budget_sec + eval_budget_sec + 180 + try: + await asyncio.wait_for(proc.wait(), timeout=hard_timeout_sec) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + notes = ( + f"runner=direct timeout={hard_timeout_sec}s solve_budget={solve_budget_sec}s " + f"eval_budget={eval_budget_sec}s sandbox={payload.sandbox} run_dir={run_dir} " + f"stdout_log={stdout_path} stderr_log={stderr_path}" + ) + _update_evaluation_job_status(evaluation_id, "error", notes) + return + if proc.returncode != 0: + notes = ( + f"runner=direct rc={proc.returncode} sandbox={payload.sandbox} run_dir={run_dir} " + f"stdout_log={stdout_path} stderr_log={stderr_path}" + ) + _update_evaluation_job_status(evaluation_id, "error", notes) + return + thread_count = _ingest_evaluation_threads( + evaluation_id=evaluation_id, + thread_prefix=payload.thread_prefix, + start_idx=payload.start, + run_dir=run_dir, + ) + notes = ( + f"runner=direct rc=0 sandbox={payload.sandbox} run_dir={run_dir} stdout_log={stdout_path} " + f"stderr_log={stderr_path} threads={thread_count}" + ) + score = _load_evaluation_score( + evaluation_id=evaluation_id, + cwd=payload.cwd, + notes=notes, + ) + final_status = _derive_evaluation_status("completed", score) + _update_evaluation_job_status(evaluation_id, 
final_status, notes) + except Exception as exc: + notes = ( + f"runner=direct error={exc} sandbox={payload.sandbox} run_dir={run_dir} " + f"stdout_log={stdout_path} stderr_log={stderr_path}" + ) + _update_evaluation_job_status(evaluation_id, "error", notes) + + +def _load_latest_session(db: sqlite3.Connection, thread_id: str) -> sqlite3.Row | None: + return db.execute( + """ + SELECT chat_session_id, status, started_at, last_active_at + FROM chat_sessions + WHERE thread_id = ? + ORDER BY started_at DESC + LIMIT 1 + """, + (thread_id,), + ).fetchone() + + +def _load_run_stats(thread_id: str, run_id: str | None) -> dict: + with sqlite3.connect(str(DB_PATH)) as conn: + conn.row_factory = sqlite3.Row + if run_id: + row = conn.execute( + """ + SELECT run_id, COUNT(*) AS event_count, MAX(seq) AS last_seq, MAX(created_at) AS last_event_at + FROM run_events + WHERE thread_id = ? AND run_id = ? + GROUP BY run_id + """, + (thread_id, run_id), + ).fetchone() + if row: + return { + "run_id": row["run_id"], + "event_count": int(row["event_count"] or 0), + "last_seq": int(row["last_seq"] or 0), + "last_event_at": row["last_event_at"], + "last_event_ago": format_time_ago(row["last_event_at"]) if row["last_event_at"] else None, + } + row = conn.execute( + """ + SELECT run_id, COUNT(*) AS event_count, MAX(seq) AS last_seq, MAX(created_at) AS last_event_at + FROM run_events + WHERE thread_id = ? 
+ GROUP BY run_id + ORDER BY last_seq DESC + LIMIT 1 + """, + (thread_id,), + ).fetchone() + if not row: + return {"run_id": run_id, "event_count": 0, "last_seq": 0, "last_event_at": None, "last_event_ago": None} + return { + "run_id": row["run_id"], + "event_count": int(row["event_count"] or 0), + "last_seq": int(row["last_seq"] or 0), + "last_event_at": row["last_event_at"], + "last_event_ago": format_time_ago(row["last_event_at"]) if row["last_event_at"] else None, + } + + +def _read_json_file(path: Path | None) -> dict | None: + if not path or not path.exists(): + return None + try: + return json.loads(path.read_text(encoding="utf-8")) + except Exception: + return None + + +def _read_jsonl_rows(path: Path | None) -> list[dict]: + if not path or not path.exists(): + return [] + rows: list[dict] = [] + try: + with path.open("r", encoding="utf-8") as fh: + for line in fh: + text = line.strip() + if not text: + continue + obj = json.loads(text) + if isinstance(obj, dict): + rows.append(obj) + except Exception: + return [] + return rows + + +def _note_value(notes: str, key: str) -> str | None: + prefix = f"{key}=" + for token in (notes or "").split(): + if token.startswith(prefix): + return token[len(prefix):] + return None + + +def _resolve_eval_run_dir(evaluation_id: str, cwd: str | None, notes: str) -> Path | None: + candidates: list[Path] = [] + note_run_dir = _note_value(notes, "run_dir") + if note_run_dir: + candidates.append(Path(note_run_dir).expanduser()) + if cwd: + candidates.append((Path(cwd).expanduser().resolve() / "artifacts" / "swebench" / evaluation_id).resolve()) + + for run_dir in candidates: + if (run_dir / "run_manifest.json").exists(): + return run_dir + for run_dir in candidates: + if run_dir.exists(): + return run_dir + return None + + +def _infer_sandbox_from_run_id(run_id: str, fallback: str | None = None) -> str: + value = run_id.lower() + if "docker" in value: + return "docker" + if "daytona" in value: + return "daytona" + if "local" in 
value: + return "local" + return fallback or "local" + + +def _iter_artifact_run_dirs(cwd_candidates: list[str], max_dirs: int = 500) -> list[Path]: + run_dirs: list[Path] = [] + seen: set[str] = set() + for cwd in cwd_candidates: + if not cwd: + continue + root = (Path(cwd).expanduser().resolve() / "artifacts" / "swebench").resolve() + if not root.exists(): + continue + for item in sorted(root.glob("eval-*"), key=lambda p: p.stat().st_mtime if p.exists() else 0, reverse=True): + manifest_path = item / "run_manifest.json" + if not item.is_dir() or not manifest_path.exists(): + continue + key = str(item) + if key in seen: + continue + seen.add(key) + run_dirs.append(item) + if len(run_dirs) >= max_dirs: + return run_dirs + return run_dirs + + +def _backfill_evaluations_from_artifacts(app: object | None, base_cwd: str = "/home/ubuntu/specops0/Projects/leonai-main") -> int: + # @@@eval-artifact-backfill-throttle - list endpoint polls every 2.5s; throttle filesystem backfill scan to keep monitor responsive. 
+ now = time.time() + if app is not None: + last_ts = float(getattr(app.state, "eval_artifact_backfill_ts", 0.0) or 0.0) + if now - last_ts < 20.0: + return 0 + + _ensure_evaluation_tables() + inserted = 0 + with sqlite3.connect(str(DB_PATH)) as conn: + conn.row_factory = sqlite3.Row + existing_ids = {str(row["evaluation_id"]) for row in conn.execute("SELECT evaluation_id FROM evaluation_jobs").fetchall()} + cwd_rows = conn.execute("SELECT DISTINCT cwd FROM evaluation_jobs WHERE cwd IS NOT NULL AND cwd != ''").fetchall() + cwd_candidates = [base_cwd] + [str(row["cwd"]) for row in cwd_rows if row["cwd"]] + run_dirs = _iter_artifact_run_dirs(cwd_candidates) + for run_dir in run_dirs: + manifest = _read_json_file(run_dir / "run_manifest.json") or {} + evaluation_id = str(manifest.get("run_id") or run_dir.name) + if not evaluation_id.startswith("eval-"): + continue + if evaluation_id in existing_ids: + continue + + created_at = str(manifest.get("generated_at_utc") or datetime.now().isoformat()) + dataset = str(manifest.get("dataset") or "SWE-bench/SWE-bench_Lite") + split = str(manifest.get("split") or "test") + start_idx = int(manifest.get("start") or 0) + slice_count = int(manifest.get("count") or 0) + prompt_profile = str(manifest.get("prompt_profile") or "heuristic") + timeout_sec = int(manifest.get("timeout_sec") or 180) + recursion_limit = int(manifest.get("recursion_limit") or 256) + sandbox = _infer_sandbox_from_run_id(evaluation_id, fallback=manifest.get("sandbox")) + cwd = str(run_dir.parents[2]) if len(run_dir.parents) >= 3 else base_cwd + arm = str(manifest.get("arm") or "artifact_backfill") + status = "error" if str(manifest.get("eval_error") or "").strip() else "completed" + notes = f"runner=artifact_backfill run_dir={run_dir}" + conn.execute( + """ + INSERT INTO evaluation_jobs ( + evaluation_id, dataset, split, start_idx, slice_count, prompt_profile, + timeout_sec, recursion_limit, sandbox, cwd, arm, status, notes, created_at, updated_at + ) VALUES (?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + evaluation_id, + dataset, + split, + start_idx, + slice_count, + prompt_profile, + timeout_sec, + recursion_limit, + sandbox, + cwd, + arm, + status, + notes, + created_at, + created_at, + ), + ) + + trace_path = Path(str(manifest.get("trace_summaries_path") or (run_dir / "trace_summaries.jsonl"))).expanduser() + trace_rows = _read_jsonl_rows(trace_path) + if trace_rows: + for idx, row in enumerate(trace_rows): + instance_id = str(row.get("instance_id") or f"item-{idx}") + thread_id = str(row.get("thread_id") or f"swebench-{evaluation_id}-{instance_id}") + conn.execute( + """ + INSERT OR IGNORE INTO evaluation_job_threads ( + evaluation_id, thread_id, run_id, start_idx, item_index, created_at + ) VALUES (?, ?, ?, ?, ?, ?) + """, + ( + evaluation_id, + thread_id, + evaluation_id, + start_idx + idx, + idx, + created_at, + ), + ) + inserted += 1 + existing_ids.add(evaluation_id) + conn.commit() + + if app is not None: + app.state.eval_artifact_backfill_ts = now + app.state.eval_artifact_backfill_inserted = int(getattr(app.state, "eval_artifact_backfill_inserted", 0) or 0) + inserted + return inserted + + +def _pct(numerator: int, denominator: int) -> float | None: + if denominator <= 0: + return None + return round((numerator / denominator) * 100.0, 2) + + +def _derive_evaluation_status(status: str, score: dict | None) -> str: + if status == "running": + return status + if not score: + return status + if str(score.get("manifest_eval_error") or "").strip(): + return "provisional" + if not bool(score.get("scored")): + return "provisional" + return "completed_with_errors" if int(score.get("error_instances") or 0) > 0 else "completed" + + +def _count_live_eval_threads(evaluation_id: str) -> int: + if not DB_PATH.exists(): + return 0 + thread_prefix = f"swebench-{evaluation_id}-%" + with sqlite3.connect(str(DB_PATH)) as conn: + row = conn.execute( + "SELECT COUNT(DISTINCT thread_id) FROM checkpoints WHERE thread_id 
LIKE ?", + (thread_prefix,), + ).fetchone() + return int(row[0] or 0) if row else 0 + + +def _load_live_eval_session_progress(evaluation_id: str, cwd: str | None, notes: str) -> dict | None: + run_dir = _resolve_eval_run_dir(evaluation_id, cwd, notes) + if not run_dir: + return None + trace_db = run_dir / "sandbox.db" + if not trace_db.exists(): + return None + thread_prefix = f"swebench-{evaluation_id}-%" + try: + with sqlite3.connect(str(trace_db)) as conn: + conn.row_factory = sqlite3.Row + row = conn.execute( + """ + SELECT + COUNT(*) AS total, + SUM(CASE WHEN status = 'active' THEN 1 ELSE 0 END) AS running, + SUM(CASE WHEN status != 'active' THEN 1 ELSE 0 END) AS done, + MAX(idle_ttl_sec) AS idle_ttl_sec, + ROUND((julianday('now') - julianday(MAX(last_active_at))) * 24 * 60, 1) AS idle_minutes + FROM chat_sessions + WHERE thread_id LIKE ? + """, + (thread_prefix,), + ).fetchone() + except sqlite3.OperationalError: + # @@@eval-session-table-warmup - sandbox.db may exist before chat_sessions table initialization; treat as no live session data. + return None + if not row: + return None + total = int(row["total"] or 0) + running = int(row["running"] or 0) + done = int(row["done"] or 0) + idle_ttl_sec = int(row["idle_ttl_sec"] or 300) + idle_minutes = float(row["idle_minutes"]) if row["idle_minutes"] is not None else None + if total <= 0: + return None + # @@@eval-progress-live-session - when thread mapping rows are not persisted yet, use per-run sandbox session states for true running/done counts. + # @@@eval-running-freshness - treat stale "active" sessions as non-running to avoid fake-running UI after runner exits unexpectedly. 
+ stale_after_minutes = max(2.0, (idle_ttl_sec / 60.0) + 1.0) + active_recent = bool(running > 0 and idle_minutes is not None and idle_minutes <= stale_after_minutes) + running_effective = running if active_recent else 0 + done_effective = done if active_recent else min(total, done + running) + return { + "total": total, + "running": max(0, running_effective), + "done": max(0, done_effective), + "idle_minutes": idle_minutes, + "idle_ttl_sec": idle_ttl_sec, + "stale_after_minutes": stale_after_minutes, + "active_recent": active_recent, + } + + +def _load_live_eval_sessions(evaluation_id: str, cwd: str | None, notes: str) -> list[dict]: + run_dir = _resolve_eval_run_dir(evaluation_id, cwd, notes) + if not run_dir: + return [] + trace_db = run_dir / "sandbox.db" + if not trace_db.exists(): + return [] + thread_prefix = f"swebench-{evaluation_id}-%" + try: + with sqlite3.connect(str(trace_db)) as conn: + conn.row_factory = sqlite3.Row + rows = conn.execute( + """ + SELECT thread_id, chat_session_id, status, started_at, last_active_at, ended_at, close_reason + FROM chat_sessions + WHERE thread_id LIKE ? + ORDER BY started_at ASC + """, + (thread_prefix,), + ).fetchall() + except sqlite3.OperationalError: + return [] + sessions: list[dict] = [] + for row in rows: + sessions.append( + { + "thread_id": str(row["thread_id"]), + "chat_session_id": str(row["chat_session_id"]), + "status": str(row["status"] or "active"), + "started_at": row["started_at"], + "last_active_at": row["last_active_at"], + "ended_at": row["ended_at"], + "close_reason": row["close_reason"], + } + ) + return sessions + + +def _is_eval_runner_alive(evaluation_id: str, notes: str) -> bool: + # @@@eval-runner-pid-liveness - after backend restart, task map is empty; use persisted runner pid as direct liveness source before session rows appear. 
+ m = re.search(r"\bpid=(\d+)\b", notes or "") + if not m: + return False + pid = int(m.group(1)) + proc_dir = Path(f"/proc/{pid}") + if not proc_dir.exists(): + return False + try: + cmdline = (proc_dir / "cmdline").read_text(encoding="utf-8", errors="ignore") + except Exception: + return False + if "run_slice.py" not in cmdline: + return False + if evaluation_id and evaluation_id not in cmdline: + return False + return True + + +def _load_evaluation_score(evaluation_id: str, cwd: str | None, notes: str) -> dict: + run_dir = _resolve_eval_run_dir(evaluation_id, cwd, notes) + manifest_path = (run_dir / "run_manifest.json") if run_dir else None + manifest = _read_json_file(manifest_path) or {} + + summary_path: Path | None = None + if manifest.get("eval_summary_path"): + summary_path = Path(str(manifest["eval_summary_path"])).expanduser() + elif cwd: + candidate = Path(cwd).expanduser().resolve() / f"leonai-main.{evaluation_id}.json" + if candidate.exists(): + summary_path = candidate + + summary = _read_json_file(summary_path) or {} + trace_summaries_path: Path | None = None + if manifest.get("trace_summaries_path"): + trace_summaries_path = Path(str(manifest["trace_summaries_path"])).expanduser() + trace_rows = _read_jsonl_rows(trace_summaries_path) + + manifest_total = int(manifest.get("instances_total") or 0) + summary_total = int(summary.get("total_instances") or 0) + submitted_instances = int(summary.get("submitted_instances") or 0) + completed_instances = int(summary.get("completed_instances") or 0) + resolved_instances = int(summary.get("resolved_instances") or 0) + unresolved_instances = int(summary.get("unresolved_instances") or 0) + empty_patch_instances = int(summary.get("empty_patch_instances") or manifest.get("empty_patch_total") or 0) + error_instances = int(summary.get("error_instances") or manifest.get("errors_total") or 0) + + total_instances = manifest_total or summary_total + if total_instances <= 0: + total_instances = max(summary_total, 
submitted_instances, completed_instances, resolved_instances + unresolved_instances) + if submitted_instances > total_instances: + total_instances = submitted_instances + if completed_instances > total_instances: + total_instances = completed_instances + + patch_base = submitted_instances or total_instances + non_empty_patch_instances = max(patch_base - empty_patch_instances, 0) + + active_trace_threads = 0 + tool_call_threads = 0 + tool_calls_total = 0 + for row in trace_rows: + tool_calls = int(row.get("tool_calls_total") or 0) + checkpoints = int(row.get("checkpoint_count") or 0) + messages = int(row.get("message_count") or 0) + if checkpoints > 0 or messages > 0: + active_trace_threads += 1 + if tool_calls > 0: + tool_call_threads += 1 + tool_calls_total += tool_calls + avg_tool_calls_per_active_thread = round(tool_calls_total / active_trace_threads, 2) if active_trace_threads > 0 else None + + recursion_limit = int(manifest.get("recursion_limit") or 0) + recursion_cap_hits = 0 + if recursion_limit > 0: + recursion_cap_hits = sum(1 for row in trace_rows if int(row.get("last_step") or 0) >= recursion_limit) + + # @@@eval-score-source - score must come from persisted run artifacts instead of in-memory thread counters so reload stays consistent. 
+ score_gate = "final" if bool(summary_path and summary) and not str(manifest.get("eval_error") or "").strip() else "provisional" + publishable = score_gate == "final" + + return { + "scored": bool(summary_path and summary), + "score_gate": score_gate, + "publishable": publishable, + "manifest_eval_error": str(manifest.get("eval_error") or "").strip(), + "run_dir": str(run_dir) if run_dir else None, + "manifest_path": str(manifest_path) if manifest_path else None, + "eval_summary_path": str(summary_path) if summary_path else None, + "trace_summaries_path": str(trace_summaries_path) if trace_summaries_path else None, + "total_instances": total_instances, + "submitted_instances": submitted_instances, + "completed_instances": completed_instances, + "resolved_instances": resolved_instances, + "unresolved_instances": unresolved_instances, + "non_empty_patch_instances": non_empty_patch_instances, + "empty_patch_instances": empty_patch_instances, + "error_instances": error_instances, + "primary_score_pct": _pct(resolved_instances, total_instances), + "completed_rate_pct": _pct(completed_instances, total_instances), + "resolved_rate_pct": _pct(resolved_instances, total_instances), + "non_empty_patch_rate_pct": _pct(non_empty_patch_instances, total_instances), + "empty_patch_rate_pct": _pct(empty_patch_instances, total_instances), + "active_trace_threads": active_trace_threads, + "active_trace_thread_rate_pct": _pct(active_trace_threads, total_instances), + "tool_call_threads": tool_call_threads, + "tool_call_thread_rate_pct": _pct(tool_call_threads, total_instances), + "tool_calls_total": tool_calls_total, + "avg_tool_calls_per_active_thread": avg_tool_calls_per_active_thread, + "recursion_limit": recursion_limit or None, + "recursion_cap_hits": recursion_cap_hits, + "recursion_cap_hit_rate_pct": _pct(recursion_cap_hits, active_trace_threads), + } + + +def _backfill_eval_threads_from_score( + conn: sqlite3.Connection, + *, + evaluation_id: str, + start_idx: int, + 
created_at: str | None, + score: dict | None, +) -> int: + if not score: + return 0 + trace_path_value = score.get("trace_summaries_path") + if not trace_path_value: + return 0 + trace_path = Path(str(trace_path_value)).expanduser() + trace_rows = _read_jsonl_rows(trace_path) + if not trace_rows: + return 0 + + ts = created_at or datetime.now().isoformat() + inserted = 0 + for idx, row in enumerate(trace_rows): + instance_id = str(row.get("instance_id") or f"item-{idx}") + thread_id = str(row.get("thread_id") or f"swebench-{evaluation_id}-{instance_id}") + cur = conn.execute( + """ + INSERT OR IGNORE INTO evaluation_job_threads ( + evaluation_id, thread_id, run_id, start_idx, item_index, created_at + ) VALUES (?, ?, ?, ?, ?, ?) + """, + ( + evaluation_id, + thread_id, + evaluation_id, + start_idx + idx, + idx, + ts, + ), + ) + if int(cur.rowcount or 0) > 0: + inserted += 1 + return inserted + + def format_time_ago(iso_timestamp: str) -> str: """Convert ISO timestamp to human readable 'X hours ago'""" if not iso_timestamp: @@ -64,8 +839,269 @@ def make_badge(desired, observed): } +def load_thread_mode_map(thread_ids: list[str]) -> dict[str, dict]: + """Load thread mode metadata from thread_config.""" + if not thread_ids or not DB_PATH.exists(): + return {} + with sqlite3.connect(str(DB_PATH)) as conn: + conn.row_factory = sqlite3.Row + placeholders = ",".join("?" 
for _ in thread_ids) + rows = conn.execute( + f""" + SELECT thread_id, thread_mode, keep_full_trace + FROM thread_config + WHERE thread_id IN ({placeholders}) + """, + thread_ids, + ).fetchall() + mode_map = {} + for row in rows: + mode_map[row["thread_id"]] = { + "thread_mode": row["thread_mode"] or "normal", + "keep_full_trace": str(row["keep_full_trace"] or "0") in {"1", "true", "True"}, + } + return mode_map + + +def load_thread_mode(thread_id: str) -> dict: + """Load single thread mode metadata.""" + mode_map = load_thread_mode_map([thread_id]) + return mode_map.get(thread_id, {"thread_mode": "normal", "keep_full_trace": False}) + + +def load_run_candidates(thread_id: str, limit: int = 20) -> list[dict]: + """List recent run_ids for a thread with basic stats.""" + run_db_path = Path.home() / ".leon" / "leon.db" + if not run_db_path.exists(): + return [] + # @@@run-candidates - Keep selector data lightweight so session page can switch run trace quickly. + with sqlite3.connect(str(run_db_path)) as conn: + conn.row_factory = sqlite3.Row + rows = conn.execute( + """ + SELECT + run_id, + COUNT(*) AS event_count, + MIN(seq) AS first_seq, + MAX(seq) AS last_seq, + MIN(created_at) AS started_at, + MAX(created_at) AS ended_at + FROM run_events + WHERE thread_id = ? + GROUP BY run_id + ORDER BY MAX(seq) DESC + LIMIT ? 
+ """, + (thread_id, limit), + ).fetchall() + return [ + { + "run_id": row["run_id"], + "event_count": int(row["event_count"] or 0), + "first_seq": int(row["first_seq"] or 0), + "last_seq": int(row["last_seq"] or 0), + "started_at": row["started_at"], + "started_ago": format_time_ago(row["started_at"]) if row["started_at"] else None, + "ended_at": row["ended_at"], + "ended_ago": format_time_ago(row["ended_at"]) if row["ended_at"] else None, + } + for row in rows + ] + + +def _msg_text(content: object) -> str: + if isinstance(content, str): + return content + if isinstance(content, list): + texts: list[str] = [] + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + texts.append(str(block.get("text", ""))) + return "".join(texts) + return str(content or "") + + +def _load_checkpoint_events(thread_id: str, limit: int) -> tuple[list[dict], dict[str, int]]: + with sqlite3.connect(str(DB_PATH)) as conn: + row = conn.execute( + "SELECT checkpoint FROM checkpoints WHERE thread_id=? 
ORDER BY rowid DESC LIMIT 1", + (thread_id,), + ).fetchone() + if not row: + return [], {} + + from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer + + checkpoint_blob = row[0] + serde = JsonPlusSerializer() + checkpoint = serde.loads_typed(("msgpack", checkpoint_blob)) + messages = checkpoint.get("channel_values", {}).get("messages", []) + + call_name_by_id: dict[str, str] = {} + events: list[dict] = [] + counts: dict[str, int] = {} + seq = 1 + for msg in messages: + cls = msg.__class__.__name__ + if cls == "AIMessage": + text = _msg_text(getattr(msg, "content", "")) + if text.strip(): + payload = {"content": text, "_seq": seq, "_run_id": "checkpoint"} + events.append( + { + "seq": seq, + "event_type": "text", + "payload": payload, + "message_id": None, + "created_at": None, + "created_ago": None, + } + ) + counts["text"] = counts.get("text", 0) + 1 + seq += 1 + for call in getattr(msg, "tool_calls", None) or []: + call_id = str(call.get("id", "")) + name = str(call.get("name", "tool")) + if call_id: + call_name_by_id[call_id] = name + payload = {"id": call_id, "name": name, "args": call.get("args", {}), "_seq": seq, "_run_id": "checkpoint"} + events.append( + { + "seq": seq, + "event_type": "tool_call", + "payload": payload, + "message_id": None, + "created_at": None, + "created_ago": None, + } + ) + counts["tool_call"] = counts.get("tool_call", 0) + 1 + seq += 1 + elif cls == "ToolMessage": + tool_call_id = str(getattr(msg, "tool_call_id", "") or "") + name = call_name_by_id.get(tool_call_id, "tool") + payload = { + "tool_call_id": tool_call_id, + "name": name, + "content": _msg_text(getattr(msg, "content", "")), + "_seq": seq, + "_run_id": "checkpoint", + } + events.append( + { + "seq": seq, + "event_type": "tool_result", + "payload": payload, + "message_id": None, + "created_at": None, + "created_ago": None, + } + ) + counts["tool_result"] = counts.get("tool_result", 0) + 1 + seq += 1 + # @@@checkpoint-trace-fallback - convert latest checkpoint 
messages into event-like rows so thread trace still renders when run_events are absent. + if limit > 0: + events = events[-limit:] + return events, counts + + +def load_thread_trace_payload(thread_id: str, run_id: str | None = None, limit: int = 2000) -> dict: + """Load persisted trace bound to thread/run (not session).""" + run_candidates = load_run_candidates(thread_id, limit=50) + if not run_id: + run_id = run_candidates[0]["run_id"] if run_candidates else None + + if run_id == "checkpoint": + checkpoint_events, checkpoint_counts = _load_checkpoint_events(thread_id, limit) + return { + "thread_id": thread_id, + "run_id": "checkpoint", + "run_candidates": [], + "event_count": len(checkpoint_events), + "events": checkpoint_events, + "event_type_counts": checkpoint_counts, + } + + if not run_id: + checkpoint_events, checkpoint_counts = _load_checkpoint_events(thread_id, limit) + if checkpoint_events: + return { + "thread_id": thread_id, + "run_id": "checkpoint", + "run_candidates": [], + "event_count": len(checkpoint_events), + "events": checkpoint_events, + "event_type_counts": checkpoint_counts, + } + return { + "thread_id": thread_id, + "run_id": None, + "run_candidates": run_candidates, + "event_count": 0, + "events": [], + "event_type_counts": {}, + } + + run_db_path = Path.home() / ".leon" / "leon.db" + if not run_db_path.exists(): + raise HTTPException(status_code=404, detail="Trace database not found") + + with sqlite3.connect(str(run_db_path)) as conn: + conn.row_factory = sqlite3.Row + rows = conn.execute( + """ + SELECT seq, event_type, data, message_id, created_at + FROM run_events + WHERE thread_id = ? AND run_id = ? + ORDER BY seq ASC + LIMIT ? 
+ """, + (thread_id, run_id, limit), + ).fetchall() + + events: list[dict] = [] + event_type_counts: dict[str, int] = {} + for row in rows: + event_type = row["event_type"] + try: + payload = json.loads(row["data"]) if row["data"] else {} + except json.JSONDecodeError: + payload = {"raw": row["data"]} + event_type_counts[event_type] = event_type_counts.get(event_type, 0) + 1 + events.append( + { + "seq": int(row["seq"]), + "event_type": event_type, + "payload": payload, + "message_id": row["message_id"], + "created_at": row["created_at"], + "created_ago": format_time_ago(row["created_at"]) if row["created_at"] else None, + } + ) + + return { + "thread_id": thread_id, + "run_id": run_id, + "run_candidates": run_candidates, + "event_count": len(events), + "events": events, + "event_type_counts": event_type_counts, + } + + @router.get("/threads") -def list_threads(db: sqlite3.Connection = Depends(get_db)): +def list_threads( + offset: int = Query(default=0, ge=0), + limit: int = Query(default=50, ge=1, le=200), + db: sqlite3.Connection = Depends(get_db), +): + total_row = db.execute( + """ + SELECT COUNT(DISTINCT thread_id) AS total_threads + FROM chat_sessions + """ + ).fetchone() + total = int(total_row["total_threads"] if total_row else 0) rows = db.execute(""" SELECT cs.thread_id, @@ -80,15 +1116,21 @@ def list_threads(db: sqlite3.Connection = Depends(get_db)): LEFT JOIN sandbox_leases sl ON cs.lease_id = sl.lease_id GROUP BY cs.thread_id ORDER BY MAX(cs.last_active_at) DESC - """).fetchall() + LIMIT ? OFFSET ? + """, (limit, offset)).fetchall() + # @@@threads-pagination-mode-map - only load mode metadata for current page to keep list endpoint lightweight on large thread sets. 
+ mode_map = load_thread_mode_map([row["thread_id"] for row in rows if row["thread_id"]]) items = [] for row in rows: badge = make_badge(row["desired_state"], row["observed_state"]) + mode_info = mode_map.get(row["thread_id"], {"thread_mode": "normal", "keep_full_trace": False}) items.append( { "thread_id": row["thread_id"], "thread_url": f"/thread/{row['thread_id']}", + "thread_mode": mode_info["thread_mode"], + "keep_full_trace": mode_info["keep_full_trace"], "session_count": row["session_count"], "last_active": row["last_active"], "last_active_ago": format_time_ago(row["last_active"]), @@ -102,7 +1144,22 @@ def list_threads(db: sqlite3.Connection = Depends(get_db)): } ) - return {"title": "All Threads", "count": len(items), "items": items} + page = (offset // limit) + 1 + return { + "title": "All Threads", + "count": len(items), + "items": items, + "pagination": { + "offset": offset, + "limit": limit, + "total": total, + "page": page, + "has_prev": offset > 0, + "has_next": (offset + len(items)) < total, + "prev_offset": max(offset - limit, 0) if offset > 0 else None, + "next_offset": (offset + limit) if (offset + len(items)) < total else None, + }, + } @router.get("/thread/{thread_id}") @@ -157,8 +1214,11 @@ def get_thread(thread_id: str, db: sqlite3.Connection = Depends(get_db)): } ) + mode_info = load_thread_mode(thread_id) return { "thread_id": thread_id, + "thread_mode": mode_info["thread_mode"], + "keep_full_trace": mode_info["keep_full_trace"], "breadcrumb": [ {"label": "Threads", "url": "/threads"}, {"label": thread_id[:8], "url": f"/thread/{thread_id}"}, @@ -171,6 +1231,498 @@ def get_thread(thread_id: str, db: sqlite3.Connection = Depends(get_db)): } +@router.post("/evaluations") +async def create_evaluation(payload: EvaluationCreateRequest, request: Request): + """Create one evaluation job and run SWE-bench slice in backend runner.""" + _ensure_evaluation_tables() + app = request.app + now = datetime.now().isoformat() + evaluation_id = 
f"eval-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{uuid.uuid4().hex[:6]}" + with sqlite3.connect(str(DB_PATH)) as conn: + conn.execute( + """ + INSERT INTO evaluation_jobs ( + evaluation_id, dataset, split, start_idx, slice_count, prompt_profile, + timeout_sec, recursion_limit, sandbox, cwd, arm, status, notes, created_at, updated_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'running', ?, ?, ?) + """, + ( + evaluation_id, + payload.dataset, + payload.split, + payload.start, + payload.count, + payload.prompt_profile, + payload.timeout_sec, + payload.recursion_limit, + payload.sandbox, + payload.cwd, + payload.arm, + "runner=direct (backend subprocess)", + now, + now, + ), + ) + conn.commit() + + tasks = _ensure_eval_task_map(app) + task = asyncio.create_task(_run_evaluation_job(evaluation_id, payload)) + tasks[evaluation_id] = task + + def _cleanup_task(done_task: asyncio.Task) -> None: + task_map = _ensure_eval_task_map(app) + task_map.pop(evaluation_id, None) + _ = done_task + + task.add_done_callback(_cleanup_task) + + return { + "evaluation_id": evaluation_id, + "status": "running", + "count": payload.count, + "dataset": payload.dataset, + "split": payload.split, + "start": payload.start, + "runner": "backend_subprocess", + "threads": [], + } + + +@router.get("/evaluations") +def list_evaluations( + limit: int = Query(default=30, ge=1, le=200), + offset: int = Query(default=0, ge=0), + request: Request = None, +): + _ensure_evaluation_tables() + _backfill_evaluations_from_artifacts(request.app if request else None) + running_jobs = set() + pending_status_updates: dict[str, tuple[str, str]] = {} + if request: + tasks = _ensure_eval_task_map(request.app) + running_jobs = {evaluation_id for evaluation_id, task in tasks.items() if not task.done()} + with sqlite3.connect(str(DB_PATH)) as conn: + conn.row_factory = sqlite3.Row + total_jobs = int(conn.execute("SELECT COUNT(*) AS n FROM evaluation_jobs").fetchone()["n"]) + jobs = conn.execute( + """ + SELECT 
evaluation_id, dataset, split, start_idx, slice_count, prompt_profile, timeout_sec, + recursion_limit, sandbox, cwd, arm, status, notes, created_at, updated_at + FROM evaluation_jobs + ORDER BY created_at DESC + LIMIT ? OFFSET ? + """, + (limit, offset), + ).fetchall() + items = [] + for row in jobs: + notes = row["notes"] or "" + status = str(row["status"] or "pending") + # @@@monitor-eval-orphan-reconcile - if backend restarted and task map no longer tracks a running job, mark it error to avoid permanent fake-running rows. + if status == "running" and row["evaluation_id"] not in running_jobs: + if _is_eval_runner_alive(str(row["evaluation_id"]), notes): + if "runner_lost_pid_alive:" not in notes: + notes = f"{notes} | runner_lost_pid_alive: runner process still alive".strip(" |") + pending_status_updates[str(row["evaluation_id"])] = ("running", notes) + status = "running" + else: + if "runner_lost:" not in notes: + notes = f"{notes} | runner_lost: task not active after restart".strip(" |") + pending_status_updates[str(row["evaluation_id"])] = ("error", notes) + status = "error" + + score = _load_evaluation_score( + evaluation_id=str(row["evaluation_id"]), + cwd=row["cwd"], + notes=notes, + ) + # @@@eval-status-recover-pid - historical rows may already be marked error after backend restart; + # if score is still pending and runner pid is still alive, recover status back to running. 
+ if status == "error" and not bool(score.get("scored")): + if _is_eval_runner_alive(str(row["evaluation_id"]), notes): + if "runner_recovered_pid_alive:" not in notes: + notes = f"{notes} | runner_recovered_pid_alive: runner process still alive".strip(" |") + pending_status_updates[str(row["evaluation_id"])] = ("running", notes) + status = "running" + inserted = _backfill_eval_threads_from_score( + conn, + evaluation_id=str(row["evaluation_id"]), + start_idx=int(row["start_idx"] or 0), + created_at=row["created_at"], + score=score, + ) + if inserted > 0: + conn.commit() + + threads = conn.execute( + """ + SELECT thread_id + FROM evaluation_job_threads + WHERE evaluation_id = ? + """, + (row["evaluation_id"],), + ).fetchall() + mapped_threads = len(threads) + threads_total = mapped_threads + if row["evaluation_id"] in running_jobs: + status = "running" + running_count = threads_total if status == "running" else 0 + threads_done = max(threads_total - running_count, 0) + threads_started = running_count + live_session_progress = _load_live_eval_session_progress(str(row["evaluation_id"]), row["cwd"], notes) + if status == "running": + # @@@eval-live-progress-from-checkpoints - thread rows are ingested after runner exits; use live checkpoint thread ids for in-flight progress. 
+ running_count = max(running_count, _count_live_eval_threads(str(row["evaluation_id"]))) + threads_total = max(threads_total, running_count) + if live_session_progress: + threads_total = max(threads_total, int(live_session_progress["total"])) + running_count = max(0, min(threads_total, int(live_session_progress["running"]))) + threads_done = max(0, min(threads_total, int(live_session_progress["done"]))) + threads_started = max(0, min(threads_total, threads_done + running_count)) + else: + threads_done = max(threads_total - running_count, 0) + threads_started = running_count + elif threads_total == 0 and int(score.get("active_trace_threads") or 0) > 0: + threads_total = int(score.get("active_trace_threads") or 0) + threads_done = max(threads_total - running_count, 0) + threads_started = running_count + # @@@eval-progress-source - while running, monitor may only have checkpoint-derived started thread count + # (no persisted thread rows yet), so "running" is an estimate and should be labeled accordingly in UI. 
+ progress_source = "thread_rows" + if status == "running" and mapped_threads == 0: + progress_source = "session_rows" if live_session_progress else "checkpoint_estimate" + status = _derive_evaluation_status(status, score) + if status != str(row["status"] or "pending"): + pending_status_updates[str(row["evaluation_id"])] = (status, notes) + items.append( + { + "evaluation_id": row["evaluation_id"], + "evaluation_url": f"/evaluation/{row['evaluation_id']}", + "dataset": row["dataset"], + "split": row["split"], + "start_idx": int(row["start_idx"] or 0), + "slice_count": int(row["slice_count"] or 0), + "prompt_profile": row["prompt_profile"], + "timeout_sec": int(row["timeout_sec"] or 0), + "recursion_limit": int(row["recursion_limit"] or 0), + "status": status, + "sandbox": row["sandbox"], + "threads_total": threads_total, + "threads_running": running_count, + "threads_done": threads_done, + "threads_started": threads_started, + "progress_source": progress_source, + "notes": notes, + "score": score, + "created_at": row["created_at"], + "created_ago": format_time_ago(row["created_at"]) if row["created_at"] else None, + "updated_at": row["updated_at"], + "updated_ago": format_time_ago(row["updated_at"]) if row["updated_at"] else None, + } + ) + for evaluation_id, (status, notes) in pending_status_updates.items(): + try: + _update_evaluation_job_status(evaluation_id, status, notes) + except sqlite3.OperationalError as exc: + # @@@eval-status-update-lock - avoid surfacing sqlite lock as 500 in list API; keep response serving and retry next poll. 
+ print(f"[monitor] status update skipped due to sqlite lock: evaluation_id={evaluation_id} error={exc}", flush=True) + page = (offset // limit) + 1 + return { + "title": "Evaluations", + "count": len(items), + "total": total_jobs, + "items": items, + "pagination": { + "offset": offset, + "limit": limit, + "total": total_jobs, + "page": page, + "has_prev": offset > 0, + "has_next": (offset + len(items)) < total_jobs, + "prev_offset": max(offset - limit, 0) if offset > 0 else None, + "next_offset": (offset + limit) if (offset + len(items)) < total_jobs else None, + }, + } + + +@router.get("/evaluation/{evaluation_id}") +def get_evaluation_detail(evaluation_id: str, request: Request, db: sqlite3.Connection = Depends(get_db)): + _ensure_evaluation_tables() + running_jobs = set() + if request: + tasks = _ensure_eval_task_map(request.app) + running_jobs = {job_id for job_id, task in tasks.items() if not task.done()} + with sqlite3.connect(str(DB_PATH)) as conn: + conn.row_factory = sqlite3.Row + job = conn.execute( + """ + SELECT evaluation_id, dataset, split, start_idx, slice_count, prompt_profile, timeout_sec, + recursion_limit, sandbox, cwd, arm, status, notes, created_at, updated_at + FROM evaluation_jobs + WHERE evaluation_id = ? + LIMIT 1 + """, + (evaluation_id,), + ).fetchone() + if not job: + raise HTTPException(status_code=404, detail="evaluation not found") + rows = conn.execute( + """ + SELECT thread_id, run_id, start_idx, item_index, created_at + FROM evaluation_job_threads + WHERE evaluation_id = ? 
+ ORDER BY item_index ASC + """, + (evaluation_id,), + ).fetchall() + + status = str(job["status"] or "pending") + notes = job["notes"] or "" + if status == "running" and evaluation_id not in running_jobs: + if _is_eval_runner_alive(evaluation_id, notes): + if "runner_lost_pid_alive:" not in notes: + notes = f"{notes} | runner_lost_pid_alive: runner process still alive".strip(" |") + _update_evaluation_job_status(evaluation_id, "running", notes) + status = "running" + else: + if "runner_lost:" not in notes: + notes = f"{notes} | runner_lost: task not active after restart".strip(" |") + _update_evaluation_job_status(evaluation_id, "error", notes) + status = "error" + if evaluation_id in running_jobs: + status = "running" + score = _load_evaluation_score( + evaluation_id=evaluation_id, + cwd=job["cwd"], + notes=notes, + ) + # @@@eval-status-recover-pid - recover stale error rows to running when runner pid is still alive and score has not closed. + if status == "error" and not bool(score.get("scored")): + if _is_eval_runner_alive(evaluation_id, notes): + if "runner_recovered_pid_alive:" not in notes: + notes = f"{notes} | runner_recovered_pid_alive: runner process still alive".strip(" |") + _update_evaluation_job_status(evaluation_id, "running", notes) + status = "running" + if len(rows) == 0: + with sqlite3.connect(str(DB_PATH)) as conn: + inserted = _backfill_eval_threads_from_score( + conn, + evaluation_id=evaluation_id, + start_idx=int(job["start_idx"] or 0), + created_at=job["created_at"], + score=score, + ) + if inserted > 0: + conn.commit() + conn.row_factory = sqlite3.Row + rows = conn.execute( + """ + SELECT thread_id, run_id, start_idx, item_index, created_at + FROM evaluation_job_threads + WHERE evaluation_id = ? 
+ ORDER BY item_index ASC + """, + (evaluation_id,), + ).fetchall() + status = _derive_evaluation_status(status, score) + if status != str(job["status"] or "pending"): + _update_evaluation_job_status(evaluation_id, status, notes) + thread_items = [] + mapped_threads = len(rows) + running_count = 0 + done_count = 0 + live_session_progress = _load_live_eval_session_progress(evaluation_id, job["cwd"], notes) + live_sessions = _load_live_eval_sessions(evaluation_id, job["cwd"], notes) + live_session_by_thread = {str(s["thread_id"]): s for s in live_sessions} + row_by_thread = {str(r["thread_id"]): r for r in rows} + merged_thread_ids: list[str] = [] + for s in live_sessions: + tid = str(s["thread_id"]) + if tid not in merged_thread_ids: + merged_thread_ids.append(tid) + for r in rows: + tid = str(r["thread_id"]) + if tid not in merged_thread_ids: + merged_thread_ids.append(tid) + + # @@@eval-detail-thread-source-unify - running phase has live sessions before evaluation_job_threads is persisted; + # build detail rows from merged(live sessions, persisted mappings) so "count" and table rows stay consistent. 
+ start_idx_base = int(job["start_idx"] or 0) + for idx, thread_id in enumerate(merged_thread_ids): + row = row_by_thread.get(thread_id) + live_session = live_session_by_thread.get(thread_id) + session = _load_latest_session(db, thread_id) + session_row = session if session else None + if not session_row and live_session: + session_row = { + "chat_session_id": live_session["chat_session_id"], + "status": live_session["status"], + "started_at": live_session["started_at"], + "last_active_at": live_session["last_active_at"], + } + run = _load_run_stats(thread_id, row["run_id"] if row else evaluation_id) + running = bool(status == "running" and session and session["status"] == "active") + if not session and live_session: + running = bool(status == "running" and str(live_session["status"]) == "active") + if running: + running_count += 1 + elif session_row and session_row["status"] and session_row["status"] != "active": + done_count += 1 + thread_items.append( + { + "thread_id": thread_id, + "thread_url": f"/thread/{thread_id}", + "start_idx": int(row["start_idx"] or (start_idx_base + idx)) if row else (start_idx_base + idx), + "item_index": int(row["item_index"] or idx) if row else idx, + "created_at": (row["created_at"] if row else (live_session["started_at"] if live_session else None)), + "created_ago": ( + format_time_ago(row["created_at"]) + if row and row["created_at"] + else (format_time_ago(live_session["started_at"]) if live_session and live_session["started_at"] else None) + ), + "run": run, + "session": { + "session_id": session_row["chat_session_id"] if session_row else None, + "session_url": f"/session/{session_row['chat_session_id']}" if session else None, + "status": session_row["status"] if session_row else None, + "started_ago": format_time_ago(session_row["started_at"]) if session_row and session_row["started_at"] else None, + "last_active_ago": format_time_ago(session_row["last_active_at"]) + if session_row and session_row["last_active_at"] + else 
None, + }, + "status": "running" if running else (session_row["status"] if session_row else ("running" if status == "running" else "idle")), + "running": running, + } + ) + + total = len(thread_items) + if status == "running": + # @@@eval-live-progress-from-checkpoints - evaluation thread mappings are persisted at the end, so derive interim running count from live checkpoint data. + checkpoint_started = _count_live_eval_threads(evaluation_id) + running_count = max(running_count, checkpoint_started) + total = max(total, running_count) + if live_session_progress: + total = max(total, int(live_session_progress["total"])) + if mapped_threads == 0: + running_count = max(0, min(total, int(live_session_progress["running"]))) + done_count = max(0, min(total, int(live_session_progress["done"]))) + else: + running_count = max(running_count, min(total, int(live_session_progress["running"]))) + done_count = max(done_count, min(total, int(live_session_progress["done"]))) + threads_done = max(total - running_count, 0) + if live_session_progress: + threads_done = max(threads_done, min(total, int(live_session_progress["done"]))) + threads_started = max(0, min(total, threads_done + running_count)) + # @@@eval-progress-source - when no persisted thread mapping exists yet, running count is checkpoint-derived + # "started thread" estimate and must not be presented as exact in-flight count. 
+ progress_source = "thread_rows" + if status == "running" and mapped_threads == 0: + progress_source = "session_rows" if live_session_progress else "checkpoint_estimate" + + return { + "evaluation_id": evaluation_id, + "breadcrumb": [ + {"label": "Evaluation", "url": "/evaluation"}, + {"label": evaluation_id, "url": f"/evaluation/{evaluation_id}"}, + ], + "info": { + "dataset": job["dataset"], + "split": job["split"], + "start_idx": int(job["start_idx"] or 0), + "slice_count": int(job["slice_count"] or 0), + "prompt_profile": job["prompt_profile"], + "timeout_sec": int(job["timeout_sec"] or 0), + "recursion_limit": int(job["recursion_limit"] or 0), + "sandbox": job["sandbox"], + "cwd": job["cwd"], + "arm": job["arm"], + "status": status, + "notes": notes, + "created_at": job["created_at"], + "created_ago": format_time_ago(job["created_at"]) if job["created_at"] else None, + "updated_at": job["updated_at"], + "updated_ago": format_time_ago(job["updated_at"]) if job["updated_at"] else None, + "threads_total": total, + "threads_running": running_count, + "threads_done": threads_done, + "threads_started": threads_started, + "progress_source": progress_source, + "score": score, + }, + "threads": {"title": "Evaluation Threads", "count": total, "items": thread_items}, + } + + +@router.get("/evaluation/runs") +def list_evaluation_runs(limit: int = 30, request: Request = None): + """Backward-compatible endpoint, now returns evaluation jobs.""" + return list_evaluations(limit=limit, request=request) + + +@router.get("/session/{session_id}") +def get_session(session_id: str, db: sqlite3.Connection = Depends(get_db)): + session = db.execute( + """ + SELECT + cs.chat_session_id, + cs.thread_id, + cs.terminal_id, + cs.lease_id, + cs.status, + cs.started_at, + cs.last_active_at, + cs.ended_at, + cs.close_reason, + sl.provider_name, + sl.desired_state, + sl.observed_state, + sl.current_instance_id, + sl.last_error + FROM chat_sessions cs + LEFT JOIN sandbox_leases sl ON 
cs.lease_id = sl.lease_id + WHERE cs.chat_session_id = ? + LIMIT 1 + """, + (session_id,), + ).fetchone() + if not session: + raise HTTPException(status_code=404, detail="Session not found") + + return { + "session_id": session_id, + "thread_id": session["thread_id"], + "thread_url": f"/thread/{session['thread_id']}", + "breadcrumb": [ + {"label": "Threads", "url": "/threads"}, + {"label": session["thread_id"][:8], "url": f"/thread/{session['thread_id']}"}, + {"label": session_id[:8], "url": f"/session/{session_id}"}, + ], + "info": { + "status": session["status"], + "terminal_id": session["terminal_id"], + "lease_id": session["lease_id"], + "provider": session["provider_name"], + "instance_id": session["current_instance_id"], + "started_at": session["started_at"], + "started_ago": format_time_ago(session["started_at"]), + "last_active_at": session["last_active_at"], + "last_active_ago": format_time_ago(session["last_active_at"]), + "ended_at": session["ended_at"], + "ended_ago": format_time_ago(session["ended_at"]) if session["ended_at"] else None, + "close_reason": session["close_reason"], + "error": session["last_error"], + "state_badge": make_badge(session["desired_state"], session["observed_state"]), + }, + } + + +@router.get("/thread/{thread_id}/trace") +def get_thread_trace(thread_id: str, run_id: str | None = None, limit: int = 2000): + """Canonical trace endpoint: trace belongs to thread/run.""" + return load_thread_trace_payload(thread_id=thread_id, run_id=run_id, limit=limit) + + @router.get("/leases") def list_leases(db: sqlite3.Connection = Depends(get_db)): rows = db.execute(""" diff --git a/backend/web/routers/threads.py b/backend/web/routers/threads.py index 185c733cd..3d479d236 100644 --- a/backend/web/routers/threads.py +++ b/backend/web/routers/threads.py @@ -30,7 +30,7 @@ get_session_status, get_terminal_status, ) -from backend.web.utils.helpers import delete_thread_in_db +from backend.web.utils.helpers import delete_thread_in_db, 
lookup_thread_mode
 from backend.web.utils.serializers import serialize_message
 
 logger = logging.getLogger(__name__)
@@ -59,6 +59,10 @@ async def create_thread(
     """Create a new thread with optional sandbox and cwd."""
     sandbox_type = payload.sandbox if payload else "local"
+    thread_mode = (payload.mode if payload else "normal").strip().lower()
+    if thread_mode not in {"normal", "evaluation"}:
+        raise HTTPException(status_code=400, detail="mode must be one of: normal, evaluation")
+    keep_full_trace = thread_mode == "evaluation"
 
     thread_id = str(uuid.uuid4())
     cwd = payload.cwd if payload else None
-    agent_name = payload.agent if payload else None
+    agent_name = None  # CreateThreadRequest no longer carries `agent`; keep the downstream guard inert
@@ -67,8 +71,14 @@ async def create_thread(
         app.state.thread_cwd[thread_id] = cwd
 
     from backend.web.utils.helpers import get_active_observation_provider, init_thread_config, save_thread_config
-    init_thread_config(thread_id, sandbox_type, cwd)
     model = payload.model if payload else None
+    init_thread_config(
+        thread_id,
+        sandbox_type,
+        cwd,
+        thread_mode=thread_mode,
+        keep_full_trace=keep_full_trace,
+    )
     obs_provider = get_active_observation_provider()
     updates = {}
     if model:
@@ -79,7 +89,12 @@ async def create_thread(
         updates["agent"] = agent_name
     if updates:
         save_thread_config(thread_id, **updates)
-    return {"thread_id": thread_id, "sandbox": sandbox_type, "agent": agent_name}
+    return {
+        "thread_id": thread_id,
+        "sandbox": sandbox_type,
+        "mode": thread_mode,
+        "keep_full_trace": keep_full_trace,
+    }
 
 
 @router.get("")
@@ -338,13 +353,21 @@ async def run_thread(
     # Per-request model override (lightweight, no rebuild)
     if payload.model:
         await asyncio.to_thread(agent.update_config, model=payload.model)
+    thread_mode = lookup_thread_mode(thread_id)
+    enable_trajectory = payload.enable_trajectory or thread_mode == "evaluation"
 
     lock = await get_thread_lock(app, thread_id)
     async with lock:
         if hasattr(agent, "runtime") and not agent.runtime.transition(AgentState.ACTIVE):
             raise HTTPException(status_code=409, detail="Thread is already 
running")
-        buf = start_agent_run(agent, thread_id, payload.message, app, payload.enable_trajectory)
-        return {"run_id": buf.run_id, "thread_id": thread_id}
+
+        buf = start_agent_run(agent, thread_id, payload.message, app, enable_trajectory)
+        return {
+            "run_id": buf.run_id,
+            "thread_id": thread_id,
+            "mode": thread_mode,
+            "enable_trajectory": enable_trajectory,
+        }
 
 
 @router.get("/{thread_id}/runs/events")
diff --git a/backend/web/services/streaming_service.py b/backend/web/services/streaming_service.py
index 355f99210..8a3b0deaf 100644
--- a/backend/web/services/streaming_service.py
+++ b/backend/web/services/streaming_service.py
@@ -539,22 +539,10 @@ async def run_agent_stream():
-        # Consume followup queue: if messages are pending, start a new run
+        # Followup-queue consumption was removed; best-effort prune of old run events below.
         followup = None
         try:
-            qm = app.state.queue_manager
-            followup = qm.dequeue(thread_id)
-            if followup and app:
-                if hasattr(agent, "runtime") and agent.runtime.transition(AgentState.ACTIVE):
-                    start_agent_run(agent, thread_id, followup, app)
-        except Exception:
-            logger.exception("Failed to consume followup queue for thread %s", thread_id)
-            # Re-enqueue the message if it was already dequeued to prevent data loss
-            if followup:
-                try:
-                    app.state.queue_manager.enqueue(followup, thread_id)
-                except Exception:
-                    logger.error("Failed to re-enqueue followup for thread %s — message lost: %.200s", thread_id, followup)
+            from backend.web.utils.helpers import should_keep_full_trace
 
-        try:
-            await cleanup_old_runs(thread_id, keep_latest=1, run_event_repo=run_event_repo)
+            if not should_keep_full_trace(thread_id):
+                await cleanup_old_runs(thread_id, keep_latest=1)
         except Exception:
             pass
         if run_event_repo is not None:
diff --git a/backend/web/utils/helpers.py b/backend/web/utils/helpers.py
index 0868851d2..43cb5da50 100644
--- a/backend/web/utils/helpers.py
+++ b/backend/web/utils/helpers.py
@@ -71,17 +71,41 @@ def extract_webhook_instance_id(payload: dict[str, Any]) -> str | None:
     return None
 
 
 def _get_container() -> StorageContainer:
     global _cached_container, _cached_container_db_path
     if _cached_container is not None and _cached_container_db_path == DB_PATH:
         return _cached_container
     _cached_container = build_storage_container(main_db_path=DB_PATH)
     _cached_container_db_path = DB_PATH
     return _cached_container
 
 
 def _build_thread_config_repo() -> ThreadConfigRepo:
     return _get_container().thread_config_repo()
+
+
+def _ensure_thread_config_table(conn: sqlite3.Connection) -> None:
+    """Create thread_config table and run migrations from old thread_metadata."""
+    # Migrate old table name if exists
+    tables = {r[0] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
+    if "thread_metadata" in tables and "thread_config" not in tables:
+        conn.execute("ALTER TABLE thread_metadata RENAME TO thread_config")
+    conn.execute(
+        "CREATE TABLE IF NOT EXISTS thread_config"
+        "(thread_id TEXT PRIMARY KEY, sandbox_type TEXT NOT NULL, cwd TEXT, model TEXT, queue_mode TEXT DEFAULT 'steer')"
+    )
+    for col, default in [
+        ("model", None),
+        ("queue_mode", "'steer'"),
+        ("observation_provider", None),
+        ("thread_mode", "'normal'"),
+        ("keep_full_trace", "'0'"),
+    ]:
+        try:
+            default_clause = f" DEFAULT {default}" if default else ""
+            conn.execute(f"ALTER TABLE thread_config ADD COLUMN {col} TEXT{default_clause}")
+        except sqlite3.OperationalError:
+            pass
 
 
 def save_thread_config(thread_id: str, **fields: Any) -> None:
@@ -89,7 +113,15 @@ def save_thread_config(thread_id: str, **fields: Any) -> None:
 
     Usage: save_thread_config(thread_id, model="gpt-4")
     """
-    allowed = {"sandbox_type", "cwd", "model", "observation_provider", "agent"}
+    allowed = {
+        "sandbox_type",
+        "cwd",
+        "model",
+        "queue_mode",
+        "observation_provider",
+        "thread_mode",
+        "keep_full_trace",
+    }
     updates = {k: v for k, v in fields.items() if k in allowed}
     if not updates:
         return
@@ -104,30 +136,48 @@ def load_thread_config(thread_id: str):
     """Load full thread config from SQLite. Returns ThreadConfig or None."""
-    repo = _build_thread_config_repo()
+    # NOTE(review): direct sqlite access below; the legacy repo object was unused here and never closed.
     try:
-        row = repo.lookup_config(thread_id)
-        if not row:
-            return None
-        from backend.web.models.thread_config import ThreadConfig
-
-        return ThreadConfig(
-            sandbox_type=row["sandbox_type"] or "local",
-            cwd=row["cwd"],
-            model=row["model"],
-            queue_mode=row["queue_mode"] or "steer",
-            observation_provider=row["observation_provider"],
-            agent=row.get("agent"),
+        with sqlite3.connect(str(DB_PATH)) as conn:
+            conn.row_factory = sqlite3.Row
+            _ensure_thread_config_table(conn)
+            row = conn.execute(
+                "SELECT sandbox_type, cwd, model, queue_mode, observation_provider, thread_mode, keep_full_trace "
+                "FROM thread_config WHERE thread_id = ?",
+                (thread_id,),
+            ).fetchone()
+            if not row:
+                return None
+            from backend.web.models.thread_config import ThreadConfig
+
+            return ThreadConfig(
+                sandbox_type=row["sandbox_type"],
+                cwd=row["cwd"],
+                model=row["model"],
+                queue_mode=row["queue_mode"] or "steer",
+                observation_provider=row["observation_provider"],
+                thread_mode=row["thread_mode"] or "normal",
+                keep_full_trace=str(row["keep_full_trace"] or "0") in {"1", "true", "True"},
+            )
+    except sqlite3.OperationalError:
+        return None
+
+
+def init_thread_config(
+    thread_id: str,
+    sandbox_type: str,
+    cwd: str | None,
+    thread_mode: str = "normal",
+    keep_full_trace: bool = False,
+) -> None:
+    """Create initial thread config row in SQLite."""
+    with sqlite3.connect(str(DB_PATH)) as conn:
+        _ensure_thread_config_table(conn)
+        conn.execute(
+            "INSERT OR REPLACE INTO thread_config "
+            "(thread_id, sandbox_type, cwd, thread_mode, keep_full_trace) "
+            "VALUES (?, ?, ?, ?, ?)",
+            (thread_id, sandbox_type, cwd, thread_mode, "1" if keep_full_trace else "0"),
         )
-    finally:
-        repo.close()
-
-
-def init_thread_config(thread_id: str, sandbox_type: str, cwd: str | None) -> None:
-    """Create initial thread config row in storage repo."""
-    repo = _build_thread_config_repo()
-    try:
-        repo.save_metadata(thread_id, sandbox_type, cwd)
-    finally:
-        repo.close()
+    conn.commit()
 
 
 def get_active_observation_provider() -> str | None:
@@ -156,6 +206,18 @@ def lookup_thread_model(thread_id: str) -> str | None:
         repo.close()
 
 
+def lookup_thread_mode(thread_id: str) -> str:
+    """Look up thread mode. Defaults to normal."""
+    config = load_thread_config(thread_id)
+    return config.thread_mode if config and config.thread_mode else "normal"
+
+
+def should_keep_full_trace(thread_id: str) -> bool:
+    """Whether thread run events should be fully retained."""
+    config = load_thread_config(thread_id)
+    return bool(config.keep_full_trace) if config else False
+
+
 def resolve_local_workspace_path(
     raw_path: str | None,
     thread_id: str | None = None,
diff --git a/frontend/monitor/src/App.tsx b/frontend/monitor/src/App.tsx
index 3cfe19393..c677ec799 100644
--- a/frontend/monitor/src/App.tsx
+++ b/frontend/monitor/src/App.tsx
@@ -1,5 +1,5 @@
 import React from 'react';
-import { BrowserRouter, Routes, Route, Link, useParams, useNavigate } from 'react-router-dom';
+import { BrowserRouter, Routes, Route, Link, NavLink, useLocation, useParams } from 'react-router-dom';
 import './styles.css';
 
 const API_BASE = '/api/monitor';
@@ -7,7 +7,36 @@ const API_BASE = '/api/monitor';
 // Utility: Fetch JSON from API
 async function fetchAPI(path: string) {
   const res = await fetch(`${API_BASE}${path}`);
-  return res.json();
+  const text = await res.text();
+  let payload: any = {};
+  try {
+    payload = text ? JSON.parse(text) : {};
+  } catch {
+    throw new Error(`Invalid JSON from ${path} (${res.status}): ${text.slice(0, 180)}`);
+  }
+  if (!res.ok) {
+    throw new Error(payload?.detail || `${res.status} ${res.statusText}`);
+  }
+  return payload;
+}
+
+async function fetchJSON(path: string, init?: RequestInit) {
+  const res = await fetch(path, init);
+  const text = await res.text();
+  let payload: any = {};
+  try {
+    payload = text ? 
JSON.parse(text) : {}; + } catch { + throw new Error(`Invalid JSON from ${path} (${res.status}): ${text.slice(0, 180)}`); + } + if (!res.ok) { + throw new Error(payload?.detail || `${res.status} ${res.statusText}`); + } + return payload; +} + +async function fetchThreadAPI(path: string) { + return fetchJSON(`/api/threads${path}`); } // Component: Breadcrumb navigation @@ -40,45 +69,104 @@ function StateBadge({ badge }: { badge: any }) { // Page: Threads List function ThreadsPage() { const [data, setData] = React.useState(null); + const [loading, setLoading] = React.useState(false); + const [offset, setOffset] = React.useState(0); + const [limit, setLimit] = React.useState(50); + + const loadThreads = React.useCallback(async () => { + setLoading(true); + try { + const payload = await fetchAPI(`/threads?offset=${offset}&limit=${limit}`); + setData(payload); + } finally { + setLoading(false); + } + }, [offset, limit]); React.useEffect(() => { - fetchAPI('/threads').then(setData); - }, []); + void loadThreads(); + }, [loadThreads]); if (!data) return
Loading...
; + const pagination = data.pagination || {}; + const total = Number(pagination.total || data.count || 0); + const currentCount = Number(data.count || 0); + const from = total > 0 ? offset + 1 : 0; + const to = offset + currentCount; + const page = Number(pagination.page || 1); return (

{data.title}

-

Total: {data.count}

- - - - - - - - - - - - - {data.items.map((item: any) => ( - - - - - - - +

Showing {from}-{to} of {total} | page {page}

+
+
+
+ + + +
+
+ Rows: + +
+
+
Thread IDSessionsLast ActiveLeaseProviderState
{item.thread_id.slice(0, 8)}{item.session_count}{item.last_active_ago} - {item.lease.lease_id ? ( - {item.lease.lease_id} - ) : '-'} - {item.lease.provider || '-'}
+ + + + + + + + + - ))} - -
Thread IDModeSessionsLast ActiveLeaseProviderState
+ + + {data.items.map((item: any) => ( + + {item.thread_id.slice(0, 8)} + {item.thread_mode || 'normal'} / trace={item.keep_full_trace ? 'full' : 'latest'} + {item.session_count} + {item.last_active_ago} + + {item.lease.lease_id ? ( + {item.lease.lease_id} + ) : '-'} + + {item.lease.provider || '-'} + + + ))} + + +
); } @@ -93,11 +181,15 @@ function ThreadDetailPage() { }, [threadId]); if (!data) return
Loading...
; + const threadIsActive = Array.isArray(data?.sessions?.items) + ? data.sessions.items.some((s: any) => s.status === 'active') + : false; return (

Thread: {data.thread_id.slice(0, 8)}

+

mode: {data.thread_mode || 'normal'} | trace: {data.keep_full_trace ? 'full' : 'latest'}

{data.sessions.title} ({data.sessions.count})

@@ -143,6 +235,829 @@ function ThreadDetailPage() { ))}
+ + +
+ ); +} + +function summarizeTraceEvent(eventType: string, payload: any): string { + if (eventType === 'tool_call') return `${payload?.name || 'tool'}(${JSON.stringify(payload?.args || {})})`; + if (eventType === 'tool_result') return `${payload?.name || 'tool'} -> ${String(payload?.content || '').slice(0, 240)}`; + if (eventType === 'text') return String(payload?.content || '').slice(0, 120); + if (eventType === 'status') { + const state = typeof payload?.state === 'string' ? payload.state : JSON.stringify(payload?.state || '-'); + return `state=${state} calls=${payload?.call_count ?? '-'}`; + } + if (eventType === 'error') return payload?.error || 'error'; + if (eventType === 'done') return 'done'; + return JSON.stringify(payload).slice(0, 120); +} + +type TraceItem = { + seq: number | null; + run_id: string | null; + created_at?: string | null; + created_ago?: string | null; + event_type: string; + actor: 'assistant' | 'tool' | 'runtime'; + summary: string; + payload: any; +}; + +function normalizeTraceEvent(eventType: string, payload: any): TraceItem | null { + const seq = payload?._seq ?? null; + const run_id = payload?._run_id ?? null; + + if (eventType === 'text') { + const content = typeof payload?.content === 'string' ? payload.content : String(payload?.content ?? ''); + if (!content) return null; + return { seq, run_id, event_type: 'assistant_text', actor: 'assistant', summary: content, payload }; + } + + if (eventType === 'tool_call') { + return { + seq, + run_id, + event_type: 'tool_call', + actor: 'tool', + summary: `${payload?.name || 'tool'}`, + payload, + }; + } + + if (eventType === 'tool_result') { + return { + seq, + run_id, + event_type: 'tool_result', + actor: 'tool', + summary: `${payload?.name || 'tool'}`, + payload, + }; + } + + if (eventType === 'status') { + const state = typeof payload?.state === 'string' ? 
payload.state : JSON.stringify(payload?.state || '-'); + return { + seq, + run_id, + event_type: 'status', + actor: 'runtime', + summary: `state=${state} calls=${payload?.call_count ?? '-'}`, + payload, + }; + } + + if (eventType === 'error' || eventType === 'cancelled' || eventType === 'done') { + return { + seq, + run_id, + event_type: eventType, + actor: 'runtime', + summary: summarizeTraceEvent(eventType, payload), + payload, + }; + } + return null; +} + +function normalizeStoredTraceEvent(row: any, fallbackRunId: string | null): TraceItem | null { + const payload = row?.payload || {}; + if (payload?._seq == null && row?.seq != null) payload._seq = row.seq; + if (payload?._run_id == null && fallbackRunId) payload._run_id = fallbackRunId; + const normalized = normalizeTraceEvent(String(row?.event_type || ''), payload); + if (!normalized) return null; + return { + ...normalized, + seq: row?.seq ?? normalized.seq, + run_id: fallbackRunId || normalized.run_id, + created_at: row?.created_at || null, + created_ago: row?.created_ago || null, + }; +} + +function mergeTraceItems(prev: TraceItem[], next: TraceItem): TraceItem[] { + const last = prev.length ? prev[prev.length - 1] : null; + + // @@@streaming-text-fold - collapse token-level text stream into one assistant step for readable trace timeline. + if (next.event_type === 'assistant_text' && last && last.event_type === 'assistant_text' && last.run_id === next.run_id) { + const merged = [...prev]; + merged[merged.length - 1] = { + ...last, + seq: next.seq ?? last.seq, + summary: `${last.summary}${next.summary}`, + payload: next.payload, + }; + return merged; + } + + // @@@status-coalesce - keep only latest status snapshot for same run to reduce noise. 
+ if (next.event_type === 'status' && last && last.event_type === 'status' && last.run_id === next.run_id) { + const merged = [...prev]; + merged[merged.length - 1] = next; + return merged; + } + + return [...prev, next]; +} + +type TraceStep = { + step: number; + run_id: string | null; + seq_start: number | null; + seq_end: number | null; + created_ago: string | null; + assistant_text: string; + tool_name: string | null; + tool_args: any; + command_line: string | null; + tool_output: string | null; + runtime_notes: string[]; + raw_items: TraceItem[]; +}; + +function buildTraceSteps(items: TraceItem[]): TraceStep[] { + const steps: TraceStep[] = []; + let assistantBuffer: string[] = []; + let pending: Omit | null = null; + + const pushStep = (step: Omit) => { + steps.push({ ...step, step: steps.length + 1 }); + }; + + for (const item of items) { + if (item.event_type === 'assistant_text') { + if (pending) { + pending.runtime_notes.push(item.summary); + pending.raw_items.push(item); + pending.seq_end = item.seq ?? pending.seq_end; + } else { + assistantBuffer.push(item.summary); + } + continue; + } + + if (item.event_type === 'tool_call') { + if (pending) { + pushStep(pending); + pending = null; + } + pending = { + run_id: item.run_id, + seq_start: item.seq, + seq_end: item.seq, + created_ago: item.created_ago || null, + assistant_text: assistantBuffer.join('\n').trim(), + tool_name: item.payload?.name || item.summary, + tool_args: item.payload?.args || {}, + command_line: item.payload?.args?.CommandLine ? String(item.payload.args.CommandLine) : null, + tool_output: null, + runtime_notes: [], + raw_items: [item], + }; + assistantBuffer = []; + continue; + } + + if (item.event_type === 'tool_result') { + if (pending && !pending.tool_output) { + pending.tool_output = String(item.payload?.content || '(no output)'); + pending.raw_items.push(item); + pending.seq_end = item.seq ?? 
pending.seq_end; + } else { + pushStep({ + run_id: item.run_id, + seq_start: item.seq, + seq_end: item.seq, + created_ago: item.created_ago || null, + assistant_text: assistantBuffer.join('\n').trim(), + tool_name: item.payload?.name || item.summary, + tool_args: null, + command_line: null, + tool_output: String(item.payload?.content || '(no output)'), + runtime_notes: [], + raw_items: [item], + }); + assistantBuffer = []; + } + continue; + } + + const runtimeNote = item.event_type === 'status' ? formatStatusSummary(item.payload) : item.summary; + if (pending) { + pending.runtime_notes.push(runtimeNote); + pending.raw_items.push(item); + pending.seq_end = item.seq ?? pending.seq_end; + if (item.event_type === 'error' || item.event_type === 'cancelled' || item.event_type === 'done') { + pushStep(pending); + pending = null; + } + } else { + pushStep({ + run_id: item.run_id, + seq_start: item.seq, + seq_end: item.seq, + created_ago: item.created_ago || null, + assistant_text: assistantBuffer.join('\n').trim(), + tool_name: null, + tool_args: null, + command_line: null, + tool_output: null, + runtime_notes: [runtimeNote], + raw_items: [item], + }); + assistantBuffer = []; + } + } + + if (pending) pushStep(pending); + + const remain = assistantBuffer.join('\n').trim(); + if (remain) { + pushStep({ + run_id: items.length ? 
items[items.length - 1].run_id : null, + seq_start: null, + seq_end: null, + created_ago: null, + assistant_text: remain, + tool_name: null, + tool_args: null, + command_line: null, + tool_output: null, + runtime_notes: [], + raw_items: [], + }); + } + + return steps; +} + +function shortId(value: string | null, size = 8): string { + if (!value) return '-'; + return String(value).slice(0, size); +} + +function evalThreadLabel(threadId: string | null, evaluationId: string | null): string { + if (!threadId) return '-'; + if (!evaluationId) return shortId(threadId, 20); + const prefix = `swebench-${evaluationId}-`; + if (threadId.startsWith(prefix)) { + const instanceId = threadId.slice(prefix.length); + return instanceId || shortId(threadId, 20); + } + return shortId(threadId, 20); +} + +function formatPct(value: any): string { + const num = Number(value); + if (!Number.isFinite(num)) return '-'; + return `${num.toFixed(1)}%`; +} + +function formatResolvedScore(item: any): string { + const resolved = Number(item?.score?.resolved_instances ?? 0); + const total = Number(item?.score?.total_instances ?? 0); + return `${resolved}/${total} (${formatPct(item?.score?.resolved_rate_pct)})`; +} + +function evalProgress(item: any): { + done: number; + target: number; + running: number; + pct: number; + mode: 'thread_rows' | 'session_rows' | 'checkpoint_estimate'; +} { + const doneRaw = Number(item?.threads_done ?? 0); + const runningRaw = Number(item?.threads_running ?? 0); + const targetRaw = Number(item?.slice_count ?? item?.threads_total ?? 0); + const modeRaw = String(item?.progress_source || ''); + const done = Number.isFinite(doneRaw) ? Math.max(0, doneRaw) : 0; + const running = Number.isFinite(runningRaw) ? Math.max(0, runningRaw) : 0; + const targetCandidate = Number.isFinite(targetRaw) ? Math.max(0, targetRaw) : 0; + const mode = + modeRaw === 'checkpoint_estimate' || modeRaw === 'session_rows' + ? modeRaw + : 'thread_rows'; + const target = targetCandidate > 0 ? 
targetCandidate : Math.max(done + running, 0); + // @@@progress-active-ratio - evaluation threads can be running long before any thread reaches "done". + // Use (done + running) to reflect visible in-flight progress instead of a flat 0% bar. + const active = Math.min(target, done + running); + const pct = target > 0 ? Math.min(100, (active / target) * 100) : 0; + return { done, target, running, pct, mode }; +} + +function formatProgressSummary(progress: { + done: number; + target: number; + running: number; + pct: number; + mode: 'thread_rows' | 'session_rows' | 'checkpoint_estimate'; +}): string { + const pending = Math.max(0, progress.target - progress.done - progress.running); + const activeLabel = progress.mode === 'checkpoint_estimate' ? 'Started' : 'In Progress'; + const sourceSuffix = progress.mode === 'thread_rows' ? '' : ` · source=${progress.mode}`; + return `Total ${progress.target} · Completed ${progress.done} · ${activeLabel} ${progress.running} · Pending ${pending} · Progress ${formatPct(progress.pct)}${sourceSuffix}`; +} + +function formatStatusSummary(payload: any): string { + const stateText = + typeof payload?.state === 'string' + ? payload.state + : payload?.state?.state || JSON.stringify(payload?.state || '-'); + const calls = payload?.call_count ?? '-'; + const inTokens = payload?.input_tokens ?? payload?.token_count ?? '-'; + const outTokens = payload?.output_tokens ?? '-'; + return `state=${stateText} calls=${calls} tokens=${inTokens}/${outTokens}`; +} + +function conversationText(content: any): string { + if (typeof content === 'string') return content; + if (Array.isArray(content)) { + return content + .map((part) => { + if (typeof part === 'string') return part; + if (part && typeof part === 'object' && part.type === 'text') return String(part.text || ''); + return JSON.stringify(part); + }) + .join(''); + } + if (content == null) return ''; + return typeof content === 'object' ? 
JSON.stringify(content, null, 2) : String(content); +} + +function ConversationTraceCard({ message, index }: { message: any; index: number }) { + const msgType = String(message?.type || 'Unknown'); + const text = conversationText(message?.content); + const toolCalls = Array.isArray(message?.tool_calls) ? message.tool_calls : []; + return ( +
+
+
+ [{index}] + {msgType} +
+ id {shortId(message?.id || '-', 12)} +
+ + {toolCalls.length > 0 && ( +
+
tool_calls
+
{JSON.stringify(toolCalls, null, 2)}
+
+ )} + + {message?.tool_call_id && ( +
+
tool_call_id
+
{String(message.tool_call_id)}
+
+ )} + +
+
content
+
{text || '(empty)'}
+
+ +
+ Raw message +
{JSON.stringify(message, null, 2)}
+
+
+ ); +} + +function TraceCard({ item }: { item: TraceItem }) { + const statusText = item.event_type === 'status' ? formatStatusSummary(item.payload) : null; + const commandLine = item.payload?.args?.CommandLine; + const toolArgs = item.payload?.args; + const toolOutput = item.payload?.content; + return ( +
+
+
+ #{item.seq ?? '-'} + {item.actor} + {item.event_type} +
+ run {shortId(item.run_id)} +
+ + {item.event_type === 'assistant_text' && ( +
{item.summary}
+ )} + + {item.event_type === 'tool_call' && ( +
+
Tool
+
{item.payload?.name || item.summary}
+ {commandLine && ( + <> +
CommandLine
+
{String(commandLine)}
+ + )} +
Args
+
{JSON.stringify(toolArgs || {}, null, 2)}
+
+ )} + + {item.event_type === 'tool_result' && ( +
+
Tool
+
{item.payload?.name || item.summary}
+
Output
+
{String(toolOutput || '(no output)')}
+
+ )} + + {item.event_type === 'status' && ( +
+
Runtime
+
{statusText}
+
+ )} + + {(item.event_type === 'error' || item.event_type === 'cancelled' || item.event_type === 'done') && ( +
{item.summary}
+ )} + +
+ Raw payload +
{JSON.stringify(item.payload, null, 2)}
+
+
+ ); +} + +function TraceStepCard({ step }: { step: TraceStep }) { + return ( +
+
+
+ Step {step.step} + seq {step.seq_start ?? '-'}..{step.seq_end ?? '-'} + run {shortId(step.run_id)} +
+ {step.created_ago || '-'} +
+ + {step.assistant_text && ( +
+
Intent
+
{step.assistant_text}
+
+ )} + + {step.tool_name && ( +
+
Action
+
{step.tool_name}
+ {step.command_line && ( + <> +
CommandLine
+
{step.command_line}
+ + )} + {step.tool_args && ( + <> +
Args
+
{JSON.stringify(step.tool_args, null, 2)}
+ + )} +
+ )} + + {step.tool_output != null && ( +
+
Observation
+
{step.tool_output}
+
+ )} + + {step.runtime_notes.length > 0 && ( +
+
Runtime
+
{step.runtime_notes.join('\n')}
+
+ )} + +
+ Raw events ({step.raw_items.length}) + {step.raw_items.map((item, idx) => ( +
+
+ #{item.seq || '-'} + {item.event_type} +
+
{JSON.stringify(item.payload, null, 2)}
+
+ ))} +
+
+ ); +} + +function ThreadTraceSection({ threadId, autoRefreshEnabled }: { threadId: string; autoRefreshEnabled: boolean }) { + const [traceEvents, setTraceEvents] = React.useState([]); + const [traceError, setTraceError] = React.useState(null); + const [traceLoading, setTraceLoading] = React.useState(false); + const [rawEventCount, setRawEventCount] = React.useState(0); + const [streamState, setStreamState] = React.useState<'idle' | 'polling' | 'error'>('idle'); + const [eventFilter, setEventFilter] = React.useState<'all' | 'assistant' | 'tool' | 'runtime'>('all'); + const [traceView, setTraceView] = React.useState<'conversation' | 'events' | 'steps'>('conversation'); + const [showRawTable, setShowRawTable] = React.useState(false); + const [selectedRunId, setSelectedRunId] = React.useState(''); + const [runCandidates, setRunCandidates] = React.useState([]); + const [autoRefresh, setAutoRefresh] = React.useState(true); + const [conversationMessages, setConversationMessages] = React.useState([]); + const [conversationLoading, setConversationLoading] = React.useState(false); + const [conversationError, setConversationError] = React.useState(null); + + const loadTrace = React.useCallback((runId: string) => { + if (!threadId) return; + const query = runId ? 
`?run_id=${encodeURIComponent(runId)}` : ''; + setTraceLoading(true); + setTraceError(null); + setStreamState('polling'); + fetchAPI(`/thread/${threadId}/trace${query}`) + .then((payload) => { + setRawEventCount(payload?.event_count || 0); + setRunCandidates(payload?.run_candidates || []); + if (!runId && payload?.run_id) { + setSelectedRunId((prev) => prev || String(payload.run_id)); + } + const normalized = (payload?.events || []) + .map((row: any) => normalizeStoredTraceEvent(row, payload?.run_id || runId || null)) + .filter(Boolean) as TraceItem[]; + const merged = normalized.reduce((acc: TraceItem[], item) => mergeTraceItems(acc, item), []); + setTraceEvents(merged); + setStreamState('idle'); + }) + .catch((e) => { + setTraceError(e.message); + setStreamState('error'); + }) + .finally(() => setTraceLoading(false)); + }, [threadId]); + + const loadConversation = React.useCallback(() => { + if (!threadId) return; + setConversationLoading(true); + setConversationError(null); + fetchThreadAPI(`/${threadId}`) + .then((payload) => { + setConversationMessages(Array.isArray(payload?.messages) ? 
payload.messages : []); + }) + .catch((e) => setConversationError(e.message)) + .finally(() => setConversationLoading(false)); + }, [threadId]); + + React.useEffect(() => { + if (!threadId) return; + setTraceEvents([]); + setRunCandidates([]); + setSelectedRunId(''); + loadTrace(''); + loadConversation(); + }, [threadId, loadTrace, loadConversation]); + + React.useEffect(() => { + if (!selectedRunId) return; + loadTrace(selectedRunId); + }, [selectedRunId, loadTrace]); + + React.useEffect(() => { + if (!threadId || !autoRefreshEnabled || !autoRefresh) return; + const timer = window.setInterval(() => { + loadTrace(selectedRunId); + loadConversation(); + }, 2000); + return () => window.clearInterval(timer); + }, [threadId, autoRefreshEnabled, autoRefresh, selectedRunId, loadTrace, loadConversation]); + + const traceTail = traceEvents.slice(-300); + const visibleTrace = traceTail.filter((item) => eventFilter === 'all' || item.actor === eventFilter); + const traceSteps = buildTraceSteps(visibleTrace); + const conversationTail = conversationMessages.slice(-200); + const traceStats = { + assistant: traceTail.filter((item) => item.actor === 'assistant').length, + tool: traceTail.filter((item) => item.actor === 'tool').length, + runtime: traceTail.filter((item) => item.actor === 'runtime').length, + }; + + return ( +
+

+ Thread Trace { + traceView === 'conversation' + ? 'Conversation' + : traceView === 'events' + ? 'Events' + : 'Steps' + } + {' '} + ({ + traceView === 'conversation' + ? `${conversationTail.length} messages` + : traceView === 'events' + ? `${visibleTrace.length} events` + : `${traceSteps.length} steps / ${visibleTrace.length} events` + }) +

+

+ status: {streamState} | run: {selectedRunId ? shortId(selectedRunId, 12) : '-'} | raw_events: {rawEventCount} | messages: {conversationTail.length} +

+
+ {traceView !== 'conversation' && ( + <> +
+ Run + +
+
+ {(['all', 'assistant', 'tool', 'runtime'] as const).map((kind) => ( + + ))} +
+ + )} +
+ + + +
+ + + +
+ {traceView === 'conversation' ? ( +
+ messages: {conversationTail.length} + loading: {conversationLoading ? 'yes' : 'no'} +
+ ) : ( +
+ assistant: {traceStats.assistant} + tool: {traceStats.tool} + runtime: {traceStats.runtime} + loading: {traceLoading ? 'yes' : 'no'} +
+ )} + {traceError &&
Trace load failed: {traceError}
} + {conversationError &&
Conversation load failed: {conversationError}
} +
+ {traceView === 'conversation' ? ( + <> + {conversationTail.map((message, idx) => ( + + ))} + {conversationTail.length === 0 &&
No conversation messages yet.
} + + ) : traceView === 'events' ? ( + <> + {visibleTrace.map((item, idx) => ( + + ))} + {visibleTrace.length === 0 &&
No trace events for this filter.
} + + ) : ( + <> + {traceSteps.map((step) => ( + + ))} + {traceSteps.length === 0 &&
No trace events for this filter.
} + + )} +
+ + {showRawTable && traceView !== 'conversation' && ( +
+ Raw trace table + + + + + + + + + + + + + + {traceTail.slice().reverse().map((item, idx) => ( + + + + + + + + + + ))} + +
StepActorEventSummaryRunWhenPayload
{item.seq || '-'}{item.actor}{item.event_type}{item.summary}{shortId(item.run_id)}{item.created_ago || '-'} +
+ view +
{JSON.stringify(item.payload, null, 2)}
+
+
+
+ )} +
+ ); +} + +// Page: Session Detail +function SessionDetailPage() { + const { sessionId } = useParams(); + const [data, setData] = React.useState(null); + const [error, setError] = React.useState(null); + + React.useEffect(() => { + if (!sessionId) return; + setError(null); + fetchAPI(`/session/${sessionId}`) + .then((payload) => setData(payload)) + .catch((e) => setError(e.message)); + }, [sessionId]); + + if (error) return
Session load failed: {error}
; + if (!data) return
Loading...
; + + return ( +
+ +

Session: {data.session_id.slice(0, 8)}

+ +
+
Thread: {data.thread_id.slice(0, 8)}
+
Status: {data.info.status}
+
Provider: {data.info.provider || '-'}
+
Started: {data.info.started_ago}
+
Last Active: {data.info.last_active_ago}
+
Ended: {data.info.ended_ago || '-'}
+
); } @@ -439,17 +1354,598 @@ function EventDetailPage() { ); } +// Page: Evaluation +function EvaluationPage() { + const location = useLocation(); + const [dataset, setDataset] = React.useState('SWE-bench/SWE-bench_Lite'); + const [split, setSplit] = React.useState('test'); + const [startIdx, setStartIdx] = React.useState('0'); + const [sliceCount, setSliceCount] = React.useState('10'); + const [promptProfile, setPromptProfile] = React.useState('heuristic'); + const [timeoutSec, setTimeoutSec] = React.useState('180'); + const [recursionLimit, setRecursionLimit] = React.useState('256'); + const [sandbox, setSandbox] = React.useState('local'); + const [runStatus, setRunStatus] = React.useState<'idle' | 'starting' | 'submitted' | 'error'>('idle'); + const [evaluationId, setEvaluationId] = React.useState(''); + const [runError, setRunError] = React.useState(null); + const [evaluations, setEvaluations] = React.useState([]); + const [evalOffset, setEvalOffset] = React.useState(0); + const [evalLimit] = React.useState(30); + const [evalPagination, setEvalPagination] = React.useState(null); + const [runsLoading, setRunsLoading] = React.useState(false); + const [composerOpen, setComposerOpen] = React.useState(false); + + const loadEvaluations = React.useCallback(async () => { + setRunsLoading(true); + try { + const payload = await fetchAPI(`/evaluations?limit=${evalLimit}&offset=${evalOffset}`); + setEvaluations(Array.isArray(payload?.items) ? 
payload.items : []); + setEvalPagination(payload?.pagination || null); + } catch (e: any) { + setRunError(e?.message || String(e)); + } finally { + setRunsLoading(false); + } + }, [evalLimit, evalOffset]); + + React.useEffect(() => { + void loadEvaluations(); + const timer = window.setInterval(() => { + void loadEvaluations(); + }, 2500); + return () => window.clearInterval(timer); + }, [loadEvaluations]); + + async function handleStart() { + if (runStatus === 'starting') return; + setRunError(null); + setEvaluationId(''); + setRunStatus('starting'); + + try { + const payload = await fetchJSON('/api/monitor/evaluations', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + dataset, + split, + start: Number(startIdx), + count: Number(sliceCount), + prompt_profile: promptProfile, + timeout_sec: Number(timeoutSec), + recursion_limit: Number(recursionLimit), + sandbox, + cwd: '/home/ubuntu/specops0/Projects/leonai-main', + arm: 'monitor', + }), + }); + const nextEvalId = String(payload?.evaluation_id || ''); + if (!nextEvalId) throw new Error('create evaluation returned empty evaluation_id'); + setEvaluationId(nextEvalId); + setRunStatus('submitted'); + setComposerOpen(false); + await loadEvaluations(); + } catch (e: any) { + setRunStatus('error'); + setRunError(e?.message || String(e)); + } + } + + const currentEval = evaluations.find((item: any) => item.evaluation_id === evaluationId); + const submissionPreview = { + dataset, + split, + start: Number(startIdx || '0'), + count: Number(sliceCount || '0'), + prompt_profile: promptProfile, + timeout_sec: Number(timeoutSec || '0'), + recursion_limit: Number(recursionLimit || '0'), + sandbox, + arm: 'monitor', + }; + const parameterReference = [ + ['Dataset', 'Benchmark source', 'Lite for fast iteration, Verified for strict runs'], + ['Split', 'Data partition', 'Use test for formal comparison'], + ['Start / Slice', 'Case range', 'Run small slices first, then scale up'], + 
['Prompt Profile', 'Prompt strategy', 'Compare baseline vs heuristic in A/B'], + ['Timeout(s)', 'Per-case wall clock limit', '180~300 for initial runs'], + ['Recursion', 'Agent iteration budget', '256 default, raise to 512 for hard tasks'], + ['Sandbox', 'Execution provider', 'Use local for quick checks, daytona for infra parity'], + ]; + const statusReference = [ + ['queued', 'Job is persisted and waiting for executor slots.'], + ['running', 'At least one thread is active and writing status updates.'], + ['provisional', 'Artifacts are incomplete (missing eval summary or eval error). Score is not final.'], + ['completed', 'Runner finished and artifacts were written.'], + ['completed_with_errors', 'Runner finished, but summary reports failed items/errors.'], + ['error', 'Runner failed; open detail page to inspect stderr and trace.'], + ]; + const currentProgress = currentEval ? evalProgress(currentEval) : null; + + React.useEffect(() => { + window.scrollTo({ top: 0, left: 0, behavior: 'auto' }); + }, []); + React.useEffect(() => { + // @@@evaluation-query-open - allow deterministic screenshot/review entry to open config panel via ?new=1. + const query = new URLSearchParams(location.search); + setComposerOpen(query.get('new') === '1'); + }, [location.search]); + + return ( +
+

Evaluation

+

One evaluation contains many threads. Start jobs from config panel, track durable progress in list, then drill into thread trace.

+ +
+
+

1. Submit

+

Open config, choose scope/profile/sandbox, then submit one batch run.

+
+
+

2. Track

+

List auto-refreshes every 2.5s and survives reload. Status is backend-persisted.

+
+
+

3. Inspect

+

Open evaluation detail to jump to per-thread trace and tool-call timeline.

+
+
+ +
+
+

Current Submission

+

Latest evaluation submitted from this page.

+
evaluation: {evaluationId || '-'}
+

status: {currentEval?.status || runStatus}

+ {currentEval && currentProgress && ( +
+
phase: {String(currentEval.status || '-').toUpperCase()}
+
+
+
+
+ {formatProgressSummary(currentProgress)} +
+
+ )} + {runError &&
run error: {runError}
} + {evaluationId && ( +

+ open evaluation detail +

+ )} +
+ +
+

Start New Evaluation

+

Open a focused config panel. After submit, track progress in the evaluation list below.

+ +
+
+ +
+
+

Evaluations ({evalPagination?.total ?? evaluations.length})

+ +
+

+ Auto refresh: 2.5s {runsLoading ? '| loading...' : ''} + {' '}| page {evalPagination?.page ?? 1} +

+

Evaluation = one batch run. Progress shows total/completed/started-or-running/pending. Click Evaluation ID for detail trace and thread links.

+ + + + + + + + + + + + + + + {evaluations.map((item: any) => ( + + + + + + + + + + + ))} + {evaluations.length === 0 && ( + + + + )} + +
EvaluationDatasetRangeProfile / SandboxStatusProgressScoreUpdated
{shortId(item.evaluation_id, 14)}{item.dataset}{item.start_idx}..{item.start_idx + item.slice_count - 1}{item.prompt_profile || '-'} / {item.sandbox || '-'} + {(() => { + // @@@publishable-preferred - publishable is the canonical release gate; score_gate stays as compatibility fallback. + const publishable = item.score?.publishable ?? (item.score?.score_gate === 'final'); + return ( + <> +
{String(item.status || '-').toUpperCase()}
+
publishable: {publishable ? 'TRUE' : 'FALSE'}
+ + ); + })()} +
+ {(() => { + const p = evalProgress(item); + return ( +
+
+
+
+
{formatProgressSummary(p)}
+
+ ); + })()} +
+ {(item.score?.publishable ?? (item.score?.score_gate === 'final')) ? ( + <> +
R {formatResolvedScore(item)}
+
C {formatPct(item.score?.completed_rate_pct)} | T {formatPct(item.score?.tool_call_thread_rate_pct)}
+ + ) : ( + <> +
R PROVISIONAL
+
C - | T -
+ + )} +
{item.updated_ago || '-'}
No evaluations yet.
+
+ +

+ offset={evalPagination?.offset ?? 0} | limit={evalPagination?.limit ?? evalLimit} | total={evalPagination?.total ?? evaluations.length} +

+ +
+
+ +
+
+

Status Guide

+
    + {statusReference.map((row) => ( +
  • {row[0]}: {row[1]}
  • + ))} +
+
+
+

Field Guide

+
    + {parameterReference.slice(0, 4).map((row) => ( +
  • {row[0]}: {row[1]}
  • + ))} +
+
+
+ + {composerOpen && ( + // @@@evaluation-composer-modal - keep config editing in a fixed layer to avoid "tail jump" in long list pages. +
setComposerOpen(false)}> +
e.stopPropagation()}> +
+

New Evaluation Config

+ +
+

Configure run scope, profile and runtime, then submit.

+ +
+
+

Run Scope

+
+
+ + +

Benchmark source. Lite is faster; Verified is stricter and slower.

+
+
+ + +

Dataset partition. Use test for formal comparison.

+
+
+ + setStartIdx(e.target.value)} /> +

Starting index inside the selected split.

+
+
+ + +

How many items to run in this evaluation batch.

+
+
+
+ +
+

Agent Profile

+
+
+ + +

Prompt strategy passed to runner. Used for A/B profile comparison.

+
+
+ + setRecursionLimit(e.target.value)} /> +

Agent recursion/iteration budget per item.

+
+
+
+ +
+

Runtime

+
+
+ + setTimeoutSec(e.target.value)} /> +

Per-item wall-clock timeout in seconds.

+
+
+ + +

Execution environment provider for this run.

+
+
+
+ +
+
+ + +
+

Submits config to backend and starts an evaluation job.

+
+
+ +
+ Submission Preview +
{JSON.stringify(submissionPreview, null, 2)}
+
+ +
+ Parameter Reference + + + + + + + + + + {parameterReference.map((row) => ( + + + + + + ))} + +
FieldMeaningRecommendation
{row[0]}{row[1]}{row[2]}
+
+
+
+ )} +
+ ); +} + +function EvaluationDetailPage() { + const { evaluationId } = useParams(); + const [data, setData] = React.useState(null); + + React.useEffect(() => { + fetchAPI(`/evaluation/${evaluationId}`).then(setData); + }, [evaluationId]); + + if (!data) return
Loading...
; + const detailProgress = evalProgress({ + threads_done: data.info?.threads_done ?? 0, + threads_running: data.info?.threads_running ?? 0, + slice_count: data.info?.slice_count ?? data.info?.threads_total ?? 0, + progress_source: data.info?.progress_source ?? 'thread_rows', + }); + const threadStateLabel = detailProgress.mode === 'checkpoint_estimate' ? 'started' : 'running'; + const scoreGate = String(data.info?.score?.score_gate || 'provisional'); + const publishable = Boolean(data.info?.score?.publishable ?? (scoreGate === 'final')); + const scoreFinal = publishable; + const summaryReady = !!data.info?.score?.eval_summary_path; + + return ( +
+ +

Evaluation: {shortId(data.evaluation_id, 14)}

+

+ {data.info.status} | dataset={data.info.dataset} | {threadStateLabel}={data.info.threads_running}/{data.info.threads_total} + {' '}| gate={scoreGate} + {' '}| publishable={String(publishable)} + {' '}| score={scoreFinal ? `${data.info.score?.resolved_instances ?? 0}/${data.info.score?.total_instances ?? 0} (${formatPct(data.info.score?.primary_score_pct)})` : 'PROVISIONAL'} +

+
+
phase: {String(data.info.status || '-').toUpperCase()}
+
+
+
+
+ {formatProgressSummary(detailProgress)} +
+
+ +
+
Split: {data.info.split}
+
Start: {data.info.start_idx}
+
Count: {data.info.slice_count}
+
Profile: {data.info.prompt_profile}
+
Timeout: {data.info.timeout_sec}s
+
Recursion: {data.info.recursion_limit}
+
Score Gate: {scoreGate}
+
Publishable: {String(publishable)}
+
Summary: {summaryReady ? 'ready' : 'missing'}
+ {scoreFinal ? ( + <> +
Resolved: {data.info.score?.resolved_instances ?? 0}/{data.info.score?.total_instances ?? 0}
+
Resolved Rate: {formatPct(data.info.score?.resolved_rate_pct)}
+
Completed: {data.info.score?.completed_instances ?? 0}/{data.info.score?.total_instances ?? 0}
+
Completed Rate: {formatPct(data.info.score?.completed_rate_pct)}
+
Non-empty Patch: {data.info.score?.non_empty_patch_instances ?? 0}/{data.info.score?.total_instances ?? 0}
+
Non-empty Rate: {formatPct(data.info.score?.non_empty_patch_rate_pct)}
+
Empty Patch: {data.info.score?.empty_patch_instances ?? 0}/{data.info.score?.total_instances ?? 0}
+
Errors: {data.info.score?.error_instances ?? 0}
+
Trace Active: {data.info.score?.active_trace_threads ?? 0}/{data.info.score?.total_instances ?? 0}
+
Tool-call Threads: {data.info.score?.tool_call_threads ?? 0}/{data.info.score?.total_instances ?? 0}
+
Tool-call Coverage: {formatPct(data.info.score?.tool_call_thread_rate_pct)}
+
Tool Calls Total: {data.info.score?.tool_calls_total ?? 0}
+
Avg Tool Calls(active): {data.info.score?.avg_tool_calls_per_active_thread ?? '-'}
+
Recursion Cap Hits: {data.info.score?.recursion_cap_hits ?? 0}{data.info.score?.recursion_limit ? ` / cap ${data.info.score.recursion_limit}` : ''}
+ + ) : ( + <> +
Final Score: blocked (provisional)
+
Block Reason: {data.info.score?.manifest_eval_error ? 'manifest_eval_error' : 'missing_eval_summary'}
+ + )} +
Run Dir: {data.info.score?.run_dir || '-'}
+
+ +
+

{data.threads.title} ({data.threads.count})

+ + + + + + + + + + + + + + {data.threads.items.map((item: any) => ( + + + + + + + + + + ))} + {data.threads.items.length === 0 && ( + + + + )} + +
#ThreadSessionRunEventsStatusStart
{item.item_index} + + {evalThreadLabel(item.thread_id, data.evaluation_id)} + + + {item.session?.session_url ? ( + {shortId(item.session.session_id)} + ) : '-'} + {item.run?.run_id ? shortId(item.run.run_id, 12) : '-'}{item.run?.event_count ?? 0}{item.status}{item.start_idx}
No threads in this evaluation.
+
+
+ ); +} + // Layout: Top navigation +function ScrollToTopOnRouteChange() { + const { pathname } = useLocation(); + React.useEffect(() => { + // @@@history-scroll-restore-disable - browser may restore stale scroll offsets and make user land at page tail. + const prev = window.history.scrollRestoration; + window.history.scrollRestoration = 'manual'; + return () => { + window.history.scrollRestoration = prev; + }; + }, []); + React.useEffect(() => { + // @@@route-scroll-reset - switch tabs/details should always start from top to avoid "tail landing" confusion. + window.scrollTo({ top: 0, left: 0, behavior: 'auto' }); + }, [pathname]); + return null; +} + function Layout({ children }: { children: React.ReactNode }) { return (
@@ -463,16 +1959,20 @@ function Layout({ children }: { children: React.ReactNode }) { export default function App() { return ( + } /> } /> } /> + } /> } /> } /> } /> } /> } /> + } /> + } /> diff --git a/frontend/monitor/src/styles.css b/frontend/monitor/src/styles.css index 0b767eade..bb6c8ad20 100644 --- a/frontend/monitor/src/styles.css +++ b/frontend/monitor/src/styles.css @@ -253,11 +253,264 @@ section li { color: #e0e0e0; } -/* Loading */ -div:has(> :only-child:is(div:contains("Loading"))) { +.trace-summary { + white-space: pre-wrap; + word-break: break-word; + max-width: 56ch; +} + +.trace-actor { + display: inline-block; + padding: 0.15rem 0.45rem; + border-radius: 999px; + font-size: 0.75rem; + text-transform: uppercase; + letter-spacing: 0.04em; +} + +.trace-assistant { + background: #1f3a5a; + color: #8dc3ff; +} + +.trace-tool { + background: #2d3f24; + color: #a9e684; +} + +.trace-runtime { + background: #4b3d1f; + color: #f2c56b; +} + +.trace-details summary { + cursor: pointer; + color: #8db9ff; +} + +.trace-payload { + margin-top: 0.5rem; + max-height: 220px; + overflow: auto; + padding: 0.75rem; +} + +.trace-toolbar { + margin: 0.8rem 0; display: flex; - justify-content: center; + justify-content: flex-start; + flex-wrap: wrap; align-items: center; - min-height: 200px; - color: #888; + gap: 1rem; +} + +.trace-run-select { + display: flex; + align-items: center; + gap: 0.4rem; +} + +.trace-run-select select { + border: 1px solid #2e3e57; + background: #101721; + color: #dbe9f7; + border-radius: 6px; + padding: 0.3rem 0.45rem; +} + +.trace-filters { + display: flex; + gap: 0.4rem; +} + +.trace-view-switch { + display: flex; + gap: 0.4rem; +} + +.trace-filter-btn { + border: 1px solid #2e3e57; + background: #1a2432; + color: #9ec2ef; + border-radius: 6px; + padding: 0.28rem 0.62rem; + cursor: pointer; +} + +.trace-filter-btn.is-active { + background: #2a4f7a; + color: #e8f3ff; + border-color: #4d85bf; +} + +.trace-raw-toggle { + color: #9aa7b6; + 
font-size: 0.9rem; + display: flex; + align-items: center; + gap: 0.35rem; +} + +.trace-metrics { + display: flex; + gap: 1rem; + color: #91a4b8; + font-size: 0.9rem; +} + +.trace-timeline { + margin-top: 0.8rem; + display: flex; + flex-direction: column; + gap: 0.8rem; +} + +.trace-card { + border: 1px solid #2a2f36; + background: #12161c; + border-radius: 10px; + padding: 0.7rem 0.8rem; +} + +.trace-card-assistant { + border-left: 4px solid #4f7fd8; +} + +.trace-card-tool { + border-left: 4px solid #5f9446; +} + +.trace-card-runtime { + border-left: 4px solid #a07932; +} + +.trace-card-header { + display: flex; + justify-content: space-between; + align-items: center; + margin-bottom: 0.6rem; + gap: 0.8rem; +} + +.trace-card-meta { + display: flex; + align-items: center; + gap: 0.42rem; +} + +.trace-step { + color: #89a4c0; + font-family: 'SF Mono', Monaco, monospace; + font-size: 0.85rem; +} + +.trace-event { + color: #ccd6e0; + font-size: 0.85rem; + font-family: 'SF Mono', Monaco, monospace; +} + +.trace-run-id { + color: #8395aa; +} + +.trace-block-wrap { + display: flex; + flex-direction: column; + gap: 0.35rem; +} + +.trace-label { + color: #8ea0b4; + font-size: 0.78rem; + text-transform: uppercase; + letter-spacing: 0.04em; +} + +.trace-block { + background: #0c1014; + border: 1px solid #1f2732; + border-radius: 7px; + padding: 0.55rem 0.65rem; + font-family: 'SF Mono', Monaco, monospace; + font-size: 0.82rem; + color: #dae5f2; + white-space: pre-wrap; + word-break: break-word; + max-height: 300px; + overflow: auto; +} + +.trace-output { + max-height: 460px; +} + +.trace-assistant-text { + max-height: 340px; +} + +.trace-command { + color: #bde59d; +} + +.trace-empty { + border: 1px dashed #33404f; + color: #95a4b4; + border-radius: 8px; + padding: 1rem; +} + +.trace-raw-table { + margin-top: 1rem; +} + +.trace-step-card { + border: 1px solid #2a2f36; + background: #0f141b; + border-left: 4px solid #4f7fd8; + border-radius: 10px; + padding: 0.8rem; +} + 
+.conversation-card { + border: 1px solid #30363d; + background: #0d1117; + border-radius: 10px; + padding: 0.8rem; +} + +.trace-step-header { + display: flex; + justify-content: space-between; + gap: 0.8rem; + margin-bottom: 0.55rem; +} + +.trace-step-meta { + display: flex; + align-items: center; + gap: 0.6rem; +} + +.trace-step-index { + color: #e7f1ff; + font-weight: 600; +} + +.trace-step-block { + display: flex; + flex-direction: column; + gap: 0.28rem; + margin: 0.4rem 0 0.65rem; +} + +.trace-raw-item { + margin-top: 0.5rem; +} + +.trace-raw-item-title { + display: flex; + gap: 0.45rem; + color: #9eb3c9; + font-size: 0.82rem; } diff --git a/sandbox/capability.py b/sandbox/capability.py index a3a15a190..a4c2c6fd0 100644 --- a/sandbox/capability.py +++ b/sandbox/capability.py @@ -13,7 +13,7 @@ from pathlib import Path from typing import TYPE_CHECKING -from sandbox.interfaces.executor import BaseExecutor +from sandbox.interfaces.executor import AsyncCommand, BaseExecutor, ExecuteResult from sandbox.interfaces.filesystem import FileSystemBackend if TYPE_CHECKING: @@ -127,6 +127,64 @@ def _lookup_command_terminal_id(self, command_id: str) -> str | None: ).fetchone() return str(row[0]) if row else None + def _load_persisted_command(self, command_id: str) -> sqlite3.Row | None: + if self._db_path is None: + return None + with sqlite3.connect(str(self._db_path), timeout=30) as conn: + conn.execute("PRAGMA busy_timeout=30000") + conn.row_factory = sqlite3.Row + row = conn.execute( + """ + SELECT + tc.command_id, + tc.command_line, + tc.cwd, + tc.status, + tc.stdout, + tc.stderr, + tc.exit_code + FROM terminal_commands tc + LEFT JOIN abstract_terminals at ON at.terminal_id = tc.terminal_id + LEFT JOIN chat_sessions cs ON cs.chat_session_id = tc.chat_session_id + WHERE tc.command_id = ? + AND (at.thread_id = ? OR cs.thread_id = ?) 
+ LIMIT 1 + """, + (command_id, self._session.thread_id, self._session.thread_id), + ).fetchone() + return row + + # @@@persisted-command-fallback - background command terminal/session can be cleaned before command_status; fallback to DB row keeps trace truthful. + def _command_from_row(self, row: sqlite3.Row) -> AsyncCommand: + status = str(row["status"] or "") + return AsyncCommand( + command_id=str(row["command_id"]), + command_line=str(row["command_line"] or ""), + cwd=str(row["cwd"] or ""), + stdout_buffer=[str(row["stdout"] or "")], + stderr_buffer=[str(row["stderr"] or "")], + exit_code=row["exit_code"], + done=status in {"done", "failed", "cancelled"}, + ) + + def _result_from_row(self, row: sqlite3.Row, timeout: float | None = None) -> ExecuteResult: + status = str(row["status"] or "") + if status in {"running", "pending"} and timeout is not None and timeout > 0: + return ExecuteResult( + exit_code=-1, + stdout=str(row["stdout"] or ""), + stderr=str(row["stderr"] or ""), + timed_out=True, + command_id=str(row["command_id"]), + ) + exit_code = row["exit_code"] + return ExecuteResult( + exit_code=int(exit_code) if exit_code is not None else 0, + stdout=str(row["stdout"] or ""), + stderr=str(row["stderr"] or ""), + command_id=str(row["command_id"]), + ) + def _resolve_session_for_terminal(self, terminal_id: str): if terminal_id == self._session.terminal.terminal_id: return self._session @@ -162,13 +220,31 @@ def _resolve_session_for_command(self, command_id: str): async def get_status(self, command_id: str): """Get status for an async command.""" - session = self._resolve_session_for_command(command_id) - return await session.runtime.get_command(command_id) + try: + session = self._resolve_session_for_command(command_id) + cmd = await session.runtime.get_command(command_id) + if cmd is not None: + return cmd + except RuntimeError: + pass + row = self._load_persisted_command(command_id) + if row is None: + return None + return self._command_from_row(row) async 
def wait_for(self, command_id: str, timeout: float | None = None): """Wait for async command completion.""" - session = self._resolve_session_for_command(command_id) - return await session.runtime.wait_for_command(command_id, timeout=timeout) + try: + session = self._resolve_session_for_command(command_id) + result = await session.runtime.wait_for_command(command_id, timeout=timeout) + if result is not None: + return result + except RuntimeError: + pass + row = self._load_persisted_command(command_id) + if row is None: + return None + return self._result_from_row(row, timeout=timeout) def store_completed_result(self, command_id: str, command_line: str, cwd: str, result): """Store completed result for command_status lookup.""" diff --git a/tests/test_integration_new_arch.py b/tests/test_integration_new_arch.py index caf265a5e..a581c6bea 100644 --- a/tests/test_integration_new_arch.py +++ b/tests/test_integration_new_arch.py @@ -252,6 +252,36 @@ async def test_running_async_command_visible_from_new_manager(self, temp_db, moc assert finished.exit_code == 0 assert "tick-3" in finished.stdout + @pytest.mark.asyncio + async def test_command_status_fallback_when_terminal_row_missing(self, sandbox_manager, temp_db): + thread_id = "test-thread-fallback-status" + capability = sandbox_manager.get_sandbox(thread_id) + + async_cmd = await capability.command.execute_async("echo fallback-ok") + done = await capability.command.wait_for(async_cmd.command_id, timeout=5.0) + assert done is not None + assert done.exit_code == 0 + assert "fallback-ok" in done.stdout + + with sqlite3.connect(str(temp_db), timeout=30) as conn: + row = conn.execute( + "SELECT terminal_id FROM terminal_commands WHERE command_id = ?", + (async_cmd.command_id,), + ).fetchone() + assert row is not None + conn.execute("DELETE FROM abstract_terminals WHERE terminal_id = ?", (row[0],)) + conn.commit() + + status = await capability.command.get_status(async_cmd.command_id) + assert status is not None + assert 
status.done + assert "fallback-ok" in "".join(status.stdout_buffer) + + done_again = await capability.command.wait_for(async_cmd.command_id, timeout=1.0) + assert done_again is not None + assert done_again.exit_code == 0 + assert "fallback-ok" in done_again.stdout + def test_terminal_state_persists_across_sessions(self, sandbox_manager, temp_db): """Test that terminal state persists when session expires.""" thread_id = "test-thread-4"