diff --git a/backend/web/routers/monitor.py b/backend/web/routers/monitor.py index 0d0449cba..eb1781db6 100644 --- a/backend/web/routers/monitor.py +++ b/backend/web/routers/monitor.py @@ -1,82 +1,91 @@ -"""Sandbox Monitor API - thin router over monitor core.""" +"""Monitor router compatibility layer. + +Expose the richer monitor implementation from ``backend.web.monitor`` while +preserving the newer resource/health helper endpoints added on main. +""" import asyncio -from typing import Annotated -from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi import HTTPException, Query +from pydantic import BaseModel, Field -from backend.web.core.dependencies import get_current_user_id +from backend.web.monitor import list_leases, router from backend.web.services import monitor_service from backend.web.services.resource_cache import ( get_monitor_resource_overview_snapshot, refresh_monitor_resource_overview_sync, ) -router = APIRouter(prefix="/api/monitor") - - -@router.get("/threads") -def list_threads(user_id: Annotated[str, Depends(get_current_user_id)]): - # TODO(multi-tenant): threads are stored in SQLite (sandbox DB) and linked to members via - # chat_sessions.member_id → members.owner_user_id. Filtering requires a JOIN-capable repo - # method. Add owner filtering once monitor_repo exposes query_threads(owner_user_id=...). - return monitor_service.list_threads() - - -@router.get("/thread/{thread_id}") -def get_thread(thread_id: str, user_id: Annotated[str, Depends(get_current_user_id)]): - return monitor_service.get_thread(thread_id) - - -@router.get("/leases") -def list_leases(user_id: Annotated[str, Depends(get_current_user_id)]): - return monitor_service.list_leases() - - -@router.get("/lease/{lease_id}") -def get_lease(lease_id: str, user_id: Annotated[str, Depends(get_current_user_id)]): - try: - return monitor_service.get_lease(lease_id) - except KeyError as e: - raise HTTPException(status_code=404, detail=str(e)) from e - -@router.get("/diverged") -def list_diverged(user_id: Annotated[str, Depends(get_current_user_id)]): - return monitor_service.list_diverged() - - -@router.get("/events") -def list_events(user_id: Annotated[str, Depends(get_current_user_id)], limit: int = 100): - return monitor_service.list_events(limit=limit) - - -@router.get("/event/{event_id}") -def get_event(event_id: str, user_id: Annotated[str, Depends(get_current_user_id)]): - try: - return monitor_service.get_event(event_id) - except KeyError as e: - raise HTTPException(status_code=404, detail=str(e)) from e +class ResourceCleanupRequest(BaseModel): + action: str = Field(default="cleanup_residue") + lease_ids: list[str] + expected_category: str @router.get("/health") -def health_snapshot(user_id: Annotated[str, Depends(get_current_user_id)]): +def health_snapshot(): return monitor_service.runtime_health_snapshot() +@router.get("/dashboard") +def dashboard_snapshot(): + health = monitor_service.runtime_health_snapshot() + resources = get_monitor_resource_overview_snapshot() + leases = list_leases() + + resource_summary = resources.get("summary") or {} + lease_summary = leases.get("summary") or {} + + return { + "snapshot_at": health.get("snapshot_at"), + "resources_summary": resource_summary, + "infra": { + "providers_active": int(resource_summary.get("active_providers") or 0), + "providers_unavailable": int(resource_summary.get("unavailable_providers") or 0), + "leases_total": int(lease_summary.get("total") or leases.get("count") or 0), + "leases_diverged": int(lease_summary.get("diverged") or 0) + int(lease_summary.get("orphan_diverged") or 0), + "leases_orphan": int(lease_summary.get("orphan") or 0) + int(lease_summary.get("orphan_diverged") or 0), + "leases_healthy": int(lease_summary.get("healthy") or 0), + }, + "workload": { + "db_sessions_total": int(((health.get("db") or {}).get("counts") or {}).get("chat_sessions") or 0), + "provider_sessions_total": int(((health.get("sessions") or {}).get("total")) or 0), + "running_sessions": int(resource_summary.get("running_sessions") or 0), + "evaluations_running": 0, + }, + "latest_evaluation": None, + } + + @router.get("/resources") -def resources_overview(user_id: Annotated[str, Depends(get_current_user_id)]): +def resources_overview(): return get_monitor_resource_overview_snapshot() @router.post("/resources/refresh") -async def resources_refresh(user_id: Annotated[str, Depends(get_current_user_id)]): +async def resources_refresh(): # @@@refresh-off-main-loop - provider I/O stays off event loop to avoid request head-of-line blocking. return await asyncio.to_thread(refresh_monitor_resource_overview_sync) +@router.post("/resources/cleanup") +async def resources_cleanup(payload: ResourceCleanupRequest): + from backend.web.services import monitor_service + + try: + return await asyncio.to_thread( + monitor_service.cleanup_resource_leases, + action=payload.action, + lease_ids=payload.lease_ids, + expected_category=payload.expected_category, + ) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) from exc + + @router.get("/sandbox/{lease_id}/browse") -async def sandbox_browse(lease_id: str, user_id: Annotated[str, Depends(get_current_user_id)], path: str = Query(default="/")): +async def sandbox_browse(lease_id: str, path: str = Query(default="/")): from backend.web.services.resource_service import sandbox_browse as _browse try: @@ -88,7 +97,7 @@ async def sandbox_browse(lease_id: str, user_id: Annotated[str, Depends(get_curr @router.get("/sandbox/{lease_id}/read") -async def sandbox_read_file(lease_id: str, user_id: Annotated[str, Depends(get_current_user_id)], path: str = Query(...)): +async def sandbox_read_file(lease_id: str, path: str = Query(...)): from backend.web.services.resource_service import sandbox_read as _read try: diff --git a/backend/web/services/monitor_service.py b/backend/web/services/monitor_service.py index 31f59b729..e813718a6 100644 --- a/backend/web/services/monitor_service.py +++ b/backend/web/services/monitor_service.py @@ -3,18 +3,29 @@ from __future__ import annotations import json +import re from datetime import UTC, datetime from typing import Any from backend.web.core.storage_factory import make_sandbox_monitor_repo from backend.web.services.sandbox_service import init_providers_and_managers, load_all_sessions +from storage.providers.sqlite.chat_session_repo import SQLiteChatSessionRepo from storage.providers.sqlite.kernel import SQLiteDBRole, resolve_role_db_path +from storage.providers.sqlite.lease_repo import SQLiteLeaseRepo # --------------------------------------------------------------------------- # Mapping helpers (private) # --------------------------------------------------------------------------- +def make_chat_session_repo() -> SQLiteChatSessionRepo: + return SQLiteChatSessionRepo(db_path=resolve_role_db_path(SQLiteDBRole.SANDBOX)) + + +def make_lease_repo() -> SQLiteLeaseRepo: + return SQLiteLeaseRepo(db_path=resolve_role_db_path(SQLiteDBRole.SANDBOX)) + + def _format_time_ago(iso_timestamp: str | None) -> str: if not iso_timestamp: return "never" @@ -75,6 +86,325 @@ def _lease_link(lease_id: str | None) -> dict[str, Any]: return {"lease_id": lease_id, "lease_url": f"/lease/{lease_id}" if lease_id else None} +LEASE_SEMANTIC_ORDER = [ + "orphan_diverged", + "diverged", + "orphan", + "healthy", +] + +LEASE_SEMANTIC_META = { + "orphan_diverged": { + "title": "Orphaned + Diverged", + "description": "Lease lost thread binding while desired and observed state still disagree.", + }, + "diverged": { + "title": "Diverged", + "description": "Lease is still attached to a thread, but runtime state has not converged.", + }, + "orphan": { + "title": "Orphans", + "description": "Lease has no active thread binding. Usually cleanup or historical residue.", + }, + "healthy": { + "title": "Healthy", + "description": "Lease has a thread binding and desired state matches observed state.", + }, +} + + +EVAL_NOTE_KEYS = [ + "runner", + "rc", + "sandbox", + "run_dir", + "stdout_log", + "stderr_log", +] + +LEASE_TRIAGE_ORDER = [ + "active_drift", + "detached_residue", + "orphan_cleanup", + "healthy_capacity", +] + +LEASE_TRIAGE_META = { + "active_drift": { + "title": "Active Drift", + "description": "Leases whose desired and observed state still disagree recently enough to warrant active operator attention.", + "tone": "warning", + }, + "detached_residue": { + "title": "Detached Residue", + "description": ( + "Leases still marked desired=running but observed=detached long after the runtime " + "stopped moving. Usually cleanup debt, not live pressure." + ), + "tone": "danger", + }, + "orphan_cleanup": { + "title": "Orphan Cleanup", + "description": "Lease rows that have already lost thread binding and mainly represent cleanup backlog or historical residue.", + "tone": "warning", + }, + "healthy_capacity": { + "title": "Healthy Capacity", + "description": "Leases with attached thread context and converged runtime state.", + "tone": "success", + }, +} + +DETACHED_RESIDUE_THRESHOLD_HOURS = 4.0 +RESOURCE_CLEANUP_ALLOWED_CATEGORIES = {"detached_residue", "orphan_cleanup"} +ACTIVE_CHAT_SESSION_STATUSES = {"active", "idle", "paused"} + + +def _classify_lease_semantics(*, thread_id: str | None, badge: dict[str, Any]) -> dict[str, str]: + is_orphan = not bool(thread_id) + is_converged = bool(badge.get("converged")) + if is_orphan and not is_converged: + category = "orphan_diverged" + elif not is_converged: + category = "diverged" + elif is_orphan: + category = "orphan" + else: + category = "healthy" + meta = LEASE_SEMANTIC_META[category] + return { + "category": category, + "title": meta["title"], + "description": meta["description"], + } + + +def _parse_local_timestamp(iso_timestamp: str | None) -> datetime | None: + if not iso_timestamp: + return None + cleaned = iso_timestamp + if "Z" in cleaned: + cleaned = cleaned.replace("Z", "") + if "+" in cleaned: + cleaned = cleaned.split("+")[0] + try: + return datetime.fromisoformat(cleaned) + except ValueError: + return None + + +def _hours_since(iso_timestamp: str | None) -> float | None: + dt = _parse_local_timestamp(iso_timestamp) + if dt is None: + return None + delta = datetime.now() - dt + return delta.total_seconds() / 3600 + + +def _classify_lease_triage( + *, + thread_id: str | None, + badge: dict[str, Any], + observed_state: str | None, + desired_state: str | None, + updated_at: str | None, +) -> dict[str, Any]: + observed = str(observed_state or "").strip().lower() or None + desired = str(desired_state or "").strip().lower() or None + age_hours = _hours_since(updated_at) + is_orphan = not bool(thread_id) + is_converged = bool(badge.get("converged")) + + if is_orphan: + key = "orphan_cleanup" + elif is_converged: + key = "healthy_capacity" + elif observed == "detached" and desired == "running" and age_hours is not None and age_hours >= DETACHED_RESIDUE_THRESHOLD_HOURS: + key = "detached_residue" + else: + key = "active_drift" + + meta = LEASE_TRIAGE_META[key] + return { + "category": key, + "title": meta["title"], + "description": meta["description"], + "tone": meta["tone"], + "age_hours": age_hours, + } + + +def _cleanable_lease_ids(lease_ids: list[str]) -> list[str]: + cleaned: list[str] = [] + seen: set[str] = set() + for raw in lease_ids: + lease_id = str(raw or "").strip() + if not lease_id or lease_id in seen: + continue + seen.add(lease_id) + cleaned.append(lease_id) + if not cleaned: + raise ValueError("lease_ids must contain at least one non-empty lease id") + return cleaned + + +def _triage_category_for_row(row: dict[str, Any]) -> str: + badge = _make_badge(row.get("desired_state"), row.get("observed_state")) + triage = _classify_lease_triage( + thread_id=row.get("thread_id"), + badge=badge, + observed_state=row.get("observed_state"), + desired_state=row.get("desired_state"), + updated_at=row.get("updated_at"), + ) + return str(triage["category"]) + + +def _extract_eval_note_value(notes: str, key: str) -> str | None: + match = re.search(rf"(?:^|[ |]){re.escape(key)}=([^ ]+)", notes) + if not match: + return None + return match.group(1).strip() + + +def build_evaluation_operator_surface( + *, + status: str, + notes: str, + score: dict[str, Any], + threads_total: int, + threads_running: int, + threads_done: int, +) -> dict[str, Any]: + extracted = {key: _extract_eval_note_value(notes, key) for key in EVAL_NOTE_KEYS} + rc_text = extracted.get("rc") + try: + rc = int(rc_text) if rc_text is not None else None + except ValueError: + rc = None + + scored = bool(score.get("scored")) + score_gate = str(score.get("score_gate") or "provisional") + artifacts = [ + { + "label": "Run directory", + "path": score.get("run_dir") or extracted.get("run_dir"), + }, + {"label": "Run manifest", "path": score.get("manifest_path")}, + {"label": "STDOUT log", "path": extracted.get("stdout_log")}, + {"label": "STDERR log", "path": extracted.get("stderr_log")}, + {"label": "Eval summary", "path": score.get("eval_summary_path")}, + {"label": "Trace summaries", "path": score.get("trace_summaries_path")}, + ] + artifacts = [ + { + **item, + "status": "present" if item["path"] else "missing", + } + for item in artifacts + ] + artifact_summary = { + "present": sum(1 for item in artifacts if item["status"] == "present"), + "missing": sum(1 for item in artifacts if item["status"] == "missing"), + "total": len(artifacts), + } + + facts = [ + {"label": "Status", "value": status}, + {"label": "Score gate", "value": score_gate}, + {"label": "Threads materialized", "value": str(threads_total)}, + {"label": "Threads running", "value": str(threads_running)}, + {"label": "Threads done", "value": str(threads_done)}, + ] + runner = extracted.get("runner") + if runner: + facts.append({"label": "Runner", "value": runner}) + if rc is not None: + facts.append({"label": "Exit code", "value": str(rc)}) + + kind = "collecting_runtime_evidence" + tone = "default" + headline = "Evaluation is still collecting runtime evidence." + summary = "Use the artifacts below to inspect progress and confirm whether thread rows are materializing." + next_steps = [ + "Open the run manifest to confirm the slice payload and output directory.", + "Inspect stdout/stderr before assuming the run is healthy.", + ] + + if status == "provisional" and not scored: + kind = "provisional_waiting_for_summary" + tone = "warning" + headline = "Evaluation is provisional. Final score is blocked." + summary = "This run has not produced the final eval summary yet, so publishable scoring is intentionally withheld." + next_steps = [ + "Check whether eval_summary_path is still missing because the run is ongoing or because the runner exited early.", + "Use stdout/stderr logs to confirm whether the solve phase actually started.", + ] + + if rc is not None and rc != 0 and threads_total == 0: + kind = "bootstrap_failure" + tone = "danger" + headline = "Runner exited before evaluation threads materialized." + summary = "Treat this as a bootstrap failure, not as an empty successful run. No evaluation thread rows were created." + next_steps = [ + "Inspect STDERR first to find the failing bootstrap step.", + "Use the run manifest and stdout log to confirm whether the slice was prepared before exit.", + "Re-run only after the failing dependency or model configuration is understood.", + ] + elif status == "running" and threads_total == 0 and threads_running > 0: + kind = "running_waiting_for_threads" + tone = "default" + headline = "Evaluation is actively running while thread rows catch up." + summary = ( + "The runner is alive, but thread rows have not materialized yet. Treat this as an ingestion lag window, not as an empty run." + ) + next_steps = [ + "Refresh after the first thread row materializes.", + "Use stdout/stderr to confirm the solve loop is still advancing.", + ] + elif status == "running": + kind = "running_active" + tone = "default" + headline = "Evaluation is actively running." + summary = "Thread rows and traces may lag behind the runner. Use live progress and logs before declaring drift." + next_steps = [ + "Refresh after new thread rows materialize.", + "Inspect traces only after the first active thread appears.", + ] + elif status == "completed_with_errors" and scored: + kind = "completed_with_errors" + tone = "warning" + headline = "Evaluation completed with recorded errors." + summary = ( + "Some thread rows reached completion, but at least one instance recorded an error. Treat this as reviewable but not clean." + ) + next_steps = [ + "Inspect error-bearing threads before comparing this run against cleaner baselines.", + "Use eval summary and trace summaries to isolate failing instances.", + ] + elif status == "completed" and scored: + kind = "completed_publishable" + tone = "success" + headline = "Evaluation finished with a publishable score surface." + summary = "Score artifacts are present. Use the thread table to drill into trace-level evidence." + next_steps = [ + "Open threads with low-quality traces and inspect tool-call detail.", + "Use the eval summary and trace summaries to compare runs.", + ] + + return { + "kind": kind, + "tone": tone, + "headline": headline, + "summary": summary, + "facts": facts, + "artifacts": artifacts, + "artifact_summary": artifact_summary, + "next_steps": next_steps, + "raw_notes": notes, + } + + # --------------------------------------------------------------------------- # Mappers (private) # --------------------------------------------------------------------------- @@ -130,21 +460,82 @@ def _map_thread_detail(thread_id: str, sessions: list[dict[str, Any]]) -> dict[s def _map_leases(rows: list[dict[str, Any]]) -> dict[str, Any]: - items = [ - { - "lease_id": row["lease_id"], - "lease_url": f"/lease/{row['lease_id']}", - "provider": row["provider_name"], - "instance_id": row["current_instance_id"], - "thread": _thread_ref(row["thread_id"]), - "state_badge": _make_badge(row["desired_state"], row["observed_state"]), - "error": row["last_error"], - "updated_at": row["updated_at"], - "updated_ago": _format_time_ago(row["updated_at"]), - } - for row in rows - ] - return {"title": "All Leases", "count": len(items), "items": items} + items = [] + for row in rows: + badge = _make_badge(row["desired_state"], row["observed_state"]) + triage = _classify_lease_triage( + thread_id=row["thread_id"], + badge=badge, + observed_state=row["observed_state"], + desired_state=row["desired_state"], + updated_at=row["updated_at"], + ) + items.append( + { + "lease_id": row["lease_id"], + "lease_url": f"/lease/{row['lease_id']}", + "provider": row["provider_name"], + "instance_id": row["current_instance_id"], + "thread": _thread_ref(row["thread_id"]), + "state_badge": badge, + "semantics": _classify_lease_semantics(thread_id=row["thread_id"], badge=badge), + "triage": triage, + "error": row["last_error"], + "updated_at": row["updated_at"], + "updated_ago": _format_time_ago(row["updated_at"]), + } + ) + + summary = {key: 0 for key in LEASE_SEMANTIC_ORDER} + for item in items: + summary[item["semantics"]["category"]] += 1 + summary["total"] = len(items) + + groups = [] + for key in LEASE_SEMANTIC_ORDER: + meta = LEASE_SEMANTIC_META[key] + group_items = [item for item in items if item["semantics"]["category"] == key] + groups.append( + { + "key": key, + "title": meta["title"], + "description": meta["description"], + "count": len(group_items), + "items": group_items, + } + ) + + triage_summary = {key: 0 for key in LEASE_TRIAGE_ORDER} + for item in items: + triage_summary[item["triage"]["category"]] += 1 + triage_summary["total"] = len(items) + + triage_groups = [] + for key in LEASE_TRIAGE_ORDER: + meta = LEASE_TRIAGE_META[key] + group_items = [item for item in items if item["triage"]["category"] == key] + triage_groups.append( + { + "key": key, + "title": meta["title"], + "description": meta["description"], + "tone": meta["tone"], + "count": len(group_items), + "items": group_items, + } + ) + + return { + "title": "All Leases", + "count": len(items), + "summary": summary, + "groups": groups, + "triage": { + "summary": triage_summary, + "groups": triage_groups, + }, + "items": items, + } def _map_lease_detail( @@ -192,6 +583,47 @@ def _map_lease_detail( } +def _historical_lease_detail( + lease_id: str, + sessions: list[dict[str, Any]], + events: list[dict[str, Any]], +) -> dict[str, Any] | None: + if not sessions and not events: + return None + + created_candidates = [ + str(value) for value in [*(row.get("started_at") for row in sessions), *(row.get("created_at") for row in events)] if value + ] + updated_candidates = [ + str(value) + for value in [ + *(row.get("ended_at") or row.get("started_at") for row in sessions), + *(row.get("created_at") for row in events), + ] + if value + ] + first_session = sessions[0] if sessions else {} + thread_ids: list[str] = [] + seen_threads: set[str] = set() + for row in sessions: + thread_id = str(row.get("thread_id") or "").strip() + if thread_id and thread_id not in seen_threads: + seen_threads.add(thread_id) + thread_ids.append(thread_id) + + lease = { + "provider_name": first_session.get("provider_name") or "unknown", + "current_instance_id": first_session.get("current_instance_id"), + "created_at": min(created_candidates) if created_candidates else None, + "updated_at": max(updated_candidates) if updated_candidates else None, + "desired_state": first_session.get("desired_state"), + "observed_state": first_session.get("observed_state"), + "last_error": first_session.get("last_error"), + } + threads = [{"thread_id": thread_id} for thread_id in thread_ids] + return _map_lease_detail(lease_id, lease, threads, events) + + def _map_diverged(rows: list[dict[str, Any]]) -> dict[str, Any]: items = [ { @@ -297,16 +729,152 @@ def list_leases() -> dict[str, Any]: repo.close() +def cleanup_resource_leases( + *, + action: str, + lease_ids: list[str], + expected_category: str, +) -> dict[str, Any]: + if action != "cleanup_residue": + raise ValueError(f"Unsupported cleanup action: {action}") + if expected_category not in RESOURCE_CLEANUP_ALLOWED_CATEGORIES: + raise ValueError("expected_category must be one of: detached_residue, orphan_cleanup") + + target_lease_ids = _cleanable_lease_ids(lease_ids) + monitor_repo = make_sandbox_monitor_repo() + lease_repo = make_lease_repo() + chat_session_repo = make_chat_session_repo() + try: + rows_by_id = {str(row.get("lease_id") or ""): row for row in monitor_repo.query_leases() if row.get("lease_id")} + providers, _ = init_providers_and_managers() + cleaned: list[dict[str, Any]] = [] + skipped: list[str] = [] + errors: list[dict[str, Any]] = [] + + for lease_id in target_lease_ids: + row = rows_by_id.get(lease_id) + if row is None: + skipped.append(lease_id) + errors.append({"lease_id": lease_id, "reason": "lease_not_found"}) + continue + + actual_category = _triage_category_for_row(row) + if actual_category != expected_category: + skipped.append(lease_id) + errors.append( + { + "lease_id": lease_id, + "reason": "category_mismatch", + "expected_category": expected_category, + "actual_category": actual_category, + } + ) + continue + + sessions = monitor_repo.query_lease_sessions(lease_id) + live_session_ids = [ + str(session.get("chat_session_id")) + for session in sessions + if str(session.get("status") or "").strip().lower() in ACTIVE_CHAT_SESSION_STATUSES + ] + if live_session_ids: + skipped.append(lease_id) + errors.append( + { + "lease_id": lease_id, + "reason": "live_sessions_present", + "session_ids": live_session_ids, + } + ) + continue + + if chat_session_repo.lease_has_running_command(lease_id): + skipped.append(lease_id) + errors.append({"lease_id": lease_id, "reason": "running_command_present"}) + continue + + provider_name = str(row.get("provider_name") or "").strip() + instance_id = str(row.get("current_instance_id") or "").strip() or None + if instance_id: + provider = providers.get(provider_name) + if provider is None: + skipped.append(lease_id) + errors.append( + { + "lease_id": lease_id, + "reason": "provider_unavailable", + "provider": provider_name, + } + ) + continue + if not provider.get_capability().can_destroy: + skipped.append(lease_id) + errors.append( + { + "lease_id": lease_id, + "reason": "provider_destroy_unsupported", + "provider": provider_name, + } + ) + continue + try: + destroyed = provider.destroy_session(instance_id, sync=True) + except Exception as exc: + skipped.append(lease_id) + errors.append( + { + "lease_id": lease_id, + "reason": "provider_destroy_failed", + "provider": provider_name, + "detail": str(exc), + } + ) + continue + if not destroyed: + skipped.append(lease_id) + errors.append( + { + "lease_id": lease_id, + "reason": "provider_destroy_failed", + "provider": provider_name, + "detail": "destroy_session returned false", + } + ) + continue + + lease_repo.delete(lease_id) + cleaned.append({"lease_id": lease_id, "category": actual_category}) + + refreshed_summary = list_leases()["triage"]["summary"] + return { + "action": action, + "expected_category": expected_category, + "attempted": target_lease_ids, + "cleaned": cleaned, + "skipped": skipped, + "errors": errors, + "refreshed_summary": refreshed_summary, + } + finally: + chat_session_repo.close() + lease_repo.close() + monitor_repo.close() + + def get_lease(lease_id: str) -> dict[str, Any]: repo = make_sandbox_monitor_repo() try: lease = repo.query_lease(lease_id) - if not lease: - raise KeyError("Lease not found") threads = repo.query_lease_threads(lease_id) events = repo.query_lease_events(lease_id) + sessions = repo.query_lease_sessions(lease_id) finally: repo.close() + if not lease: + fallback = _historical_lease_detail(lease_id, sessions, events) + if fallback: + return fallback + raise KeyError("Lease not found") return _map_lease_detail(lease_id, lease, threads, events) diff --git a/backend/web/services/resource_cache.py b/backend/web/services/resource_cache.py index afc4da809..62846a653 100644 --- a/backend/web/services/resource_cache.py +++ b/backend/web/services/resource_cache.py @@ -10,7 +10,7 @@ from datetime import UTC, datetime from typing import Any -from backend.web.services import resource_service +from backend.web.services import monitor_service, resource_service _DEFAULT_REFRESH_INTERVAL_SEC = 90.0 @@ -18,12 +18,16 @@ _snapshot_cache: dict[str, Any] | None = None -def clear_monitor_resource_overview_cache() -> None: +def clear_resource_overview_cache() -> None: with _snapshot_lock: global _snapshot_cache _snapshot_cache = None +def clear_monitor_resource_overview_cache() -> None: + clear_resource_overview_cache() + + def _now_iso() -> str: return datetime.now(UTC).isoformat().replace("+00:00", "Z") @@ -55,6 +59,13 @@ def _with_refresh_metadata( return payload +def _attach_monitor_triage(payload: dict[str, Any]) -> dict[str, Any]: + lease_payload = monitor_service.list_leases() + triage = lease_payload.get("triage") or {"summary": {}, "groups": []} + payload["triage"] = triage + return payload + + def _snapshot_drifted_from_live_sessions(snapshot: dict[str, Any]) -> bool: live_stats = resource_service.visible_resource_session_stats() for provider in snapshot.get("providers") or []: @@ -72,12 +83,13 @@ def _snapshot_drifted_from_live_sessions(snapshot: dict[str, Any]) -> bool: return False -def refresh_monitor_resource_overview_sync() -> dict[str, Any]: - """Refresh cached monitor overview snapshot and return latest payload.""" +def refresh_resource_overview_sync() -> dict[str, Any]: + """Refresh cached overview snapshot and return latest payload.""" global _snapshot_cache started = time.perf_counter() try: payload = resource_service.list_resource_providers() + payload = _attach_monitor_triage(payload) duration_ms = (time.perf_counter() - started) * 1000 payload = _with_refresh_metadata(payload, duration_ms=duration_ms, status="ok", error=None) with _snapshot_lock: @@ -96,8 +108,12 @@ def refresh_monitor_resource_overview_sync() -> dict[str, Any]: return degraded -def get_monitor_resource_overview_snapshot() -> dict[str, Any]: - """Return cached monitor snapshot; perform one synchronous refresh on cold start.""" +def refresh_monitor_resource_overview_sync() -> dict[str, Any]: + return refresh_resource_overview_sync() + + +def get_resource_overview_snapshot() -> dict[str, Any]: + """Return cached snapshot; perform one synchronous refresh on cold start.""" with _snapshot_lock: cached = copy.deepcopy(_snapshot_cache) if cached is not None: @@ -105,14 +121,18 @@ def get_monitor_resource_overview_snapshot() -> dict[str, Any]: # starts; if the cached Resources snapshot no longer matches visible lease/session # counts, refresh synchronously instead of serving a stale zero-sandbox card. if _snapshot_drifted_from_live_sessions(cached): - return refresh_monitor_resource_overview_sync() + return refresh_resource_overview_sync() return cached # @@@cold-start-cache-fill - route fallback fills cache once to keep first call deterministic. - return refresh_monitor_resource_overview_sync() + return refresh_resource_overview_sync() -async def monitor_resource_overview_refresh_loop() -> None: - """Continuously refresh the global monitor resource snapshot.""" +def get_monitor_resource_overview_snapshot() -> dict[str, Any]: + return get_resource_overview_snapshot() + + +async def resource_overview_refresh_loop() -> None: + """Continuously refresh resource overview snapshot.""" interval_sec = _read_refresh_interval_sec() while True: # @@@delayed-first-probe - avoid probe I/O at startup; keeps app boot and testclient deterministic. @@ -131,10 +151,14 @@ async def monitor_resource_overview_refresh_loop() -> None: try: # @@@refresh-loop-timebox - provider SDK calls may block; timebox to keep shutdown responsive. - await asyncio.wait_for(asyncio.to_thread(refresh_monitor_resource_overview_sync), timeout=10.0) + await asyncio.wait_for(asyncio.to_thread(refresh_resource_overview_sync), timeout=10.0) except asyncio.CancelledError: raise except TimeoutError: print("[monitor] resource refresh loop timeout") except Exception as exc: print(f"[monitor] resource refresh loop error: {exc}") + + +async def monitor_resource_overview_refresh_loop() -> None: + await resource_overview_refresh_loop() diff --git a/storage/contracts.py b/storage/contracts.py index 40f6e6406..cc97e41de 100644 --- a/storage/contracts.py +++ b/storage/contracts.py @@ -99,6 +99,28 @@ def list_all(self) -> list[dict[str, Any]]: ... def cleanup_expired(self) -> list[str]: ... +class SandboxMonitorRepo(Protocol): + """Read-only monitor queries over sandbox/session/lease state.""" + + def close(self) -> None: ... + def query_threads(self, *, thread_id: str | None = None) -> list[dict[str, Any]]: ... + def query_thread_summary(self, thread_id: str) -> dict[str, Any] | None: ... + def query_thread_sessions(self, thread_id: str) -> list[dict[str, Any]]: ... + def query_leases(self) -> list[dict[str, Any]]: ... + def list_leases_with_threads(self) -> list[dict[str, Any]]: ... + def query_lease(self, lease_id: str) -> dict[str, Any] | None: ... + def query_lease_sessions(self, lease_id: str) -> list[dict[str, Any]]: ... + def query_lease_threads(self, lease_id: str) -> list[dict[str, Any]]: ... + def query_lease_events(self, lease_id: str) -> list[dict[str, Any]]: ... + def query_diverged(self) -> list[dict[str, Any]]: ... + def query_events(self, limit: int = 100) -> list[dict[str, Any]]: ... + def query_event(self, event_id: str) -> dict[str, Any] | None: ... + def count_rows(self, table_names: list[str]) -> dict[str, int]: ... + def list_sessions_with_leases(self) -> list[dict[str, Any]]: ... + def list_probe_targets(self) -> list[dict[str, Any]]: ... + def query_lease_instance_id(self, lease_id: str) -> str | None: ... + + # --------------------------------------------------------------------------- # Member-Chat — enums + row types # --------------------------------------------------------------------------- diff --git a/storage/providers/sqlite/sandbox_monitor_repo.py b/storage/providers/sqlite/sandbox_monitor_repo.py index 03257c4ae..406366859 100644 --- a/storage/providers/sqlite/sandbox_monitor_repo.py +++ b/storage/providers/sqlite/sandbox_monitor_repo.py @@ -29,25 +29,46 @@ def __init__(self, db_path: str | Path | None = None) -> None: def close(self) -> None: self._conn.close() - def query_threads(self) -> list[dict]: - rows = self._conn.execute( - """ - SELECT - cs.thread_id, - COUNT(DISTINCT cs.chat_session_id) as session_count, - MAX(cs.last_active_at) as last_active, - sl.lease_id, - sl.provider_name, - sl.desired_state, - sl.observed_state, - sl.current_instance_id - FROM chat_sessions cs - LEFT JOIN sandbox_leases sl ON cs.lease_id = sl.lease_id - WHERE cs.status != 'closed' - GROUP BY cs.thread_id - ORDER BY MAX(cs.last_active_at) DESC - """ - ).fetchall() + def query_threads(self, *, thread_id: str | None = None) -> list[dict]: + if thread_id is None: + rows = self._conn.execute( + """ + SELECT + cs.thread_id, + COUNT(DISTINCT cs.chat_session_id) as session_count, + MAX(cs.last_active_at) as last_active, + sl.lease_id, + sl.provider_name, + sl.desired_state, + sl.observed_state, + sl.current_instance_id + FROM chat_sessions cs + LEFT JOIN sandbox_leases sl ON cs.lease_id = sl.lease_id + WHERE cs.status != 'closed' + GROUP BY cs.thread_id + ORDER BY MAX(cs.last_active_at) DESC + """ + ).fetchall() + else: + rows = self._conn.execute( + """ + SELECT + cs.thread_id, + COUNT(DISTINCT cs.chat_session_id) as session_count, + MAX(cs.last_active_at) as last_active, + sl.lease_id, + sl.provider_name, + sl.desired_state, + sl.observed_state, + sl.current_instance_id + FROM chat_sessions cs + LEFT JOIN sandbox_leases sl ON cs.lease_id = sl.lease_id + WHERE cs.status != 'closed' AND cs.thread_id = ? + GROUP BY cs.thread_id + ORDER BY MAX(cs.last_active_at) DESC + """, + (thread_id,), + ).fetchall() return [_row_to_dict(r) for r in rows] def query_thread_summary(self, thread_id: str) -> dict | None: @@ -186,6 +207,31 @@ def query_lease(self, lease_id: str) -> dict | None: ).fetchone() return _row_to_dict(row) if row else None + def query_lease_sessions(self, lease_id: str) -> list[dict]: + rows = self._conn.execute( + """ + SELECT + cs.chat_session_id, + cs.thread_id, + cs.status, + cs.started_at, + cs.ended_at, + cs.close_reason, + cs.lease_id, + sl.provider_name, + sl.desired_state, + sl.observed_state, + sl.current_instance_id, + sl.last_error + FROM chat_sessions cs + LEFT JOIN sandbox_leases sl ON cs.lease_id = sl.lease_id + WHERE cs.lease_id = ? + ORDER BY cs.started_at DESC + """, + (lease_id,), + ).fetchall() + return [_row_to_dict(r) for r in rows] + def query_lease_threads(self, lease_id: str) -> list[dict]: rows = self._conn.execute( """ diff --git a/storage/providers/supabase/sandbox_monitor_repo.py b/storage/providers/supabase/sandbox_monitor_repo.py index 2de7749e0..cfc647008 100644 --- a/storage/providers/supabase/sandbox_monitor_repo.py +++ b/storage/providers/supabase/sandbox_monitor_repo.py @@ -181,6 +181,39 @@ def query_lease(self, lease_id: str) -> dict | None: ) return dict(rows[0]) if rows else None + def query_lease_sessions(self, lease_id: str) -> list[dict]: + sessions = q.rows( + q.order( + self._client.table("chat_sessions") + .select("chat_session_id,thread_id,status,started_at,ended_at,close_reason,lease_id") + .eq("lease_id", lease_id), + "started_at", + desc=True, + repo=_REPO, + operation="query_lease_sessions", + ).execute(), + _REPO, + "query_lease_sessions", + ) + lease = self.query_lease(lease_id) + return [ + { + "chat_session_id": session.get("chat_session_id"), + "thread_id": session.get("thread_id"), + "status": session.get("status"), + "started_at": session.get("started_at"), + "ended_at": session.get("ended_at"), + "close_reason": session.get("close_reason"), + "lease_id": session.get("lease_id"), + "provider_name": lease.get("provider_name") if lease else None, + "desired_state": lease.get("desired_state") if lease else None, + "observed_state": lease.get("observed_state") if lease else None, + "current_instance_id": lease.get("current_instance_id") if lease else None, + "last_error": lease.get("last_error") if lease else None, + } + for session in sessions + ] + def query_lease_threads(self, lease_id: str) -> list[dict]: rows = q.rows( q.order( @@ -303,7 +336,6 @@ def count_rows(self, table_names: list[str]) -> dict[str, int]: return counts def list_sessions_with_leases(self) -> list[dict]: - # Active sessions joined with leases active_sessions = q.rows( self._client.table("chat_sessions").select("chat_session_id,thread_id,lease_id,started_at").neq("status", "closed").execute(), _REPO, @@ -318,20 +350,30 @@ def list_sessions_with_leases(self) -> list[dict]: ) lease_map = {le["lease_id"]: le for le in leases} - # Terminals for fallback all_terminals = q.rows( self._client.table("abstract_terminals").select("lease_id,thread_id,created_at").execute(), _REPO, "list_sessions_with_leases terminals", ) - term_map: dict[str, str] = {} - for t in sorted(all_terminals, key=lambda x: x.get("created_at") or ""): - term_map[t["lease_id"]] = t["thread_id"] + terminal_rows_by_lease: dict[str, list[dict[str, Any]]] = {} + for row in all_terminals: + terminal_rows_by_lease.setdefault(str(row.get("lease_id") or ""), []).append(dict(row)) + + all_sessions = q.rows( + self._client.table("chat_sessions").select("chat_session_id,thread_id,lease_id,status,started_at").execute(), + _REPO, + "list_sessions_with_leases all_sessions", + ) + latest_session_thread_by_lease: dict[str, str] = {} + for row in sorted(all_sessions, key=lambda x: x.get("started_at") or ""): + lease_id = str(row.get("lease_id") or "") + thread_id = str(row.get("thread_id") or "") + if lease_id and thread_id: + latest_session_thread_by_lease[lease_id] = thread_id result = [] seen_leases: set[str] = set() - # Active sessions for s in active_sessions: lease = lease_map.get(s.get("lease_id") or "") if not lease: @@ -349,17 +391,31 @@ def list_sessions_with_leases(self) -> list[dict]: } ) - # Terminal fallback for leases with no active session for lease in leases: lid = lease["lease_id"] if lid in seen_leases: continue - thread_id = term_map.get(lid) + terminal_rows = terminal_rows_by_lease.get(lid, []) + if terminal_rows: + for terminal_row in terminal_rows: + result.append( + { + "provider": lease.get("provider_name") or "local", + "session_id": None, + "thread_id": terminal_row.get("thread_id"), + "lease_id": lid, + "observed_state": lease.get("observed_state"), + "desired_state": lease.get("desired_state"), + "created_at": lease.get("created_at"), + } + ) + continue + result.append( { "provider": lease.get("provider_name") or "local", "session_id": None, - "thread_id": thread_id, + "thread_id": latest_session_thread_by_lease.get(lid), "lease_id": lid, "observed_state": lease.get("observed_state"), "desired_state": lease.get("desired_state"), diff --git a/tests/Integration/test_monitor_resources_route.py b/tests/Integration/test_monitor_resources_route.py new file mode 100644 index 000000000..95a82d809 --- /dev/null +++ b/tests/Integration/test_monitor_resources_route.py @@ -0,0 +1,178 @@ +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from backend.web.core.dependencies import get_current_user_id +from backend.web.routers import monitor, resources + + +def _build_monitor_test_app(*, include_product_resources: bool = False) -> FastAPI: + app = FastAPI() + app.include_router(monitor.router) + if include_product_resources: + app.include_router(resources.router) + app.dependency_overrides[get_current_user_id] = lambda: "user-test" + return app + + +def _stub_monitor_resource_snapshot(monkeypatch): + snapshot = { + "summary": { + "snapshot_at": "2026-04-07T00:00:00Z", + "last_refreshed_at": "2026-04-07T00:00:00Z", + "refresh_status": "fresh", + "running_sessions": 0, + "active_providers": 0, + "unavailable_providers": 0, + }, + "providers": [], + "triage": { + "summary": { + "total": 0, + "active_drift": 0, + "detached_residue": 0, + "orphan_cleanup": 0, + "healthy_capacity": 0, + }, + "groups": [], + }, + } + + monkeypatch.setattr(monitor, "get_monitor_resource_overview_snapshot", lambda: snapshot) + monkeypatch.setattr(monitor, "refresh_monitor_resource_overview_sync", lambda: snapshot) + return snapshot + + +def test_monitor_resources_route_smoke(monkeypatch): + _stub_monitor_resource_snapshot(monkeypatch) + + with TestClient(_build_monitor_test_app()) as client: + response = client.get("/api/monitor/resources") + + assert response.status_code == 200 + payload = response.json() + assert "summary" in payload + assert "providers" in payload + assert "triage" in payload + assert "snapshot_at" in payload["summary"] + assert "running_sessions" in payload["summary"] + assert isinstance(payload["providers"], list) + assert set(payload["triage"]["summary"]).issuperset({"total", "active_drift", "detached_residue", "orphan_cleanup", "healthy_capacity"}) + assert isinstance(payload["triage"]["groups"], list) + + +def test_monitor_resources_refresh_route_smoke(monkeypatch): + _stub_monitor_resource_snapshot(monkeypatch) + + with TestClient(_build_monitor_test_app()) as client: + response = client.post("/api/monitor/resources/refresh") + + assert response.status_code == 200 + payload = response.json() + assert "summary" in payload + assert "providers" in payload + assert "triage" in payload + assert "last_refreshed_at" in payload["summary"] + assert "refresh_status" in payload["summary"] + assert set(payload["triage"]["summary"]).issuperset({"total", "active_drift", "detached_residue", "orphan_cleanup", "healthy_capacity"}) + + +def test_monitor_and_product_resource_routes_coexist_intentionally(monkeypatch): + from backend.web.services import resource_projection_service + + _stub_monitor_resource_snapshot(monkeypatch) + monkeypatch.setattr( + resource_projection_service, + "list_user_resource_providers", + lambda *_args, **_kwargs: {"summary": {"snapshot_at": "now"}, "providers": []}, + ) + + with TestClient(_build_monitor_test_app(include_product_resources=True)) as client: + monitor_response = client.get("/api/monitor/resources") + product_response = client.get("/api/resources/overview") + + assert monitor_response.status_code == 200 + assert product_response.status_code == 200 + + +def test_monitor_health_route_smoke(): + with TestClient(_build_monitor_test_app()) as client: + response = client.get("/api/monitor/health") + + assert response.status_code == 200 + payload = response.json() + assert "snapshot_at" in payload + assert "db" in payload + assert "sessions" in payload + + +def test_monitor_dashboard_route_smoke(monkeypatch): + _stub_monitor_resource_snapshot(monkeypatch) + + with TestClient(_build_monitor_test_app()) as client: + response = client.get("/api/monitor/dashboard") + + assert response.status_code == 200 + payload = response.json() + assert "snapshot_at" in payload + assert "resources_summary" in payload + assert "infra" in payload + assert "workload" in payload + assert "latest_evaluation" in payload + + +def test_monitor_leases_route_exposes_summary_and_groups(): + with TestClient(_build_monitor_test_app()) as client: + response = client.get("/api/monitor/leases") + + assert response.status_code == 200 + payload = response.json() + assert "summary" in payload + assert "groups" in payload + assert "triage" in payload + assert set(payload["summary"]).issuperset({"total", "healthy", "diverged", "orphan", "orphan_diverged"}) + assert isinstance(payload["groups"], list) + assert set(payload["triage"]["summary"]).issuperset({"total", "active_drift", "detached_residue", "orphan_cleanup", "healthy_capacity"}) + assert isinstance(payload["triage"]["groups"], list) + + +def test_monitor_resources_cleanup_route_forwards_structured_payload(monkeypatch): + from backend.web.services import monitor_service + + monkeypatch.setattr( + monitor_service, + "cleanup_resource_leases", + lambda *, action, lease_ids, expected_category: { + "action": action, + "expected_category": expected_category, + "attempted": list(lease_ids), + "cleaned": [{"lease_id": "lease-1", "category": expected_category}], + "skipped": [], + "errors": [], + "refreshed_summary": { + "total": 1, + "active_drift": 0, + "detached_residue": 0, + "orphan_cleanup": 1, + "healthy_capacity": 0, + }, + }, + ) + + with TestClient(_build_monitor_test_app()) as client: + response = client.post( + "/api/monitor/resources/cleanup", + json={ + "action": "cleanup_residue", + "lease_ids": ["lease-1"], + "expected_category": "detached_residue", + }, + ) + + assert response.status_code == 200 + payload = response.json() + assert payload["action"] == "cleanup_residue" + assert payload["attempted"] == ["lease-1"] + assert payload["cleaned"] == [{"lease_id": "lease-1", "category": "detached_residue"}] + assert payload["skipped"] == [] + assert payload["errors"] == [] + assert set(payload["refreshed_summary"]).issuperset({"total", "active_drift", "detached_residue", "orphan_cleanup", "healthy_capacity"}) diff --git a/tests/Unit/monitor/test_monitor_compat.py b/tests/Unit/monitor/test_monitor_compat.py new file mode 100644 index 000000000..809ede5e1 --- /dev/null +++ b/tests/Unit/monitor/test_monitor_compat.py @@ -0,0 +1,405 @@ +from backend.web.services import monitor_service + + +def test_list_leases_exposes_semantic_groups_and_summary(monkeypatch): + class FakeRepo: + def query_leases(self): + return [ + { + "lease_id": "lease-healthy", + "provider_name": "local", + "desired_state": "running", + "observed_state": "running", + "current_instance_id": "inst-1", + "last_error": None, + "updated_at": "2026-04-06T00:10:00", + "thread_id": "thread-1", + }, + { + "lease_id": "lease-diverged", + "provider_name": "local", + "desired_state": "running", + "observed_state": "detached", + "current_instance_id": "inst-2", + "last_error": "drift", + "updated_at": "2026-04-06T00:11:00", + "thread_id": "thread-2", + }, + { + "lease_id": "lease-orphan-diverged", + "provider_name": "local", + "desired_state": "running", + "observed_state": "detached", + "current_instance_id": "inst-3", + "last_error": None, + "updated_at": "2026-04-06T00:12:00", + "thread_id": None, + }, + { + "lease_id": "lease-orphan", + "provider_name": "local", + "desired_state": "stopped", + "observed_state": "stopped", + "current_instance_id": "inst-4", + "last_error": None, + "updated_at": "2026-04-06T00:13:00", + "thread_id": None, + }, + ] + + def close(self): + return None + + monkeypatch.setattr(monitor_service, "make_sandbox_monitor_repo", lambda: FakeRepo()) + monkeypatch.setattr( + monitor_service, + "_hours_since", + lambda iso_timestamp: { + "2026-04-06T00:10:00": 0.5, + "2026-04-06T00:11:00": 0.5, + "2026-04-06T00:12:00": 10.0, + "2026-04-06T00:13:00": 10.0, + }.get(iso_timestamp), + ) + + payload = monitor_service.list_leases() + + assert payload["summary"] == { + "total": 4, + "healthy": 1, + "diverged": 1, + "orphan": 1, + "orphan_diverged": 1, + } + assert [group["key"] for group in payload["groups"]] == [ + "orphan_diverged", + "diverged", + "orphan", + "healthy", + ] + assert payload["triage"]["summary"] == { + "total": 4, + "active_drift": 1, + "detached_residue": 0, + "orphan_cleanup": 2, + "healthy_capacity": 1, + } + assert [group["key"] for group in payload["triage"]["groups"]] == [ + "active_drift", + "detached_residue", + "orphan_cleanup", + "healthy_capacity", + ] + by_id = {item["lease_id"]: item for item in payload["items"]} + assert by_id["lease-healthy"]["semantics"]["category"] == "healthy" + assert by_id["lease-healthy"]["triage"]["category"] == "healthy_capacity" + assert by_id["lease-diverged"]["semantics"]["category"] == "diverged" + assert by_id["lease-diverged"]["triage"]["category"] == "active_drift" + assert by_id["lease-orphan-diverged"]["semantics"]["category"] == "orphan_diverged" + assert by_id["lease-orphan-diverged"]["triage"]["category"] == "orphan_cleanup" + assert by_id["lease-orphan"]["semantics"]["category"] == "orphan" + assert by_id["lease-orphan"]["triage"]["category"] == "orphan_cleanup" + + +def test_list_leases_marks_old_detached_running_rows_as_detached_residue(monkeypatch): + class FakeRepo: + def query_leases(self): + return [ + { + "lease_id": "lease-stale", + "provider_name": "local", + "desired_state": "running", + "observed_state": "detached", + "current_instance_id": "inst-9", + "last_error": None, + "updated_at": "2026-04-05T00:00:00", + "thread_id": "subagent-1234", + } + ] + + def close(self): + return None + + monkeypatch.setattr(monitor_service, "make_sandbox_monitor_repo", lambda: FakeRepo()) + monkeypatch.setattr(monitor_service, "_hours_since", lambda _: 24.0) + + payload = monitor_service.list_leases() + + item = payload["items"][0] + assert item["semantics"]["category"] == "diverged" + assert item["triage"]["category"] == "detached_residue" + assert payload["triage"]["summary"]["detached_residue"] == 1 + + +def test_get_lease_falls_back_to_historical_session_rows(monkeypatch): + class FakeRepo: + def query_lease(self, lease_id): + return None + + def query_lease_threads(self, lease_id): + return [] + + def query_lease_events(self, lease_id): + return [] + + def query_lease_sessions(self, lease_id): + return [ + { + "chat_session_id": "sess-old", + "thread_id": "thread-historical", + "status": "closed", + "started_at": "2026-04-06T10:00:00", + "ended_at": "2026-04-06T10:05:00", + "close_reason": "expired", + "lease_id": lease_id, + "provider_name": None, + "desired_state": None, + "observed_state": None, + "current_instance_id": None, + "last_error": None, + } + ] + + def close(self): + return None + + monkeypatch.setattr(monitor_service, "make_sandbox_monitor_repo", lambda: FakeRepo()) + + payload = monitor_service.get_lease("lease-historical") + + assert payload["lease_id"] == "lease-historical" + assert payload["info"]["provider"] == "unknown" + assert payload["state"]["text"] == "destroyed" + assert payload["related_threads"]["items"] == [{"thread_id": "thread-historical", "thread_url": "/thread/thread-historical"}] + + +def test_build_evaluation_operator_surface_flags_runner_exit_before_threads_materialize(): + payload = monitor_service.build_evaluation_operator_surface( + status="provisional", + notes=("runner=direct rc=1 sandbox=local run_dir=/tmp/eval stdout_log=/tmp/eval/out.log stderr_log=/tmp/eval/err.log"), + score={ + "score_gate": "provisional", + "publishable": False, + "run_dir": "/tmp/eval", + "manifest_path": "/tmp/eval/run_manifest.json", + "eval_summary_path": None, + "trace_summaries_path": None, + "scored": False, + }, + threads_total=0, + threads_running=0, + threads_done=0, + ) + + assert payload["kind"] == "bootstrap_failure" + assert payload["tone"] == "danger" + assert payload["headline"] == "Runner exited before evaluation threads materialized." + assert "bootstrap failure" in payload["summary"] + assert payload["facts"][-2:] == [ + {"label": "Runner", "value": "direct"}, + {"label": "Exit code", "value": "1"}, + ] + artifact_labels = {item["label"] for item in payload["artifacts"]} + assert artifact_labels == { + "Run directory", + "Run manifest", + "STDOUT log", + "STDERR log", + "Eval summary", + "Trace summaries", + } + assert payload["artifact_summary"] == { + "present": 4, + "missing": 2, + "total": 6, + } + assert payload["artifacts"][0]["status"] == "present" + assert payload["artifacts"][-1]["status"] == "missing" + + +def test_build_evaluation_operator_surface_marks_running_waiting_for_threads(): + payload = monitor_service.build_evaluation_operator_surface( + status="running", + notes="runner=direct rc=0", + score={ + "score_gate": "provisional", + "publishable": False, + "run_dir": "/tmp/eval", + "manifest_path": "/tmp/eval/run_manifest.json", + "eval_summary_path": None, + "trace_summaries_path": None, + "scored": False, + }, + threads_total=0, + threads_running=2, + threads_done=0, + ) + + assert payload["kind"] == "running_waiting_for_threads" + assert payload["tone"] == "default" + assert "actively running" in payload["headline"] + assert payload["artifact_summary"]["present"] == 2 + + +def test_build_evaluation_operator_surface_marks_completed_with_errors(): + payload = monitor_service.build_evaluation_operator_surface( + status="completed_with_errors", + notes="runner=direct rc=0", + score={ + "score_gate": "final", + "publishable": True, + "run_dir": "/tmp/eval", + "manifest_path": "/tmp/eval/run_manifest.json", + "eval_summary_path": "/tmp/eval/eval_summary.json", + "trace_summaries_path": "/tmp/eval/trace_summaries.jsonl", + "scored": True, + "error_instances": 2, + }, + threads_total=10, + threads_running=0, + threads_done=10, + ) + + assert payload["kind"] == "completed_with_errors" + assert payload["tone"] == "warning" + assert "completed with recorded errors" in payload["headline"] + assert payload["artifact_summary"] == { + "present": 4, + "missing": 2, + "total": 6, + } + + +def test_cleanup_resource_leases_deletes_allowed_detached_residue(monkeypatch): + rows = [ + { + "lease_id": "lease-stale", + "provider_name": "local", + "desired_state": "running", + "observed_state": "detached", + "current_instance_id": None, + "last_error": None, + "updated_at": "2026-04-05T00:00:00", + "thread_id": "subagent-1234", + } + ] + + class FakeMonitorRepo: + def query_leases(self): + return list(rows) + + def query_lease_sessions(self, lease_id): + assert lease_id == "lease-stale" + return [{"chat_session_id": "sess-old", "status": "closed"}] + + def close(self): + return None + + class FakeLeaseRepo: + def __init__(self): + self.deleted = [] + + def delete(self, lease_id): + self.deleted.append(lease_id) + rows[:] = [row for row in rows if row["lease_id"] != lease_id] + + def close(self): + return None + + class FakeChatSessionRepo: + def lease_has_running_command(self, lease_id): + assert lease_id == "lease-stale" + return False + + def close(self): + return None + + lease_repo = FakeLeaseRepo() + monkeypatch.setattr(monitor_service, "make_sandbox_monitor_repo", lambda: FakeMonitorRepo()) + monkeypatch.setattr(monitor_service, "make_lease_repo", lambda: lease_repo) + monkeypatch.setattr(monitor_service, "make_chat_session_repo", lambda: FakeChatSessionRepo()) + monkeypatch.setattr(monitor_service, "init_providers_and_managers", lambda: ({}, {})) + monkeypatch.setattr(monitor_service, "_hours_since", lambda _: 24.0) + + payload = monitor_service.cleanup_resource_leases( + action="cleanup_residue", + lease_ids=["lease-stale"], + expected_category="detached_residue", + ) + + assert lease_repo.deleted == ["lease-stale"] + assert payload["attempted"] == ["lease-stale"] + assert payload["cleaned"] == [{"lease_id": "lease-stale", "category": "detached_residue"}] + assert payload["skipped"] == [] + assert payload["errors"] == [] + assert payload["refreshed_summary"]["detached_residue"] == 0 + + +def test_cleanup_resource_leases_reports_category_mismatch_without_deleting(monkeypatch): + rows = [ + { + "lease_id": "lease-live", + "provider_name": "local", + "desired_state": "running", + "observed_state": "detached", + "current_instance_id": "inst-live", + "last_error": None, + "updated_at": "2026-04-06T00:00:00", + "thread_id": "thread-1", + } + ] + + class FakeMonitorRepo: + def query_leases(self): + return list(rows) + + def query_lease_sessions(self, lease_id): + assert lease_id == "lease-live" + return [{"chat_session_id": "sess-live", "status": "active"}] + + def close(self): + return None + + class FakeLeaseRepo: + def __init__(self): + self.deleted = [] + + def delete(self, lease_id): + self.deleted.append(lease_id) + + def close(self): + return None + + class FakeChatSessionRepo: + def lease_has_running_command(self, lease_id): + assert lease_id == "lease-live" + return True + + def close(self): + return None + + lease_repo = FakeLeaseRepo() + monkeypatch.setattr(monitor_service, "make_sandbox_monitor_repo", lambda: FakeMonitorRepo()) + monkeypatch.setattr(monitor_service, "make_lease_repo", lambda: lease_repo) + monkeypatch.setattr(monitor_service, "make_chat_session_repo", lambda: FakeChatSessionRepo()) + monkeypatch.setattr(monitor_service, "init_providers_and_managers", lambda: ({}, {})) + monkeypatch.setattr(monitor_service, "_hours_since", lambda _: 0.5) + + payload = monitor_service.cleanup_resource_leases( + action="cleanup_residue", + lease_ids=["lease-live"], + expected_category="detached_residue", + ) + + assert lease_repo.deleted == [] + assert payload["attempted"] == ["lease-live"] + assert payload["cleaned"] == [] + assert payload["skipped"] == ["lease-live"] + assert payload["errors"] == [ + { + "lease_id": "lease-live", + "reason": "category_mismatch", + "expected_category": "detached_residue", + "actual_category": "active_drift", + } + ] diff --git a/tests/Unit/monitor/test_monitor_resource_overview_cache.py b/tests/Unit/monitor/test_monitor_resource_overview_cache.py index 0d17c0b04..6f961cb7d 100644 --- a/tests/Unit/monitor/test_monitor_resource_overview_cache.py +++ b/tests/Unit/monitor/test_monitor_resource_overview_cache.py @@ -1,6 +1,23 @@ from backend.web.services import resource_cache as cache +def _triage_payload(category: str) -> dict: + summary = { + "total": 1, + "active_drift": 0, + "detached_residue": 0, + "orphan_cleanup": 0, + "healthy_capacity": 0, + } + summary[category] = 1 + return { + "triage": { + "summary": summary, + "groups": [{"key": category, "items": [{"lease_id": "lease-1"}]}], + } + } + + def test_resource_overview_cache_refresh_adds_metadata(monkeypatch): cache.clear_monitor_resource_overview_cache() monkeypatch.setattr( @@ -17,14 +34,22 @@ def test_resource_overview_cache_refresh_adds_metadata(monkeypatch): "providers": [{"id": "local"}], }, ) + monkeypatch.setattr( + cache, + "monitor_service", + type("_MonitorService", (), {"list_leases": staticmethod(lambda: _triage_payload("detached_residue"))}), + raising=False, + ) payload = cache.refresh_monitor_resource_overview_sync() assert payload["summary"]["refresh_status"] == "ok" assert payload["summary"]["refresh_error"] is None assert payload["summary"]["last_refreshed_at"] == "2026-03-03T00:00:00Z" + assert payload["triage"]["summary"]["detached_residue"] == 1 cached = cache.get_monitor_resource_overview_snapshot() assert cached["providers"][0]["id"] == "local" + assert cached["triage"]["groups"][0]["key"] == "detached_residue" def test_resource_overview_cache_keeps_last_snapshot_on_refresh_error(monkeypatch): @@ -43,6 +68,12 @@ def test_resource_overview_cache_keeps_last_snapshot_on_refresh_error(monkeypatc "providers": [{"id": "docker"}], }, ) + monkeypatch.setattr( + cache, + "monitor_service", + type("_MonitorService", (), {"list_leases": staticmethod(lambda: _triage_payload("orphan_cleanup"))}), + raising=False, + ) cache.refresh_monitor_resource_overview_sync() def _raise(): @@ -53,6 +84,7 @@ def _raise(): assert degraded["providers"][0]["id"] == "docker" assert degraded["summary"]["refresh_status"] == "error" assert degraded["summary"]["refresh_error"] == "probe failed" + assert degraded["triage"]["groups"][0]["key"] == "orphan_cleanup" def test_resource_overview_cache_refreshes_when_live_session_counts_drift(monkeypatch): @@ -94,9 +126,16 @@ def test_resource_overview_cache_refreshes_when_live_session_counts_drift(monkey calls = iter([stale_payload, fresh_payload]) monkeypatch.setattr(cache.resource_service, "list_resource_providers", lambda: next(calls)) monkeypatch.setattr(cache.resource_service, "visible_resource_session_stats", lambda: {"local": {"sessions": 1, "running": 1}}) + monkeypatch.setattr( + cache, + "monitor_service", + type("_MonitorService", (), {"list_leases": staticmethod(lambda: _triage_payload("healthy_capacity"))}), + raising=False, + ) cache.refresh_monitor_resource_overview_sync() payload = cache.get_monitor_resource_overview_snapshot() assert payload["providers"][0]["telemetry"]["running"]["used"] == 1 assert len(payload["providers"][0]["sessions"]) == 1 + assert payload["triage"]["summary"]["healthy_capacity"] == 1 diff --git a/tests/Unit/monitor/test_monitor_sqlite_sandbox_repo.py b/tests/Unit/monitor/test_monitor_sqlite_sandbox_repo.py new file mode 100644 index 000000000..637b35143 --- /dev/null +++ b/tests/Unit/monitor/test_monitor_sqlite_sandbox_repo.py @@ -0,0 +1,256 @@ +import sqlite3 + +from storage.providers.sqlite.sandbox_monitor_repo import SQLiteSandboxMonitorRepo +from storage.providers.supabase.sandbox_monitor_repo import SupabaseSandboxMonitorRepo +from tests.fakes.supabase import FakeSupabaseClient + + +def _bootstrap_monitor_db(db_path): + conn = sqlite3.connect(db_path) + try: + conn.executescript( + """ + CREATE TABLE sandbox_leases ( + lease_id TEXT PRIMARY KEY, + provider_name TEXT, + desired_state TEXT, + observed_state TEXT, + current_instance_id TEXT, + created_at TEXT, + updated_at TEXT + ); + + CREATE TABLE abstract_terminals ( + terminal_id TEXT PRIMARY KEY, + lease_id TEXT, + thread_id TEXT, + cwd TEXT, + created_at TEXT + ); + + CREATE TABLE chat_sessions ( + chat_session_id TEXT PRIMARY KEY, + thread_id TEXT, + lease_id TEXT, + status TEXT, + started_at TEXT, + last_active_at TEXT + ); + """ + ) + conn.commit() + finally: + conn.close() + + +def test_list_sessions_with_leases_keeps_raw_newest_terminal_truth(tmp_path): + db_path = tmp_path / "sandbox.db" + _bootstrap_monitor_db(db_path) + + conn = sqlite3.connect(db_path) + try: + conn.execute( + """ + INSERT INTO sandbox_leases ( + lease_id, provider_name, desired_state, observed_state, current_instance_id, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + "lease-1", + "daytona_selfhost", + "paused", + "paused", + "instance-1", + "2026-04-05T13:00:00", + "2026-04-05T23:59:00", + ), + ) + conn.executemany( + """ + INSERT INTO abstract_terminals (terminal_id, lease_id, thread_id, cwd, created_at) + VALUES (?, ?, ?, ?, ?) + """, + [ + ("term-parent", "lease-1", "thread-parent", "/home/daytona/files/app", "2026-04-05T13:35:08"), + ("term-subagent", "lease-1", "subagent-deadbeef", "/home/daytona/files/app", "2026-04-05T23:51:40"), + ], + ) + conn.executemany( + """ + INSERT INTO chat_sessions (chat_session_id, thread_id, lease_id, status, started_at) + VALUES (?, ?, ?, ?, ?) + """, + [ + ("sess-parent", "thread-parent", "lease-1", "closed", "2026-04-05T23:24:06"), + ("sess-subagent", "subagent-deadbeef", "lease-1", "closed", "2026-04-05T23:51:42"), + ], + ) + conn.commit() + finally: + conn.close() + + repo = SQLiteSandboxMonitorRepo(db_path=db_path) + try: + rows = repo.list_sessions_with_leases() + finally: + repo.close() + + assert len(rows) == 2 + assert {row["thread_id"] for row in rows} == {"thread-parent", "subagent-deadbeef"} + assert all(row["lease_id"] == "lease-1" for row in rows) + + +def test_query_threads_accepts_optional_thread_filter(tmp_path): + db_path = tmp_path / "sandbox.db" + _bootstrap_monitor_db(db_path) + + conn = sqlite3.connect(db_path) + try: + conn.execute( + """ + INSERT INTO sandbox_leases ( + lease_id, provider_name, desired_state, observed_state, current_instance_id, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ("lease-1", "local", "running", "running", "instance-1", "2026-04-05T10:00:00", "2026-04-05T10:00:00"), + ) + conn.executemany( + """ + INSERT INTO chat_sessions (chat_session_id, thread_id, lease_id, status, started_at) + VALUES (?, ?, ?, ?, ?) + """, + [ + ("sess-1", "thread-1", "lease-1", "active", "2026-04-05T10:00:00"), + ("sess-2", "thread-2", "lease-1", "active", "2026-04-05T10:05:00"), + ], + ) + conn.commit() + finally: + conn.close() + + repo = SQLiteSandboxMonitorRepo(db_path=db_path) + try: + rows = repo.query_threads(thread_id="thread-2") + finally: + repo.close() + + assert [row["thread_id"] for row in rows] == ["thread-2"] + + +def test_supabase_list_sessions_with_leases_matches_sqlite_terminal_and_recent_session_fallback(tmp_path): + db_path = tmp_path / "sandbox.db" + _bootstrap_monitor_db(db_path) + + sqlite_conn = sqlite3.connect(db_path) + try: + sqlite_conn.executemany( + """ + INSERT INTO sandbox_leases ( + lease_id, provider_name, desired_state, observed_state, current_instance_id, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?) + """, + [ + ("lease-active", "local", "running", "running", "instance-a", "2026-04-05T10:00:00", "2026-04-05T10:00:00"), + ("lease-terminal", "daytona_selfhost", "paused", "paused", "instance-b", "2026-04-05T11:00:00", "2026-04-05T11:00:00"), + ("lease-recent", "docker", "paused", "paused", "instance-c", "2026-04-05T12:00:00", "2026-04-05T12:00:00"), + ], + ) + sqlite_conn.executemany( + """ + INSERT INTO abstract_terminals (terminal_id, lease_id, thread_id, cwd, created_at) + VALUES (?, ?, ?, ?, ?) + """, + [ + ("term-parent", "lease-terminal", "thread-parent", "/workspace", "2026-04-05T11:05:00"), + ("term-subagent", "lease-terminal", "subagent-deadbeef", "/workspace", "2026-04-05T11:06:00"), + ], + ) + sqlite_conn.executemany( + """ + INSERT INTO chat_sessions (chat_session_id, thread_id, lease_id, status, started_at) + VALUES (?, ?, ?, ?, ?) + """, + [ + ("sess-active", "thread-active", "lease-active", "active", "2026-04-05T10:01:00"), + ("sess-recent-a", "thread-old", "lease-recent", "closed", "2026-04-05T12:01:00"), + ("sess-recent-b", "thread-new", "lease-recent", "closed", "2026-04-05T12:02:00"), + ], + ) + sqlite_conn.commit() + finally: + sqlite_conn.close() + + sqlite_repo = SQLiteSandboxMonitorRepo(db_path=db_path) + try: + sqlite_rows = sqlite_repo.list_sessions_with_leases() + finally: + sqlite_repo.close() + + supabase_tables = { + "sandbox_leases": [ + { + "lease_id": "lease-active", + "provider_name": "local", + "desired_state": "running", + "observed_state": "running", + "current_instance_id": "instance-a", + "created_at": "2026-04-05T10:00:00", + "updated_at": "2026-04-05T10:00:00", + }, + { + "lease_id": "lease-terminal", + "provider_name": "daytona_selfhost", + "desired_state": "paused", + "observed_state": "paused", + "current_instance_id": "instance-b", + "created_at": "2026-04-05T11:00:00", + "updated_at": "2026-04-05T11:00:00", + }, + { + "lease_id": "lease-recent", + "provider_name": "docker", + "desired_state": "paused", + "observed_state": "paused", + "current_instance_id": "instance-c", + "created_at": "2026-04-05T12:00:00", + "updated_at": "2026-04-05T12:00:00", + }, + ], + "abstract_terminals": [ + {"terminal_id": "term-parent", "lease_id": "lease-terminal", "thread_id": "thread-parent", "created_at": "2026-04-05T11:05:00"}, + { + "terminal_id": "term-subagent", + "lease_id": "lease-terminal", + "thread_id": "subagent-deadbeef", + "created_at": "2026-04-05T11:06:00", + }, + ], + "chat_sessions": [ + { + "chat_session_id": "sess-active", + "thread_id": "thread-active", + "lease_id": "lease-active", + "status": "active", + "started_at": "2026-04-05T10:01:00", + }, + { + "chat_session_id": "sess-recent-a", + "thread_id": "thread-old", + "lease_id": "lease-recent", + "status": "closed", + "started_at": "2026-04-05T12:01:00", + }, + { + "chat_session_id": "sess-recent-b", + "thread_id": "thread-new", + "lease_id": "lease-recent", + "status": "closed", + "started_at": "2026-04-05T12:02:00", + }, + ], + } + supabase_repo = SupabaseSandboxMonitorRepo(FakeSupabaseClient(supabase_tables)) + + supabase_rows = supabase_repo.list_sessions_with_leases() + + assert supabase_rows == sqlite_rows