From 4e95e9394f62db88e79a45ce89f462574fb48833 Mon Sep 17 00:00:00 2001 From: ravensanstete <14302010024@fudan.edu.cn> Date: Tue, 28 Apr 2026 17:59:16 +0800 Subject: [PATCH] Add sandbox container runtime v2 --- README.md | 12 +- docs/architecture/runtime_and_scheduler.md | 22 +- docs/current_state.md | 29 +-- examples/sandbox-coding-smoke/README.md | 10 + examples/sandbox-coding-smoke/agent.py | 25 +++ examples/sandbox-coding-smoke/project.yml | 27 +++ examples/sandbox-coding-smoke/scorer.py | 18 ++ examples/sandbox-coding-smoke/task.py | 31 +++ snowl/benchmarks/agent_bench_os/adapter.py | 24 +- snowl/benchmarks/ipi_coding_agent/adapter.py | 35 ++- snowl/envs/substrate/container_backend.py | 35 +++ snowl/eval_loop.py | 36 ++- snowl/runtime/container_contract.py | 70 ++++++ snowl/runtime/container_lifecycle.py | 5 + snowl/runtime/container_providers.py | 202 ++++++++++++++++- snowl/runtime/container_runtime.py | 8 +- snowl/runtime/engine.py | 181 +++++++++++++++- snowl/runtime/resource_scheduler.py | 2 +- snowl/runtime/workspace.py | 217 +++++++++++++++++++ tests/test_container_lifecycle.py | 34 +++ tests/test_container_runtime_providers.py | 120 ++++++++++ tests/test_resource_scheduler.py | 34 ++- tests/test_runtime_workspace.py | 33 +++ 23 files changed, 1164 insertions(+), 46 deletions(-) create mode 100644 examples/sandbox-coding-smoke/README.md create mode 100644 examples/sandbox-coding-smoke/agent.py create mode 100644 examples/sandbox-coding-smoke/project.yml create mode 100644 examples/sandbox-coding-smoke/scorer.py create mode 100644 examples/sandbox-coding-smoke/task.py create mode 100644 snowl/runtime/workspace.py create mode 100644 tests/test_runtime_workspace.py diff --git a/README.md b/README.md index 0aac06d4..970e4d7c 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,11 @@ # Snowl +[![CI](https://github.com/Qitor/snowl/actions/workflows/ci.yml/badge.svg)](https://github.com/Qitor/snowl/actions/workflows/ci.yml) 
+![Python](https://img.shields.io/badge/python-%3E%3D3.10-blue) +![Docker Sandbox](https://img.shields.io/badge/docker--sandbox-ready-2496ED) +![Benchmarks](https://img.shields.io/badge/benchmarks-20%2B-success) +![License](https://img.shields.io/badge/license-see%20repo-lightgrey) + [English](./README.md) | [简体中文](./README.zh-CN.md) Snowl is an open-source safety evaluation framework for AI agents. @@ -49,7 +55,11 @@ change. - Built-in agent evaluator primitives for answer matching, function-call matching, tool trace policy, canary leakage, workspace/state checks, command checks, checkpoint scoring, rubric judging, and grouped metrics -- Local runtime orchestration for terminal and GUI-style benchmark tasks +- Phase-aware local runtime orchestration for terminal, GUI, sandbox, and + container-backed benchmark tasks +- Runtime-owned isolated workspaces with before/after snapshots, diff metadata, + and artifact collection hooks +- Runtime-owned container cleanup for compose and Docker container providers - Provider-aware concurrency controls for OpenAI-compatible model clients - Automatic live artifacts: `manifest.json`, `plan.json`, `events.jsonl`, `runtime_state.json`, `outcomes.json`, `aggregate.json`, CSV exports, and diff --git a/docs/architecture/runtime_and_scheduler.md b/docs/architecture/runtime_and_scheduler.md index 30eaaeaf..71d28442 100644 --- a/docs/architecture/runtime_and_scheduler.md +++ b/docs/architecture/runtime_and_scheduler.md @@ -72,14 +72,16 @@ Current implementation details: - Contract normalization lives in `snowl/runtime/container_contract.py`. - Runtime registration and cleanup ownership live in `snowl/runtime/container_lifecycle.py`. +- Runtime-owned per-trial workspace materialization and snapshots live in `snowl/runtime/workspace.py`. - `prepare_trial_phase()` injects both `__snowl_container_session` and `__snowl_runtime_container_spec` into agent context. 
+- When a sample declares workspace inputs, runtime injects `__snowl_workspace`, `workspace_dir`, and before/after snapshot metadata for scorers. - TerminalBench and OSWorld example agents now treat a missing runtime-managed session as a runtime contract violation. What this does not mean yet: - runtime-owned containers are not warm-pooled by default - `spec_hash` does not yet drive dispatch priority or reuse -- `max_container_slots` is still not a universal admission gate across every benchmark container path +- `max_container_slots` gates runtime-managed container prepare through `begin_prepare()` ## Planner / Eval / Runtime Relationship @@ -133,10 +135,10 @@ The main eval loop in `snowl/eval.py` is the real runtime behavior for repo-leve 5. For each trial: - delegate one-trial side effects to internal `EvalTrialLifecycle` - construct `TrialRequest` - - call `prepare_trial_phase(request)` under `scheduler.running_trial_slot()` - - call `execute_agent_phase(prepared)` under the same running-trial admission - - call `score_trial_phase(prepared, partial)` under `scheduler.scoring_slot()` - - call `finalize_trial_phase(prepared, outcome)` after scoring + - call `prepare_trial_phase(request)` under `scheduler.begin_prepare(...)` + - call `execute_agent_phase(prepared)` under `scheduler.begin_execute(...)` + - call `score_trial_phase(prepared, partial)` under `scheduler.begin_score(...)` + - call `finalize_trial_phase(prepared, outcome)` under `scheduler.begin_finalize(...)` - record the recovery attempt - schedule deferred auto retry if the outcome is retry-eligible 6. In the run `finally` path: @@ -157,18 +159,18 @@ The main eval loop and `execute_trial()` are now aligned on phase order: - score - finalize -The remaining mismatch is not phase omission; it is phase admission depth. Prepare still happens while the trial is already holding `running_trial_slot()`, and finalize is still a helper call rather than a separately scheduled phase. 
+The remaining mismatch is dispatch depth, not phase admission. The outer eval loop still schedules whole trial coroutines, while the lifecycle admits prepare, execute, score, and finalize separately inside each coroutine. ## Known Contract Mismatches These are confirmed mismatches between exposed runtime surfaces and the main eval-loop behavior. -- `prepare_trial_phase()` is a real helper, but the main eval loop still admits it under `scheduler.running_trial_slot()` semantics. Future scheduler work must not describe prepare as independently admitted today. +- `prepare_trial_phase()` is admitted through `scheduler.begin_prepare(...)`, but the outer dispatch loop is still not a dedicated prepare worker pool. - Provider budgets are enforced most strongly at model-call time through `OpenAICompatibleChatClient.set_global_model_call_slot_resolver(...)` and `_acquire_model_slot()`. The dispatch loop does not currently choose the next trial based on provider headroom. -- Runtime-owned container cleanup is centralized for runtime-managed resources, but `max_container_slots` still does not serve as a universal dispatcher gate for every benchmark container prepare path. +- Runtime-owned container cleanup is centralized for runtime-managed resources, and `max_container_slots` gates `runtime_container.requires_container` provider prepare paths. - `spec_hash` is computed from the normalized container contract and carried into trial payload/trace, but it does not drive dispatch priority, batching, locality-aware reuse, or warm-pool preference. -- `TaskExecutionPlan` and `TrialDescriptor` exist on `TrialRequest`, but `run_eval_with_components()` does not populate them for repo-level runs. Their presence is not proof of plan-aware scheduling. -- `begin_prepare()` and `begin_finalize()` exist on `ResourceScheduler`, but the main eval loop uses only `running_trial_slot()` and `scoring_slot()` directly. 
+- `TaskExecutionPlan` and `TrialDescriptor` are populated on `TrialRequest` for phase admission, but their presence is not proof of plan-aware dispatch ordering. +- `begin_prepare()`, `begin_execute()`, `begin_score()`, and `begin_finalize()` are used by `EvalTrialLifecycle`. - Benchmark/sample metadata may still carry raw provider startup fields such as compose paths or OSWorld settings for benchmark compatibility, but runtime ownership decisions must come from the normalized `runtime_container` contract, not from agent-side interpretation of those raw fields. ## Resource Budgets diff --git a/docs/current_state.md b/docs/current_state.md index 2f8ed0b8..e63ca95c 100644 --- a/docs/current_state.md +++ b/docs/current_state.md @@ -40,10 +40,12 @@ What works well today: - Provider budgets are enforced for OpenAI-compatible model calls through `OpenAICompatibleChatClient` and `scheduler.provider_slot(...)`. - TerminalBench and OSWorld now use a task-declared `runtime_container` contract that runtime resolves before agent execution. - Runtime-owned benchmark container resources are registered, leased, released, and summarized by a shared lifecycle manager. +- `runtime_container` now supports provider-name-first v2 fields for workspace, init/start/check commands, network, env, mounts, artifacts, and resource limits while retaining legacy startup fields. +- Runtime-owned per-trial workspaces can materialize source files, inject `workspace_dir`, snapshot before/after files, and expose workspace diff metadata to scorers. - Samples can restrict available tools with `metadata.tool_names` or `metadata.target_functions`; missing requested tools fail in prepare with a non-retryable validation error. - Samples can also declare dynamic OpenAI-style tool schemas in `metadata.tool_schemas`; runtime converts them into `ToolSpec`s, merges them with project tools, and fails prepare on schema conflicts. 
- Agent-oriented scorer primitives now cover normalized trace extraction, answer matching, function-call matching, trace policy, command checks, workspace diffs, canary leakage, state transitions, checkpoint aggregation, rubric judges, and grouped metrics. -- `compose_terminal` is available as a generic runtime container provider and can be selected through `runtime_container.provider_name`. +- `compose_terminal` and `docker_container` are available as generic runtime container providers and can be selected through `runtime_container.provider_name`. - The `toolemu` built-in scorer is Snowl-native and no longer imports or executes an external evaluator runtime. - Repo-level `run_eval()` now performs trial finalize and a run-end cleanup barrier before closing live event output. - Deferred auto-retry and manual `snowl retry` both reuse a recovery ledger instead of inventing a separate retry system. @@ -55,11 +57,11 @@ What works well today: | Topic | Implemented now | Partially implemented / inconsistent | Planned / not yet real | | --- | --- | --- | --- | | Provider budgets | `provider_budgets` are real controls and model calls acquire `scheduler.provider_slot(...)` through `OpenAICompatibleChatClient`. | Dispatch does not prioritize by provider headroom, so trials can be admitted and then wait later on model-call slots. | Scheduler-visible provider-aware dispatch and richer provider backpressure policies. | -| Prepare phase | `prepare_trial_phase()` exists, resolves task-declared container contracts, and performs container/sandbox setup. | In main eval flow, prepare still runs while holding `running_trial_slot()` rather than through an independently admitted prepare queue. | Independently admitted prepare scheduling. | -| Score decoupling | Score is admitted separately under `scoring_slot()` and no longer uses the same slot as execution. | The split is still coarse; prepare and finalize are not independently scheduled in the main loop. 
| Fully phase-aware scheduling across prepare, execute, score, and finalize. | -| Finalize behavior | `finalize_trial_phase()` is now used in both `execute_trial()` and the repo-level eval loop. | Finalize is still a helper call, not a first-class scheduler-managed phase with its own admission policy. | Finalize as a normal, explicitly scheduled phase in repo-level evals. | -| Runtime-owned container lifecycle | TerminalBench and OSWorld runtime-created containers are registered with run/trial ownership, released at trial end, and covered by a run-end cleanup barrier. | The shared lifecycle model is currently implemented only for these benchmark provider paths; historical or future container-backed paths still need explicit adoption. | Broader generalized container ownership across every container-backed benchmark path. | -| Container slot enforcement | `max_container_slots` exists and is tracked in scheduler/profiling data. Sandbox runtimes can be wrapped with it. | It is not a universal admission gate across every benchmark container prepare path in the main eval loop. | One control plane that gates container-backed work consistently. | +| Prepare phase | `prepare_trial_phase()` resolves workspaces, task-declared container contracts, and sandbox setup under `begin_prepare()`. | The outer dispatch loop is still coroutine-based rather than a fully materialized prepare worker pool. | Queue-level prepare batching and locality-aware reuse. | +| Score decoupling | Score is admitted separately under `begin_score()` and no longer uses the same slot as execution. | The outer dispatch loop still bounds total in-flight coroutines coarsely. | Richer score queue prioritization. | +| Finalize behavior | `finalize_trial_phase()` is admitted through `begin_finalize()` and releases runtime-owned containers. | Finalize has phase stats but not a dedicated concurrency limit. | Dedicated finalize policies if teardown becomes a bottleneck. 
| +| Runtime-owned container lifecycle | TerminalBench, OSWorld, compose_terminal, and docker_container sessions are registered with run/trial ownership, released at trial end, and covered by a run-end cleanup barrier. | Warm reuse is intentionally absent by default. | Warm-pool reuse and broader provider-specific diagnostics. | +| Container slot enforcement | `max_container_slots` gates runtime-managed container prepare through `begin_prepare()` and sandbox runtimes through the scheduled sandbox wrapper. | Local non-container workspace prepare is not gated by container slots. | More detailed prepare resource classes. | | `spec_hash` locality | Container providers compute `spec_hash` and trial payloads/traces can carry it. | Queue dispatch does not use it for batching, warm-locality, or reuse preference. | Locality-aware dispatch and stronger prepare reuse. | | Phase-aware retry | Provider HTTP retry and deferred whole-trial auto retry are real. | Retry is still mostly whole-trial; prepare/score/finalize are not retried as distinct scheduled phases. | Phase-specific retry and recovery policies. | @@ -85,11 +87,11 @@ The web monitor currently indexes runs from `.snowl/runs/` and uses: These areas are real, but still coarse or inconsistent: -- `TaskExecutionPlan` and `TrialDescriptor` exist in `snowl/runtime/resource_scheduler.py`, but `run_eval_with_components()` does not yet populate or use them for smarter dispatch. -- The scheduler exposes prepare/execute/score/finalize APIs, but the main eval loop only uses execute and score admission directly. -- `TrialRequest.execution_plan` and `TrialRequest.trial_descriptor` exist, but repo-level eval code does not populate them. +- `TaskExecutionPlan` and `TrialDescriptor` are populated for repo-level trial lifecycle admission, but not yet used for smarter dispatch ordering. +- The eval trial lifecycle uses scheduler prepare/execute/score/finalize APIs; the outer loop still handles dispatch and retry queues. 
+- `TrialRequest.execution_plan` and `TrialRequest.trial_descriptor` are populated by `EvalTrialLifecycle`. - `spec_hash` is computed from normalized container contracts, but the runtime does not yet use it for locality-aware dispatch, warm-pool reuse, or batching. -- `max_container_slots` is wired into sandbox wrapping and scheduler APIs, but not all container-provider prepare paths are centrally admitted through that budget yet. +- `max_container_slots` gates runtime-managed container-provider prepare paths selected by `runtime_container.requires_container`. - The main dispatch loop is still close to FIFO: it drains `fresh_queue` in plan order, then consumes deferred retries when ready. - Provider capacity is enforced at model-call admission time, not by a scheduler that prioritizes work based on provider headroom. - Task/sample rows may still carry raw benchmark startup fields such as compose paths or OSWorld settings, but runtime ownership decisions should come from the normalized `runtime_container` contract. @@ -131,7 +133,7 @@ The following show up in docs and scaffolding, but are not current runtime behav - Scheduler-driven phase planning with explicit `TrialDescriptor` / `TaskExecutionPlan` inputs. - Locality-aware dispatch using `spec_hash`. -- Broad prepare/finalize admission through `begin_prepare()` and `begin_finalize()`. +- Dedicated queue workers for prepare/finalize beyond the current per-trial phase admission. - Benchmark container warm reuse or pooling by default. - More sophisticated blocked-group/canary-first scheduling. - Distributed or multi-machine execution. @@ -140,8 +142,7 @@ The following show up in docs and scaffolding, but are not current runtime behav - Treat `docs/runtime_scheduling*.md` as design notes, not source-of-truth behavior docs. - Treat `run_eval()` as the runtime path that matters for end-to-end repo behavior. 
-- Do not assume `prepare_trial_phase()` or `finalize_trial_phase()` are independently scheduled just because helpers exist. - Do not assume task/sample raw benchmark fields are the ownership contract; runtime now resolves `runtime_container` and agents must not use raw compose/OSWorld fields to decide whether to start containers. -- Do not assume `max_container_slots` fully governs every container-backed path yet. -- Do not assume `TaskExecutionPlan`, `TrialDescriptor`, or `spec_hash` are wired into dispatch just because the types exist. +- Do not assume `max_container_slots` applies to non-container local workspace materialization. +- Do not assume `TaskExecutionPlan`, `TrialDescriptor`, or `spec_hash` drive dispatch order yet. - Do not assume multiple providers, distributed execution, or cross-run pooling exist just because the scheduler types look extensible. diff --git a/examples/sandbox-coding-smoke/README.md b/examples/sandbox-coding-smoke/README.md new file mode 100644 index 00000000..702b326f --- /dev/null +++ b/examples/sandbox-coding-smoke/README.md @@ -0,0 +1,10 @@ +# Sandbox Coding Smoke + +Minimal project showing Snowl runtime-owned workspaces for coding-agent style +benchmarks. The task seeds a tiny repository, the agent edits the isolated +workspace, and the scorer checks the resulting file diff. 
+ +```bash +snowl eval examples/sandbox-coding-smoke/project.yml +``` + diff --git a/examples/sandbox-coding-smoke/agent.py b/examples/sandbox-coding-smoke/agent.py new file mode 100644 index 00000000..243dacdc --- /dev/null +++ b/examples/sandbox-coding-smoke/agent.py @@ -0,0 +1,25 @@ +from pathlib import Path + +from snowl.core import StopReason + + +class WorkspaceFixAgent: + agent_id = "workspace_fix_agent" + + async def run(self, state, context, tools=None): + _ = tools + workspace = context.metadata.get("__snowl_workspace") or {} + workspace_dir = Path(str(workspace.get("workspace_dir") or ".")) + target = workspace_dir / "src" / "app.py" + target.write_text("def add(a, b):\n return a + b\n", encoding="utf-8") + state.output = { + "message": {"role": "assistant", "content": "patched src/app.py"}, + "usage": {"input_tokens": 1, "output_tokens": 1, "total_tokens": 2}, + "trace_events": [{"event": "example.patch", "path": "src/app.py"}], + } + state.stop_reason = StopReason.COMPLETED + return state + + +agent = WorkspaceFixAgent() + diff --git a/examples/sandbox-coding-smoke/project.yml b/examples/sandbox-coding-smoke/project.yml new file mode 100644 index 00000000..295ae534 --- /dev/null +++ b/examples/sandbox-coding-smoke/project.yml @@ -0,0 +1,27 @@ +project: + name: sandbox-coding-smoke + root_dir: . + +provider: + id: local + kind: openai_compatible + base_url: http://127.0.0.1:9/v1 + api_key: unused + +agent_matrix: + models: + - id: local + model: local/no-model + +eval: + benchmark: sandbox_coding_smoke + code: + base_dir: . 
+ task_module: ./task.py + agent_module: ./agent.py + scorer_module: ./scorer.py + +runtime: + max_running_trials: 1 + max_container_slots: 1 + max_scoring_tasks: 1 diff --git a/examples/sandbox-coding-smoke/scorer.py b/examples/sandbox-coding-smoke/scorer.py new file mode 100644 index 00000000..095ad6fc --- /dev/null +++ b/examples/sandbox-coding-smoke/scorer.py @@ -0,0 +1,18 @@ +from snowl.core import Score +from snowl.scorer import workspace_diff + + +class WorkspaceSmokeScorer: + scorer_id = "workspace_smoke" + + def score(self, task_result, trace, context): + base = workspace_diff(metric_name="workspace_changed").score(task_result, trace, context) + changed = base["workspace_changed"] + return { + "workspace_changed": changed, + "accuracy": Score(changed.value, metadata=dict(changed.metadata)), + } + + +scorer = WorkspaceSmokeScorer() + diff --git a/examples/sandbox-coding-smoke/task.py b/examples/sandbox-coding-smoke/task.py new file mode 100644 index 00000000..046622d3 --- /dev/null +++ b/examples/sandbox-coding-smoke/task.py @@ -0,0 +1,31 @@ +from snowl.core import EnvSpec, Task + + +def _samples(): + yield { + "id": "fix-add", + "input": "Fix src/app.py so add(1, 2) returns 3.", + "metadata": { + "workspace": { + "enabled": True, + "repo_files": { + "src/app.py": "def add(a, b):\n return 0\n", + "tests/test_app.py": "from src.app import add\n\n\ndef test_add():\n assert add(1, 2) == 3\n", + }, + }, + "required_changed_paths": ["src/app.py"], + "check_command": "python -m pytest -q", + }, + } + + +task = Task( + task_id="sandbox-coding-smoke", + env_spec=EnvSpec(env_type="terminal", provided_ops=("process.run", "terminal.exec", "terminal.capture", "terminal.wait")), + sample_iter_factory=_samples, + metadata={ + "benchmark": "sandbox_coding_smoke", + "primary_metric": "workspace_changed", + }, +) + diff --git a/snowl/benchmarks/agent_bench_os/adapter.py b/snowl/benchmarks/agent_bench_os/adapter.py index c9fbb1ae..3716e96c 100644 --- 
a/snowl/benchmarks/agent_bench_os/adapter.py +++ b/snowl/benchmarks/agent_bench_os/adapter.py @@ -82,10 +82,19 @@ def _row_to_sample( } startup = _runtime_startup(row) if startup: + check_command = startup.get("check_command") + if check_command: + metadata["check_command"] = check_command + metadata["verification_command"] = check_command metadata["runtime_container"] = { "benchmark": self.name, - "provider_name": "compose_terminal", + "provider_name": startup.pop("provider_name", "compose_terminal"), "requires_container": True, + "init_command": startup.get("init_command"), + "start_command": startup.get("start_command"), + "check_command": check_command, + "network": startup.get("network"), + "workspace": dict(startup.get("workspace") or {}), "startup": startup, } return {"id": f"agent-bench-os-{sample_id}", "input": prompt, "metadata": metadata} @@ -109,6 +118,19 @@ def _runtime_startup(row: Mapping[str, Any]) -> dict[str, Any]: startup["init_command"] = init if row.get("start"): startup["start_command"] = str(row.get("start")) + evaluation = row.get("evaluation") + if isinstance(evaluation, Mapping): + command = evaluation.get("command") or evaluation.get("check_command") or evaluation.get("verification_command") + if command is not None: + startup["check_command"] = str(command) if create.get("compose_file"): startup["compose_file"] = str(create.get("compose_file")) + if create.get("docker_image") or create.get("image"): + startup["provider_name"] = "docker_container" + startup["image"] = str(create.get("docker_image") or create.get("image")) + if create.get("network"): + startup["network"] = str(create.get("network")) + repo_files = create.get("repo_files") + if isinstance(repo_files, Mapping): + startup["workspace"] = {"enabled": True, "repo_files": dict(repo_files)} return startup diff --git a/snowl/benchmarks/ipi_coding_agent/adapter.py b/snowl/benchmarks/ipi_coding_agent/adapter.py index 43e429e1..c757fe13 100644 --- 
a/snowl/benchmarks/ipi_coding_agent/adapter.py +++ b/snowl/benchmarks/ipi_coding_agent/adapter.py @@ -83,6 +83,20 @@ def _row_to_sample( if issue_text: prompt += "\n\nIssue context:\n" + issue_text repo_files = dict(row.get("repo_files") or {}) + compose_file = str(row.get("compose_file") or "").strip() + docker_image = str(row.get("docker_image") or row.get("image") or "").strip() + provider_name = "docker_container" if docker_image else "compose_terminal" + requires_container = bool(compose_file or docker_image) + check_command = row.get("check_command") or row.get("verification_command") + runtime_startup: dict[str, Any] = { + "network": "disabled", + "repo_files": repo_files, + "verification_command": check_command, + } + if compose_file: + runtime_startup["compose_file"] = compose_file + if docker_image: + runtime_startup["image"] = docker_image metadata = { "split": row_split, "task_description": task_description, @@ -96,18 +110,27 @@ def _row_to_sample( "is_benign": bool(row.get("is_benign")), "repo_files": repo_files, "workspace_before": repo_files, - "verification_command": row.get("verification_command"), + "verification_command": check_command, + "check_command": check_command, "expected_fix": row.get("expected_fix"), "forbidden_paths": list(row.get("persistence_targets") or []), + "workspace": { + "enabled": True, + "repo_files": repo_files, + "artifacts": list(row.get("artifacts") or []), + }, "runtime_container": { "benchmark": self.name, - "provider_name": "compose_terminal", - "requires_container": False, - "startup": { - "network": "disabled", + "provider_name": provider_name, + "requires_container": requires_container, + "network": "disabled", + "check_command": str(check_command) if check_command else None, + "workspace": { + "enabled": True, "repo_files": repo_files, - "verification_command": row.get("verification_command"), + "artifacts": list(row.get("artifacts") or []), }, + "startup": runtime_startup, }, } return {"id": sample_id, "input": 
prompt, "metadata": metadata} diff --git a/snowl/envs/substrate/container_backend.py b/snowl/envs/substrate/container_backend.py index cc1e11ac..af114a90 100644 --- a/snowl/envs/substrate/container_backend.py +++ b/snowl/envs/substrate/container_backend.py @@ -32,6 +32,10 @@ def run( self, *, image: str, + name: str | None = None, + command: str | None = None, + workdir: str | None = None, + network: str | None = None, env: Mapping[str, str] | None = None, ports: Mapping[int, int] | None = None, volumes: Mapping[str, str] | None = None, @@ -44,6 +48,13 @@ def run( cmd = ["docker", "run"] if detach: cmd.append("-d") + if name: + cmd += ["--name", str(name)] + if network: + network_value = "none" if str(network).lower() in {"disabled", "none", "off"} else str(network) + cmd += ["--network", network_value] + if workdir: + cmd += ["-w", str(workdir)] for cap in (cap_add or ()): cap_name = str(cap).strip() if cap_name: @@ -59,6 +70,8 @@ def run( for key, value in (env or {}).items(): cmd += ["-e", f"{key}={value}"] cmd.append(str(image)) + if command: + cmd += ["bash", "-lc", str(command)] return self._runner.run( cmd, timeout_seconds=timeout_seconds, @@ -98,6 +111,28 @@ def logs( on_event=on_event, ) + def exec( + self, + container_id: str, + command: str, + *, + env: Mapping[str, str] | None = None, + workdir: str | None = None, + on_event: EventSink = None, + timeout_seconds: float | None = None, + ) -> dict[str, Any]: + cmd = ["docker", "exec"] + if workdir: + cmd += ["-w", str(workdir)] + for key, value in (env or {}).items(): + cmd += ["-e", f"{key}={value}"] + cmd += [str(container_id), "bash", "-lc", str(command)] + return self._runner.run( + cmd, + timeout_seconds=timeout_seconds, + on_event=on_event, + ) + @staticmethod def compose_base_cmd(*, project: str, compose_file: str) -> list[str]: return [ diff --git a/snowl/eval_loop.py b/snowl/eval_loop.py index 5b6e3ffe..1e24c958 100644 --- a/snowl/eval_loop.py +++ b/snowl/eval_loop.py @@ -17,10 +17,11 @@ from 
snowl.observability.events import RunEventBus from snowl.planning import PlanTrial, trial_key as make_trial_key from snowl.runtime import TrialOutcome, TrialRequest +from snowl.runtime.container_contract import resolve_runtime_container_spec from snowl.runtime.container_lifecycle import RuntimeContainerLifecycleManager from snowl.runtime.engine import finalize_trial_phase, prepare_trial_phase, execute_agent_phase, score_trial_phase from snowl.runtime.recovery import RecoveryManager, attempt_is_success -from snowl.runtime.resource_scheduler import ResourceScheduler +from snowl.runtime.resource_scheduler import ResourceScheduler, TaskExecutionPlan, TrialDescriptor from snowl.ui.contracts import TaskMonitor, normalize_ui_event @@ -119,6 +120,29 @@ async def run( if retry_source != "initial_run": self._emit_retry_start(trial=trial, retry_source=retry_source) + container_spec = resolve_runtime_container_spec( + task_metadata=trial.task.metadata, + sample=trial.sample, + ) + trial_descriptor = TrialDescriptor( + trial_id=key, + task_id=trial.task_id, + sample_id=trial.sample_id, + agent_id=trial.agent_id, + variant_id=trial.variant_id, + scorer_id=getattr(self.scorer, "scorer_id", None), + seed=None, + spec_hash=container_spec.spec_hash, + provider_ids=(), + ) + execution_plan = TaskExecutionPlan( + trial=trial_descriptor, + requires_container=bool(container_spec.requires_container), + requires_prepare=True, + requires_build=False, + estimated_prepare_cost="container" if container_spec.requires_container else "light", + spec_hash=container_spec.spec_hash, + ) request = TrialRequest( task=trial.task, agent=trial.agent, @@ -127,16 +151,20 @@ async def run( tools=self.tool_specs, sandbox_runtime=self.shared_sandbox_runtime, on_event=lambda event: self._on_runtime_event(event, trial=trial), + execution_plan=execution_plan, + trial_descriptor=trial_descriptor, container_lifecycle=self.container_lifecycle, run_id=self.run_id, trial_id=key, ) - async with 
self.scheduler.running_trial_slot(): + async with self.scheduler.begin_prepare(execution_plan): prepared = await prepare_trial_phase(request) + async with self.scheduler.begin_execute(execution_plan): partial = await execute_agent_phase(prepared) - async with self.scheduler.scoring_slot(): + async with self.scheduler.begin_score(execution_plan): outcome = await score_trial_phase(prepared, partial) - outcome, _ = await finalize_trial_phase(prepared, outcome) + async with self.scheduler.begin_finalize(execution_plan): + outcome, _ = await finalize_trial_phase(prepared, outcome) async with self.checkpoint_lock: attempt_row = self.recovery_manager.record_attempt( diff --git a/snowl/runtime/container_contract.py b/snowl/runtime/container_contract.py index 5380c875..d495d48a 100644 --- a/snowl/runtime/container_contract.py +++ b/snowl/runtime/container_contract.py @@ -53,6 +53,15 @@ class RuntimeContainerSpec: cleanup_policy: str = "destroy_on_release" debug_preserve_default: bool = False startup: dict[str, Any] = field(default_factory=dict) + workspace: dict[str, Any] = field(default_factory=dict) + init_command: str | None = None + start_command: str | None = None + check_command: str | None = None + network: str | None = None + env: dict[str, str] = field(default_factory=dict) + mounts: list[dict[str, Any]] = field(default_factory=list) + artifacts: list[str] = field(default_factory=list) + resource_limits: dict[str, Any] = field(default_factory=dict) spec_hash_basis: dict[str, Any] = field(default_factory=dict) task_config: dict[str, Any] = field(default_factory=dict) sample_config: dict[str, Any] = field(default_factory=dict) @@ -78,6 +87,15 @@ def to_metadata(self) -> dict[str, Any]: "cleanup_policy": self.cleanup_policy, "debug_preserve_default": self.debug_preserve_default, "startup": _canonicalize(self.startup), + "workspace": _canonicalize(self.workspace), + "init_command": self.init_command, + "start_command": self.start_command, + "check_command": 
self.check_command, + "network": self.network, + "env": _canonicalize(self.env), + "mounts": _canonicalize(self.mounts), + "artifacts": list(self.artifacts), + "resource_limits": _canonicalize(self.resource_limits), "spec_hash_basis": _canonicalize(self.spec_hash_basis), "spec_hash": self.spec_hash, } @@ -126,6 +144,49 @@ def resolve_runtime_container_spec( task_startup = _as_mapping(task_contract.get("startup")) sample_startup = _as_mapping(sample_contract.get("startup")) startup = _deep_merge(task_startup, sample_startup) + workspace = _deep_merge( + _as_mapping(task_contract.get("workspace")), + _as_mapping(sample_contract.get("workspace")), + ) + init_command = ( + sample_contract.get("init_command") + or task_contract.get("init_command") + or startup.get("init_command") + ) + start_command = ( + sample_contract.get("start_command") + or task_contract.get("start_command") + or startup.get("start_command") + ) + check_command = ( + sample_contract.get("check_command") + or task_contract.get("check_command") + or startup.get("check_command") + or startup.get("verification_command") + ) + network = ( + sample_contract.get("network") + or task_contract.get("network") + or startup.get("network") + ) + env = _deep_merge( + _as_mapping(task_contract.get("env")), + _as_mapping(sample_contract.get("env")), + ) + env = _deep_merge(env, _as_mapping(startup.get("env"))) + mounts_raw = sample_contract.get("mounts", task_contract.get("mounts", startup.get("mounts", []))) + mounts = [dict(item) for item in mounts_raw] if isinstance(mounts_raw, list) else [] + artifacts_raw = sample_contract.get("artifacts", task_contract.get("artifacts", startup.get("artifacts", []))) + if isinstance(artifacts_raw, str): + artifacts = [artifacts_raw] + elif isinstance(artifacts_raw, list): + artifacts = [str(item) for item in artifacts_raw if str(item).strip()] + else: + artifacts = [] + resource_limits = _deep_merge( + _as_mapping(task_contract.get("resource_limits")), + 
_as_mapping(sample_contract.get("resource_limits")), + ) task_basis = _as_mapping(task_contract.get("spec_hash_basis")) sample_basis = _as_mapping(sample_contract.get("spec_hash_basis")) @@ -144,6 +205,15 @@ def resolve_runtime_container_spec( cleanup_policy=cleanup_policy, debug_preserve_default=debug_preserve_default, startup=startup, + workspace=workspace, + init_command=(str(init_command) if init_command else None), + start_command=(str(start_command) if start_command else None), + check_command=(str(check_command) if check_command else None), + network=(str(network).strip().lower() if network else None), + env={str(k): str(v) for k, v in env.items()}, + mounts=mounts, + artifacts=artifacts, + resource_limits=resource_limits, spec_hash_basis=spec_hash_basis, task_config=task_contract, sample_config=sample_contract, diff --git a/snowl/runtime/container_lifecycle.py b/snowl/runtime/container_lifecycle.py index 3e07fa13..e06ded3c 100644 --- a/snowl/runtime/container_lifecycle.py +++ b/snowl/runtime/container_lifecycle.py @@ -43,6 +43,7 @@ class RuntimeOwnedResourceRecord: container_id: str | None = None compose_project: str | None = None compose_file: str | None = None + workspace_dir: str | None = None session_kind: str | None = None provider_metadata: dict[str, Any] = field(default_factory=dict) teardown_error: str | None = None @@ -100,6 +101,7 @@ def register_container( container_id: str | None, compose_project: str | None, compose_file: str | None, + workspace_dir: str | None = None, session_kind: str | None, provider_metadata: dict[str, Any] | None, teardown: Callable[[], Awaitable[dict[str, Any] | None]], @@ -123,6 +125,7 @@ def register_container( container_id=str(container_id) if container_id else None, compose_project=str(compose_project) if compose_project else None, compose_file=str(compose_file) if compose_file else None, + workspace_dir=str(workspace_dir) if workspace_dir else None, session_kind=str(session_kind) if session_kind else None, 
provider_metadata=dict(provider_metadata or {}), ) @@ -144,6 +147,7 @@ def register_container( "container_id": record.container_id, "compose_project": record.compose_project, "compose_file": record.compose_file, + "workspace_dir": record.workspace_dir, "session_kind": record.session_kind, "cleanup_policy": record.cleanup_policy, "debug_preserve": record.debug_preserve, @@ -183,6 +187,7 @@ def snapshot(self) -> dict[str, Any]: "spec_hash": record.spec_hash, "container_id": record.container_id, "compose_project": record.compose_project, + "workspace_dir": record.workspace_dir, "session_kind": record.session_kind, "lifecycle_state": record.lifecycle_state.value, "debug_preserve": record.debug_preserve, diff --git a/snowl/runtime/container_providers.py b/snowl/runtime/container_providers.py index a14373d5..aeef4124 100644 --- a/snowl/runtime/container_providers.py +++ b/snowl/runtime/container_providers.py @@ -25,6 +25,7 @@ from snowl.benchmarks.osworld.container import OSWorldContainerLauncher from snowl.core import EnvSpec from snowl.envs import GuiEnv, TerminalEnv +from snowl.envs.substrate import CommandRunner, ContainerBackend from snowl.runtime.container_contract import RuntimeContainerSpec @@ -355,6 +356,11 @@ async def prepare(self, context: ContainerProviderContext) -> ContainerSession: workdir = Path(str(startup.get("workdir") or startup.get("task_root") or Path.cwd())).resolve() compose_file = str(startup.get("compose_file") or "").strip() use_compose = bool(compose_file and Path(compose_file).exists()) + workspace_dir = str(startup.get("workspace_dir") or context.container_spec.workspace.get("workspace_dir") or "").strip() + compose_env = {str(k): str(v) for k, v in dict(startup.get("compose_env") or {}).items()} + compose_env.update({str(k): str(v) for k, v in context.container_spec.env.items()}) + if workspace_dir: + compose_env.setdefault("SNOWL_WORKSPACE", workspace_dir) env = TerminalEnv( env_spec=EnvSpec( env_type="terminal", @@ -372,7 +378,7 @@ async 
def prepare(self, context: ContainerProviderContext) -> ContainerSession: compose_build=bool(startup.get("compose_build", True)), compose_project=project, compose_service=str(startup.get("compose_service") or "client"), - compose_env={str(k): str(v) for k, v in dict(startup.get("compose_env") or {}).items()}, + compose_env=compose_env, ) if use_compose: docker_path = context.ensure_docker_available(benchmark=context.container_spec.benchmark or self.name) @@ -408,6 +414,8 @@ async def prepare(self, context: ContainerProviderContext) -> ContainerSession: "compose_terminal docker compose up failed: " + str((up_out.get("stderr") or up_out.get("stdout") or "").strip()) ) + await self._run_lifecycle_command(context, env, "init", context.container_spec.init_command) + await self._run_lifecycle_command(context, env, "start", context.container_spec.start_command) else: context.emit_event( { @@ -417,6 +425,8 @@ async def prepare(self, context: ContainerProviderContext) -> ContainerSession: "compose_file": compose_file, } ) + await self._run_lifecycle_command(context, env, "init", context.container_spec.init_command) + await self._run_lifecycle_command(context, env, "start", context.container_spec.start_command) return ContainerSession( kind="terminal_compose", env=env, @@ -425,14 +435,46 @@ async def prepare(self, context: ContainerProviderContext) -> ContainerSession: "project": env.compose_project, "compose_file": env.compose_file, "compose_service": env.compose_service, + "workspace_dir": workspace_dir or None, "spec_hash": self.describe_requirements(context).get("spec_hash"), }, ) + async def _run_lifecycle_command( + self, + context: ContainerProviderContext, + env: TerminalEnv, + label: str, + command: str | None, + ) -> dict[str, Any] | None: + if not command: + return None + context.emit_event({"event": f"compose_terminal.container.{label}.start", "phase": "env", "command_text": command}) + out = await asyncio.to_thread( + env.exec, + command, + 
timeout_seconds=float(context.container_spec.resource_limits.get(f"{label}_timeout_seconds", 120.0)), + ) + context.emit_event( + { + "event": f"compose_terminal.container.{label}.finish", + "phase": "env", + "command_text": command, + "exit_code": out.get("exit_code"), + "duration_ms": out.get("duration_ms"), + "stdout_tail": str(out.get("stdout", ""))[-240:], + "stderr_tail": str(out.get("stderr", ""))[-240:], + } + ) + if out.get("exit_code", 1) != 0: + raise RuntimeError(f"compose_terminal {label}_command failed: {out.get('stderr') or out.get('stdout') or ''}") + return out + async def close(self, context: ContainerProviderContext, session: ContainerSession) -> dict[str, Any] | None: env = session.env + check_out = await self._run_lifecycle_command(context, env, "check", context.container_spec.check_command) if not getattr(env, "use_docker_compose", False): - return None + return check_out project = getattr(env, "compose_project", None) context.emit_event({"event": "compose_terminal.container.stopping", "phase": "env", "project": project}) down_out = await asyncio.to_thread( @@ -451,9 +493,164 @@ async def close(self, context: ContainerProviderContext, session: ContainerSessi "exit_code": down_out.get("exit_code"), } ) + if check_out is not None: + down_out["check"] = check_out return down_out +class DockerContainerProvider: + name = "docker_container" + + def describe_requirements(self, context: ContainerProviderContext) -> dict[str, Any]: + startup = dict(context.container_spec.startup) + return { + "benchmark": context.container_spec.benchmark, + "provider_name": self.name, + "requires_container": bool(context.container_spec.requires_container), + "requires_build": False, + "spec_hash": context.container_spec.spec_hash, + "prepare_provider_ids": (), + "estimated_prepare_cost": "medium", + "startup": startup, + } + + async def prepare(self, context: ContainerProviderContext) -> ContainerSession: + startup = dict(context.container_spec.startup) + image = 
str(startup.get("image") or startup.get("docker_image") or "").strip() + if not image: + raise RuntimeError("docker_container provider requires startup.image.") + docker_path = context.ensure_docker_available(benchmark=context.container_spec.benchmark or self.name) + safe_task = re.sub(r"[^a-zA-Z0-9._-]+", "-", str(context.task_id or "task")).strip("-") or "task" + safe_sample = re.sub(r"[^a-zA-Z0-9._-]+", "-", str((context.sample or {}).get("id") or "sample")).strip("-") or "sample" + safe_variant = re.sub(r"[^a-zA-Z0-9._-]+", "-", str(context.variant_id or "default")).strip("-") or "default" + container_name = str(startup.get("container_name") or f"snowl-dc-{safe_task}-{safe_sample[:12]}-{safe_variant[:12]}") + workspace_dir = str(startup.get("workspace_dir") or context.container_spec.workspace.get("workspace_dir") or "").strip() + volumes = {str(workspace_dir): str(startup.get("workspace_mount") or "/workspace")} if workspace_dir else {} + volumes.update({str(k): str(v) for k, v in dict(startup.get("volumes") or {}).items()}) + env_vars = {**context.container_spec.env, **{str(k): str(v) for k, v in dict(startup.get("env") or {}).items()}} + if workspace_dir: + env_vars.setdefault("SNOWL_WORKSPACE", str(startup.get("workspace_mount") or "/workspace")) + runner = CommandRunner(cwd=workspace_dir or None) + backend = ContainerBackend(command_runner=runner) + context.emit_event( + { + "event": "docker_container.container.starting", + "phase": "env", + "image": image, + "container_name": container_name, + "docker_path": docker_path, + "network": context.container_spec.network, + "workspace_dir": workspace_dir or None, + } + ) + out = await asyncio.to_thread( + backend.run, + image=image, + name=container_name, + command=str(startup.get("command") or "sleep infinity"), + workdir=str(startup.get("workdir") or (startup.get("workspace_mount") or "/workspace")), + network=context.container_spec.network, + env=env_vars, + volumes=volumes, + detach=True, + 
timeout_seconds=float(context.container_spec.resource_limits.get("start_timeout_seconds", 120.0)), + on_event=context.emit_env_stream, + ) + container_id = str(out.get("stdout") or "").strip().splitlines()[-1] if str(out.get("stdout") or "").strip() else container_name + context.emit_event( + { + "event": "docker_container.container.started", + "phase": "env", + "container_id": container_id, + "container_name": container_name, + "exit_code": out.get("exit_code"), + "duration_ms": out.get("duration_ms"), + } + ) + if out.get("exit_code", 1) != 0: + raise RuntimeError("docker_container docker run failed: " + str((out.get("stderr") or out.get("stdout") or "").strip())) + await self._run_docker_lifecycle_command(context, backend, container_id, "init", context.container_spec.init_command) + await self._run_docker_lifecycle_command(context, backend, container_id, "start", context.container_spec.start_command) + return ContainerSession( + kind="docker_container", + env={ + "container_id": container_id, + "container_name": container_name, + "workspace_dir": workspace_dir or None, + "backend": backend, + }, + benchmark=context.container_spec.benchmark or self.name, + metadata={ + "container_id": container_id, + "container_name": container_name, + "workspace_dir": workspace_dir or None, + "image": image, + "spec_hash": self.describe_requirements(context).get("spec_hash"), + }, + ) + + async def _run_docker_lifecycle_command( + self, + context: ContainerProviderContext, + backend: ContainerBackend, + container_id: str, + label: str, + command: str | None, + ) -> dict[str, Any] | None: + if not command: + return None + mount = str(context.container_spec.startup.get("workspace_mount") or "/workspace") + context.emit_event({"event": f"docker_container.container.{label}.start", "phase": "env", "command_text": command}) + out = await asyncio.to_thread( + backend.exec, + container_id, + command, + workdir=mount, + env=context.container_spec.env, + 
timeout_seconds=float(context.container_spec.resource_limits.get(f"{label}_timeout_seconds", 120.0)), + on_event=context.emit_env_stream, + ) + context.emit_event( + { + "event": f"docker_container.container.{label}.finish", + "phase": "env", + "command_text": command, + "exit_code": out.get("exit_code"), + "duration_ms": out.get("duration_ms"), + "stdout_tail": str(out.get("stdout", ""))[-240:], + "stderr_tail": str(out.get("stderr", ""))[-240:], + } + ) + if out.get("exit_code", 1) != 0: + raise RuntimeError(f"docker_container {label}_command failed: {out.get('stderr') or out.get('stdout') or ''}") + return out + + async def close(self, context: ContainerProviderContext, session: ContainerSession) -> dict[str, Any] | None: + env = dict(session.env or {}) + backend: ContainerBackend = env["backend"] + container_id = str(env.get("container_id") or env.get("container_name") or "") + check_out = await self._run_docker_lifecycle_command(context, backend, container_id, "check", context.container_spec.check_command) + context.emit_event({"event": "docker_container.container.stopping", "phase": "env", "container_id": container_id}) + out = await asyncio.to_thread( + backend.rm, + container_id, + force=True, + timeout_seconds=float(context.container_spec.resource_limits.get("stop_timeout_seconds", 60.0)), + on_event=context.emit_env_stream, + ) + context.emit_event( + { + "event": "docker_container.container.stopped", + "phase": "env", + "container_id": container_id, + "exit_code": out.get("exit_code"), + } + ) + if check_out is not None: + out["check"] = check_out + return out + + class OSWorldProvider: name = "osworld" @@ -517,5 +714,6 @@ def default_container_provider_registry() -> ContainerProviderRegistry: registry.register("terminalbench", TerminalBenchProvider()) registry.register("osworld", OSWorldProvider()) registry.register("compose_terminal", ComposeTerminalProvider()) + registry.register("docker_container", DockerContainerProvider()) 
_DEFAULT_PROVIDER_REGISTRY = registry return _DEFAULT_PROVIDER_REGISTRY diff --git a/snowl/runtime/container_runtime.py b/snowl/runtime/container_runtime.py index 12e74683..3fd17e40 100644 --- a/snowl/runtime/container_runtime.py +++ b/snowl/runtime/container_runtime.py @@ -144,6 +144,7 @@ async def prepare_phase(self) -> ContainerPrepareResult: resource_id: str | None = None if self._lifecycle_manager is not None and self._session is not None: env = self._session.env + env_map = dict(env) if isinstance(env, Mapping) else {} resource_id = self._lifecycle_manager.register_container( trial_id=self.trial_id, benchmark=benchmark, @@ -151,9 +152,10 @@ async def prepare_phase(self) -> ContainerPrepareResult: spec_hash=self._container_spec.spec_hash, cleanup_policy=self._container_spec.cleanup_policy, debug_preserve=self._container_spec.debug_preserve_default, - container_id=getattr(env, "container_id", None), - compose_project=getattr(env, "compose_project", None), - compose_file=getattr(env, "compose_file", None), + container_id=getattr(env, "container_id", None) or env_map.get("container_id"), + compose_project=getattr(env, "compose_project", None) or env_map.get("compose_project"), + compose_file=getattr(env, "compose_file", None) or env_map.get("compose_file"), + workspace_dir=dict(self._session.metadata).get("workspace_dir"), session_kind=getattr(self._session, "kind", None), provider_metadata={ **dict(self._session.metadata), diff --git a/snowl/runtime/engine.py b/snowl/runtime/engine.py index dae9f925..b3516072 100644 --- a/snowl/runtime/engine.py +++ b/snowl/runtime/engine.py @@ -31,8 +31,16 @@ from snowl.envs.sandbox_runtime import SandboxRuntime, WarmPoolSandboxRuntime from snowl.errors import SnowlValidationError from snowl.runtime.container_lifecycle import RuntimeContainerLifecycleManager +from snowl.runtime.container_contract import resolve_runtime_container_spec from snowl.runtime.container_runtime import ContainerPrepareResult, ContainerRuntime from 
snowl.runtime.resource_scheduler import TaskExecutionPlan, TrialDescriptor +from snowl.runtime.workspace import ( + RuntimeWorkspaceManager, + RuntimeWorkspaceSession, + diff_workspace, + resolve_workspace_spec, + snapshot_workspace, +) from snowl.ui.contracts import build_score_explanations _DEFAULT_SANDBOX_RUNTIME = WarmPoolSandboxRuntime() @@ -90,6 +98,7 @@ class PreparedTrial: sandbox_runtime: SandboxRuntime container_runtime: ContainerRuntime container_prepare: ContainerPrepareResult + workspace_session: RuntimeWorkspaceSession | None = None prepared_sandbox: Any | None = None original_max_steps: int | None = None failed_partial: PartialTrialResult | None = None @@ -282,6 +291,27 @@ def _build_score_context(request: TrialRequest, *, sample_id: str | None) -> Sco ) +def _score_context_for_prepared( + prepared: PreparedTrial, + *, + extra_sample_metadata: Mapping[str, Any] | None = None, +) -> ScoreContext: + sample_meta = dict(prepared.request.sample.get("metadata", {}) or {}) + context_sample = prepared.context.metadata.get("sample") + if isinstance(context_sample, Mapping): + context_meta = context_sample.get("metadata") + if isinstance(context_meta, Mapping): + sample_meta.update(dict(context_meta)) + sample_meta.update(dict(extra_sample_metadata or {})) + return ScoreContext( + task_id=prepared.request.task.task_id, + agent_id=getattr(prepared.request.agent, "agent_id"), + sample_id=prepared.sample_id, + task_metadata=prepared.request.task.metadata, + sample_metadata=sample_meta, + ) + + def _emit_factory(request: TrialRequest) -> Callable[[dict[str, Any]], None]: def _emit(event: dict[str, Any]) -> None: if request.on_event is None: @@ -373,6 +403,7 @@ async def prepare_trial_phase(request: TrialRequest) -> PreparedTrial: variant_id = str(getattr(request.agent, "variant_id", "default")) variant_model = getattr(request.agent, "model", None) emit = _emit_factory(request) + sample_for_runtime = dict(request.sample) emit( { @@ -394,7 +425,7 @@ async def 
prepare_trial_phase(request: TrialRequest) -> PreparedTrial: task_id=request.task.task_id, sample_id=sample_id, metadata={ - "sample": dict(request.sample), + "sample": sample_for_runtime, "task_metadata": request.task.metadata, "variant_id": variant_id, "model": variant_model, @@ -402,6 +433,102 @@ async def prepare_trial_phase(request: TrialRequest) -> PreparedTrial: }, ) + workspace_session: RuntimeWorkspaceSession | None = None + try: + pre_container_spec = resolve_runtime_container_spec( + task_metadata=request.task.metadata, + sample=request.sample, + ) + workspace_spec = resolve_workspace_spec( + task_metadata=request.task.metadata, + sample=request.sample, + container_startup=pre_container_spec.startup, + container_workspace=pre_container_spec.workspace, + ) + workspace_session = RuntimeWorkspaceManager( + run_id=request.run_id, + trial_id=request.trial_id, + task_id=request.task.task_id, + sample_id=sample_id, + spec=workspace_spec, + ).prepare() + if workspace_session is not None: + sample_meta = dict(sample_for_runtime.get("metadata", {}) or {}) + runtime_container = dict(sample_meta.get("runtime_container", {}) or {}) + startup = dict(runtime_container.get("startup", {}) or {}) + workspace_contract = dict(runtime_container.get("workspace", {}) or {}) + startup["workspace_dir"] = workspace_session.workspace_dir + workspace_contract["workspace_dir"] = workspace_session.workspace_dir + runtime_container["startup"] = startup + runtime_container["workspace"] = workspace_contract + sample_meta.update( + { + "runtime_container": runtime_container, + "workspace_dir": workspace_session.workspace_dir, + "workspace_before": dict(workspace_session.before), + "workspace_spec": workspace_session.spec.to_metadata(), + } + ) + sample_for_runtime["metadata"] = sample_meta + context.metadata["sample"] = sample_for_runtime + context.metadata["__snowl_workspace"] = { + "workspace_dir": workspace_session.workspace_dir, + "before": dict(workspace_session.before), + } + emit( 
+ { + "event": "runtime.workspace.prepared", + "phase": "prepare", + "workspace_dir": workspace_session.workspace_dir, + "file_count": len(workspace_session.before), + } + ) + except Exception as exc: + container_runtime = ContainerRuntime( + run_id=request.run_id, + trial_id=request.trial_id, + task_id=request.task.task_id, + agent_id=getattr(request.agent, "agent_id"), + variant_id=variant_id, + task_env_type=request.task.env_spec.env_type, + task_metadata=request.task.metadata, + sample=sample_for_runtime, + emit=emit, + lifecycle_manager=request.container_lifecycle, + ) + return PreparedTrial( + request=request, + started_ms=started, + sample_id=sample_id, + variant_id=variant_id, + variant_model=variant_model, + state=state, + context=context, + resolved_tool_specs=[], + sandbox_runtime=request.sandbox_runtime or _DEFAULT_SANDBOX_RUNTIME, + container_runtime=container_runtime, + container_prepare=ContainerPrepareResult( + session=None, + requires_container=False, + requires_build=False, + spec_hash=None, + prepare_provider_ids=(), + metadata={}, + ), + workspace_session=workspace_session, + failed_partial=_error_partial( + request, + started_ms=started, + sample_id=sample_id, + variant_id=variant_id, + variant_model=variant_model, + code="workspace_prepare_error", + message=str(exc), + phase="prepare", + trace_event="runtime.workspace.error", + ), + ) + container_runtime = ContainerRuntime( run_id=request.run_id, trial_id=request.trial_id, @@ -410,7 +537,7 @@ async def prepare_trial_phase(request: TrialRequest) -> PreparedTrial: variant_id=variant_id, task_env_type=request.task.env_spec.env_type, task_metadata=request.task.metadata, - sample=request.sample, + sample=sample_for_runtime, emit=emit, lifecycle_manager=request.container_lifecycle, ) @@ -441,6 +568,7 @@ async def prepare_trial_phase(request: TrialRequest) -> PreparedTrial: sandbox_runtime=request.sandbox_runtime or _DEFAULT_SANDBOX_RUNTIME, container_runtime=container_runtime, 
container_prepare=container_prepare, + workspace_session=None, failed_partial=_error_partial( request, started_ms=started, @@ -475,6 +603,7 @@ async def prepare_trial_phase(request: TrialRequest) -> PreparedTrial: sandbox_runtime=request.sandbox_runtime or _DEFAULT_SANDBOX_RUNTIME, container_runtime=container_runtime, container_prepare=container_prepare, + workspace_session=workspace_session, failed_partial=_error_partial( request, started_ms=started, @@ -505,6 +634,7 @@ async def prepare_trial_phase(request: TrialRequest) -> PreparedTrial: sandbox_runtime=request.sandbox_runtime or _DEFAULT_SANDBOX_RUNTIME, container_runtime=container_runtime, container_prepare=container_prepare, + workspace_session=workspace_session, failed_partial=_error_partial( request, started_ms=started, @@ -535,6 +665,7 @@ async def prepare_trial_phase(request: TrialRequest) -> PreparedTrial: sandbox_runtime=request.sandbox_runtime or _DEFAULT_SANDBOX_RUNTIME, container_runtime=container_runtime, container_prepare=container_prepare, + workspace_session=workspace_session, failed_partial=_error_partial( request, started_ms=started, @@ -587,6 +718,7 @@ async def prepare_trial_phase(request: TrialRequest) -> PreparedTrial: sandbox_runtime=sandbox_runtime, container_runtime=container_runtime, container_prepare=container_prepare, + workspace_session=workspace_session, original_max_steps=original_max_steps, failed_partial=_error_partial( request, @@ -613,6 +745,7 @@ async def prepare_trial_phase(request: TrialRequest) -> PreparedTrial: sandbox_runtime=sandbox_runtime, container_runtime=container_runtime, container_prepare=container_prepare, + workspace_session=workspace_session, prepared_sandbox=prepared_sandbox, original_max_steps=original_max_steps, failed_partial=None, @@ -814,10 +947,42 @@ async def _agent_run(): **dict(prepared.container_prepare.metadata), } + workspace_score_metadata: dict[str, Any] = {} + if prepared.workspace_session is not None: + after = 
snapshot_workspace(prepared.workspace_session.workspace_dir) + diff = diff_workspace(prepared.workspace_session.before, after) + workspace_score_metadata = { + "workspace_dir": prepared.workspace_session.workspace_dir, + "workspace_before": dict(prepared.workspace_session.before), + "workspace_after": after, + "workspace_diff": diff, + } + payload["workspace"] = { + "workspace_dir": prepared.workspace_session.workspace_dir, + "before_file_count": len(prepared.workspace_session.before), + "after_file_count": len(after), + "diff": diff, + } + trace["workspace"] = payload["workspace"] + emit( + { + "event": "runtime.workspace.snapshot", + "phase": "execute", + "workspace_dir": prepared.workspace_session.workspace_dir, + "before_file_count": len(prepared.workspace_session.before), + "after_file_count": len(after), + "changed": list(diff.get("changed") or []), + "deleted": list(diff.get("deleted") or []), + } + ) + return PartialTrialResult( task_result=task_result, trace=trace, - score_context=_build_score_context(request, sample_id=prepared.sample_id), + score_context=_score_context_for_prepared( + prepared, + extra_sample_metadata=workspace_score_metadata, + ), ) @@ -1033,6 +1198,16 @@ async def finalize_trial_phase( if container_close is not None: payload["container_finalize"] = dict(container_close) trace["container_finalize"] = dict(container_close) + if prepared.workspace_session is not None: + after = snapshot_workspace(prepared.workspace_session.workspace_dir) + diff = diff_workspace(prepared.workspace_session.before, after) + payload["workspace"] = { + "workspace_dir": prepared.workspace_session.workspace_dir, + "before_file_count": len(prepared.workspace_session.before), + "after_file_count": len(after), + "diff": diff, + } + trace["workspace"] = payload["workspace"] if finalize_error is not None: task_result = TaskResult( diff --git a/snowl/runtime/resource_scheduler.py b/snowl/runtime/resource_scheduler.py index 38520877..cbc8f892 100644 --- 
a/snowl/runtime/resource_scheduler.py +++ b/snowl/runtime/resource_scheduler.py @@ -300,7 +300,7 @@ async def begin_finalize(self, plan: TaskExecutionPlan | None = None) -> AsyncIt async with _AsyncSemaphoreContext( sem=None, wait_phase="finalize", - active_key=None, + active_key="finalizing", phase_name="finalize", scheduler=self, ): diff --git a/snowl/runtime/workspace.py b/snowl/runtime/workspace.py new file mode 100644 index 00000000..6190a7e4 --- /dev/null +++ b/snowl/runtime/workspace.py @@ -0,0 +1,217 @@ +"""Runtime-owned per-trial workspace materialization and snapshots.""" + +from __future__ import annotations + +import hashlib +import shutil +import time +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Mapping + + +def _as_mapping(value: Any) -> dict[str, Any]: + if isinstance(value, Mapping): + return {str(k): v for k, v in value.items()} + return {} + + +def _safe_part(value: str) -> str: + out = "".join(ch if ch.isalnum() or ch in "._-" else "-" for ch in str(value or "")) + return out.strip("-") or "workspace" + + +@dataclass(frozen=True) +class RuntimeWorkspaceSpec: + enabled: bool = False + root: str | None = None + source_dir: str | None = None + repo_files: dict[str, Any] = field(default_factory=dict) + seed_files: dict[str, Any] = field(default_factory=dict) + artifacts: tuple[str, ...] 
= () + preserve: bool = False + + def to_metadata(self) -> dict[str, Any]: + return { + "enabled": self.enabled, + "root": self.root, + "source_dir": self.source_dir, + "repo_file_count": len(self.repo_files), + "seed_file_count": len(self.seed_files), + "artifacts": list(self.artifacts), + "preserve": self.preserve, + } + + +@dataclass(frozen=True) +class RuntimeWorkspaceSession: + workspace_dir: str + before: dict[str, str] + spec: RuntimeWorkspaceSpec + + +def resolve_workspace_spec( + *, + task_metadata: Mapping[str, Any] | None, + sample: Mapping[str, Any] | None, + container_startup: Mapping[str, Any] | None = None, + container_workspace: Mapping[str, Any] | None = None, +) -> RuntimeWorkspaceSpec: + task_meta = _as_mapping(task_metadata) + sample_row = _as_mapping(sample) + sample_meta = _as_mapping(sample_row.get("metadata")) + startup = _as_mapping(container_startup) + + task_workspace = _as_mapping(task_meta.get("workspace")) + sample_workspace = _as_mapping(sample_meta.get("workspace")) + contract_workspace = _as_mapping(container_workspace) + workspace = {**task_workspace, **contract_workspace, **sample_workspace} + + repo_files = _as_mapping( + workspace.get("repo_files") + or sample_meta.get("repo_files") + or startup.get("repo_files") + ) + seed_files = _as_mapping(workspace.get("seed_files") or startup.get("seed_files")) + source_dir = workspace.get("source_dir") or startup.get("source_dir") + enabled = bool( + workspace.get("enabled", False) + or repo_files + or seed_files + or source_dir + ) + raw_artifacts = workspace.get("artifacts") or startup.get("artifacts") or () + if isinstance(raw_artifacts, str): + artifacts = (raw_artifacts,) + elif isinstance(raw_artifacts, (list, tuple)): + artifacts = tuple(str(item) for item in raw_artifacts if str(item).strip()) + else: + artifacts = () + return RuntimeWorkspaceSpec( + enabled=enabled, + root=(str(workspace.get("root")) if workspace.get("root") else None), + source_dir=(str(source_dir) if source_dir 
else None), + repo_files=repo_files, + seed_files=seed_files, + artifacts=artifacts, + preserve=bool(workspace.get("preserve", False)), + ) + + +class RuntimeWorkspaceManager: + def __init__( + self, + *, + run_id: str | None, + trial_id: str | None, + task_id: str, + sample_id: str | None, + spec: RuntimeWorkspaceSpec, + ) -> None: + self.run_id = run_id + self.trial_id = trial_id + self.task_id = task_id + self.sample_id = sample_id + self.spec = spec + + def prepare(self) -> RuntimeWorkspaceSession | None: + if not self.spec.enabled: + return None + workspace_dir = self._workspace_dir() + if workspace_dir.exists() and not self.spec.preserve: + shutil.rmtree(workspace_dir) + workspace_dir.mkdir(parents=True, exist_ok=True) + if self.spec.source_dir: + self._copy_source(Path(self.spec.source_dir), workspace_dir) + self._write_files(workspace_dir, self.spec.repo_files) + self._write_files(workspace_dir, self.spec.seed_files) + before = snapshot_workspace(workspace_dir) + return RuntimeWorkspaceSession( + workspace_dir=str(workspace_dir), + before=before, + spec=self.spec, + ) + + def _workspace_dir(self) -> Path: + if self.spec.root: + root = Path(self.spec.root) + else: + root = Path.cwd() / ".snowl" / "workspaces" + name_basis = "|".join( + [ + str(self.run_id or "run"), + str(self.trial_id or ""), + str(self.task_id or ""), + str(self.sample_id or ""), + ] + ) + digest = hashlib.sha1(name_basis.encode("utf-8")).hexdigest()[:10] + return ( + root + / _safe_part(str(self.run_id or "local")) + / f"{_safe_part(str(self.sample_id or self.task_id))}-{digest}" + ).resolve() + + def _copy_source(self, source: Path, target: Path) -> None: + source = source.resolve() + if not source.exists() or not source.is_dir(): + raise RuntimeError(f"workspace source_dir not found: {source}") + for child in source.iterdir(): + dest = target / child.name + if child.is_dir(): + shutil.copytree(child, dest, dirs_exist_ok=True) + else: + dest.parent.mkdir(parents=True, exist_ok=True) + 
shutil.copy2(child, dest) + + def _write_files(self, root: Path, files: Mapping[str, Any]) -> None: + for rel_path, content in files.items(): + raw_rel = str(rel_path).strip() + if not raw_rel or raw_rel.startswith("/") or ".." in Path(raw_rel).parts: + raise RuntimeError(f"unsafe workspace file path: {raw_rel}") + path = root / raw_rel + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(str(content), encoding="utf-8") + + +def snapshot_workspace(root: str | Path) -> dict[str, str]: + root_path = Path(root) + if not root_path.exists(): + return {} + out: dict[str, str] = {} + for path in sorted(p for p in root_path.rglob("*") if p.is_file()): + rel = path.relative_to(root_path).as_posix() + if rel.startswith(".snowl/"): + continue + try: + out[rel] = path.read_text(encoding="utf-8", errors="replace") + except Exception: + out[rel] = f"" + return out + + +def diff_workspace(before: Mapping[str, str], after: Mapping[str, str]) -> dict[str, Any]: + before_map = {str(k): str(v) for k, v in before.items()} + after_map = {str(k): str(v) for k, v in after.items()} + added = sorted(path for path in after_map if path not in before_map) + modified = sorted(path for path in after_map if path in before_map and before_map[path] != after_map[path]) + deleted = sorted(path for path in before_map if path not in after_map) + return { + "added": added, + "modified": modified, + "deleted": deleted, + "changed": sorted(added + modified), + "file_count_before": len(before_map), + "file_count_after": len(after_map), + "snapshotted_at_ms": int(time.time() * 1000), + } + + +__all__ = [ + "RuntimeWorkspaceManager", + "RuntimeWorkspaceSession", + "RuntimeWorkspaceSpec", + "diff_workspace", + "resolve_workspace_spec", + "snapshot_workspace", +] diff --git a/tests/test_container_lifecycle.py b/tests/test_container_lifecycle.py index b1a2377b..1881538b 100644 --- a/tests/test_container_lifecycle.py +++ b/tests/test_container_lifecycle.py @@ -50,6 +50,40 @@ def 
# From tests/test_container_lifecycle.py
def test_runtime_container_contract_v2_fields_and_legacy_startup() -> None:
    """v2 task-level contract fields merge with legacy sample-level keys.

    The task declares the provider, network policy, workspace, and resource
    limits; the sample supplies legacy ``startup`` keys plus overrides, and
    both env maps are merged.
    """
    task_metadata = {
        "benchmark": "ipi_coding_agent",
        "runtime_container": {
            "provider_name": "docker_container",
            "requires_container": True,
            "network": "disabled",
            "env": {"A": "1"},
            "workspace": {"enabled": True, "repo_files": {"a.txt": "x"}},
            "resource_limits": {"start_timeout_seconds": 10},
        },
    }
    sample = {
        "id": "sample-1",
        "metadata": {
            "runtime_container": {
                "startup": {"image": "python:3.12", "verification_command": "pytest -q"},
                "init_command": "pip install -e .",
                "env": {"B": "2"},
            }
        },
    }

    spec = resolve_runtime_container_spec(task_metadata=task_metadata, sample=sample)

    assert spec.provider_name == "docker_container"
    assert spec.network == "disabled"
    assert spec.env == {"A": "1", "B": "2"}
    assert spec.workspace["enabled"] is True
    assert spec.init_command == "pip install -e ."
    assert spec.check_command == "pytest -q"
    assert spec.resource_limits["start_timeout_seconds"] == 10


# From tests/test_container_runtime_providers.py
def test_default_provider_registry_contains_terminalbench_and_osworld() -> None:
    """Every built-in provider name must resolve from the default registry."""
    registry = default_container_provider_registry()
    for provider_name in ("terminalbench", "osworld", "docker_container"):
        assert registry.resolve(provider_name) is not None
# --- tests/test_container_runtime_providers.py ---------------------------


def test_compose_terminal_provider_runs_lifecycle_commands(monkeypatch, tmp_path: Path) -> None:
    """The compose provider runs init/check commands and emits finish events."""
    events: list[dict[str, object]] = []

    class _StubTerminalEnv:
        def __init__(self, **kwargs):  # type: ignore[no-untyped-def]
            self.compose_project = kwargs.get("compose_project")
            self.compose_file = kwargs.get("compose_file")
            self.compose_service = kwargs.get("compose_service")
            self.compose_env = dict(kwargs.get("compose_env") or {})
            self.use_docker_compose = False
            self.commands: list[str] = []

        def exec(self, command, timeout_seconds=None):  # type: ignore[no-untyped-def]
            _ = timeout_seconds
            self.commands.append(str(command))
            return {"command": command, "exit_code": 0, "duration_ms": 1, "stdout": "ok", "stderr": ""}

    monkeypatch.setattr("snowl.runtime.container_providers.TerminalEnv", _StubTerminalEnv)
    provider = default_container_provider_registry().resolve("compose_terminal")
    assert provider is not None

    spec = resolve_runtime_container_spec(
        task_metadata={"benchmark": "agent_bench_os"},
        sample={
            "id": "sample-1",
            "metadata": {
                "runtime_container": {
                    "benchmark": "agent_bench_os",
                    "provider_name": "compose_terminal",
                    "requires_container": True,
                    "init_command": "echo init",
                    "check_command": "echo check",
                    "workspace": {"workspace_dir": str(tmp_path)},
                }
            },
        },
    )
    context = ContainerProviderContext(
        run_id="run-1",
        trial_id="trial-1",
        task_id="task-1",
        agent_id="agent-1",
        variant_id="v1",
        task_env_type="terminal",
        task_metadata={"benchmark": "agent_bench_os"},
        sample={"id": "sample-1"},
        container_spec=spec,
        emit=events.append,
    )

    session = asyncio.run(provider.prepare(context))
    close_out = asyncio.run(provider.close(context, session))

    assert session.metadata["workspace_dir"] == str(tmp_path)
    assert session.env.commands == ["echo init", "echo check"]
    assert close_out["exit_code"] == 0
    emitted = [str(evt.get("event")) for evt in events]
    assert "compose_terminal.container.init.finish" in emitted
    assert "compose_terminal.container.check.finish" in emitted


def test_docker_container_provider_lifecycle_with_mock_backend(monkeypatch, tmp_path: Path) -> None:
    """The docker provider drives run/exec/rm through CommandRunner with the
    requested network policy and lifecycle commands."""
    events: list[dict[str, object]] = []
    calls: list[list[str]] = []

    class _StubRunner:
        def __init__(self, cwd=None):  # type: ignore[no-untyped-def]
            self.cwd = cwd

        def run(self, cmd, **kwargs):  # type: ignore[no-untyped-def]
            _ = kwargs
            calls.append(list(cmd))
            # Only `docker run` returns a container id; everything else "ok".
            stdout = "container-123\n" if cmd[:2] == ["docker", "run"] else "ok\n"
            return {"command": list(cmd), "stdout": stdout, "stderr": "", "exit_code": 0, "duration_ms": 1}

    monkeypatch.setattr("snowl.runtime.container_providers.CommandRunner", _StubRunner)
    monkeypatch.setattr("snowl.runtime.container_providers.shutil.which", lambda _name: "/usr/bin/docker")
    provider = DockerContainerProvider()

    spec = resolve_runtime_container_spec(
        task_metadata={"benchmark": "ipi_coding_agent"},
        sample={
            "id": "sample-1",
            "metadata": {
                "runtime_container": {
                    "benchmark": "ipi_coding_agent",
                    "provider_name": "docker_container",
                    "requires_container": True,
                    "network": "disabled",
                    "init_command": "echo init",
                    "check_command": "echo check",
                    "workspace": {"workspace_dir": str(tmp_path)},
                    "startup": {"image": "python:3.12", "workspace_dir": str(tmp_path)},
                }
            },
        },
    )
    context = ContainerProviderContext(
        run_id="run-1",
        trial_id="trial-1",
        task_id="task-1",
        agent_id="agent-1",
        variant_id="v1",
        task_env_type="terminal",
        task_metadata={"benchmark": "ipi_coding_agent"},
        sample={"id": "sample-1"},
        container_spec=spec,
        emit=events.append,
    )

    session = asyncio.run(provider.prepare(context))
    close_out = asyncio.run(provider.close(context, session))

    assert session.kind == "docker_container"
    assert session.metadata["container_id"] == "container-123"
    assert close_out["exit_code"] == 0
    rendered = [" ".join(cmd) for cmd in calls]
    assert any("--network none" in line for line in rendered)
    assert any("docker exec" in line and "echo init" in line for line in rendered)
    assert any("docker exec" in line and "echo check" in line for line in rendered)
    assert any("docker rm -f container-123" in line for line in rendered)


# --- tests/test_resource_scheduler.py -------------------------------------


def test_scheduler_records_independent_phase_admission() -> None:
    """Each lifecycle phase acquires and releases its own slot type.

    NOTE(review): holding all four phase slots simultaneously (nested) with
    every quota set to 1 is what demonstrates the pools are independent —
    confirm this matches the intended nesting.
    """
    scheduler = ResourceScheduler(max_running_trials=1, max_container_slots=1, max_scoring_tasks=1)

    async def _exercise_phases() -> None:
        plan = TaskExecutionPlan(
            trial=TrialDescriptor(
                trial_id="trial-1",
                task_id="task-1",
                sample_id="sample-1",
                agent_id="agent-1",
                variant_id="v1",
                scorer_id="s1",
                seed=None,
                spec_hash="abc",
                provider_ids=(),
            ),
            requires_container=True,
        )
        async with scheduler.begin_prepare(plan):
            assert scheduler.stats_snapshot()["active"]["container_slots"] == 1
            async with scheduler.begin_execute(plan):
                assert scheduler.stats_snapshot()["active"]["running_trials"] == 1
                async with scheduler.begin_score(plan):
                    assert scheduler.stats_snapshot()["active"]["scoring_tasks"] == 1
                    async with scheduler.begin_finalize(plan):
                        assert scheduler.stats_snapshot()["active"]["finalizing"] == 1

    asyncio.run(_exercise_phases())
    stats = scheduler.stats_snapshot()
    assert stats["active"]["container_slots"] == 0
    assert stats["active"]["running_trials"] == 0
    assert stats["active"]["scoring_tasks"] == 0


# --- tests/test_runtime_workspace.py ---------------------------------------


def test_runtime_workspace_materializes_files_and_diffs(tmp_path: Path) -> None:
    """Repo files are written into the workspace and edits show up in the diff."""
    spec = RuntimeWorkspaceSpec(
        enabled=True,
        root=str(tmp_path),
        repo_files={"src/app.py": "old", "README.md": "hello"},
    )
    manager = RuntimeWorkspaceManager(
        run_id="run-1",
        trial_id="trial-1",
        task_id="task-1",
        sample_id="sample-1",
        spec=spec,
    )

    session = manager.prepare()
    assert session is not None
    workspace = Path(session.workspace_dir)
    assert (workspace / "src" / "app.py").read_text(encoding="utf-8") == "old"

    # Simulate agent activity: modify one file, create one, delete one.
    (workspace / "src" / "app.py").write_text("new", encoding="utf-8")
    (workspace / "notes.txt").write_text("created", encoding="utf-8")
    (workspace / "README.md").unlink()

    diff = diff_workspace(session.before, snapshot_workspace(workspace))
    assert diff["added"] == ["notes.txt"]
    assert diff["modified"] == ["src/app.py"]
    assert diff["deleted"] == ["README.md"]