diff --git a/CHANGELOG.md b/CHANGELOG.md
index a6b8a77..e09713c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,34 @@ All notable changes to this project are documented here.
 
 ### Added
 
+**Deterministic replay**
+
+- `RLM.Replay` — replay orchestrator that re-executes a previously recorded run using
+  the original LLM responses. Replays re-evaluate all code but skip live LLM calls,
+  enabling debugging, regression testing, and model comparison. Supports:
+  - `:patch` option — `%{iteration => code}` map to replace code at specific iterations
+    while maintaining tape alignment (LLM response is still consumed)
+  - `:config` option — config overrides for the replay run
+- `RLM.Replay.Tape` — struct representing an ordered sequence of recorded LLM responses,
+  with `from_events/1` builder that sources from EventLog Agent or TraceStore fallback
+- `RLM.Replay.LLM` — `RLM.LLM` behaviour implementation that returns responses from a
+  tape via process dictionary (matching the existing process-dict pattern in `RLM.Eval`)
+- `RLM.replay/2` — public API delegation to `RLM.Replay.replay/2`
+- `enable_replay_recording` config flag (default: `false`) — when enabled, records the
+  full raw LLM response text for each iteration as `:llm_response` events in the trace
+- `[:rlm, :llm, :response, :recorded]` telemetry event — emitted by Worker after each
+  successful LLM call when recording is enabled
+- `original_context` and `original_query` fields in `:node_start` events for depth-0
+  workers — enables replay to recover the original inputs
+- `:replay_patches` field on `RLM.Worker` struct — applied before eval to substitute
+  code at specific iterations during replay
+- `RLM.Replay.FallbackLLM` — LLM behaviour impl that tries tape entries first,
+  then falls back to a live LLM module when the tape is exhausted
+- `:fallback` option on `RLM.replay/2` — `:error` (default) or `:live` to switch
+  to live LLM calls when the tape runs out (e.g., because a patch caused extra iterations)
+- 17 tests covering recording, tape construction, replay LLM, replay orchestration,
+  patching, fallback behavior, and the public API
+
 **Distributed Erlang node support**
 
 - `RLM.Node` — lightweight wrapper for OTP distribution with three public functions:
diff --git a/CLAUDE.md b/CLAUDE.md
index 2673d95..467c03d 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -64,7 +64,7 @@ rlm/
 
 Enforced at compile time via the `boundary` library:
 
-- **`RLM`** — Core engine. Zero web dependencies. Exports: Config, Run, Worker, EventLog, TraceStore, Helpers, Span, IEx, Node, Telemetry, Tool, ToolRegistry.
+- **`RLM`** — Core engine. Zero web dependencies. Exports: Config, Run, Worker, EventLog, TraceStore, Helpers, Span, IEx, Node, Replay, Replay.Tape, Replay.LLM, Replay.FallbackLLM, Telemetry, Tool, ToolRegistry.
 - **`RLMWeb`** — Phoenix web layer. Depends on `RLM`. Exports: Endpoint.
 - **`RLM.Application`** — Top-level. Depends on both `RLM` and `RLMWeb`. Starts the unified supervision tree.
 
@@ -180,7 +180,7 @@ Default models:
 
 | Module | Purpose |
 |---|---|
-| `RLM` | Public API: `run/3`, `start_session/1`, `send_message/3`, `history/1`, `status/1` |
+| `RLM` | Public API: `run/3`, `replay/2`, `start_session/1`, `send_message/3`, `history/1`, `status/1` |
 | `RLM.Run` | Per-run coordinator; owns worker DynSup + eval TaskSup, ETS worker tree, crash propagation |
 | `RLM.Config` | Config struct; loads from app env + keyword overrides |
 | `RLM.Worker` | GenServer per execution node; iterate loop + keep_alive mode; delegates spawning to Run |
@@ -202,6 +202,10 @@ Default models:
 | `RLM.Telemetry.Logger` | Structured logging handler |
 | `RLM.Telemetry.PubSub` | Phoenix.PubSub broadcast handler |
 | `RLM.Telemetry.EventLogHandler` | Routes telemetry events to EventLog Agent AND TraceStore |
+| `RLM.Replay` | Replay orchestrator: `replay/2` replays a recorded run with optional code patches |
+| `RLM.Replay.Tape` | Tape struct + `from_events/1` builder; ordered sequence of recorded LLM responses |
+| `RLM.Replay.LLM` | LLM behaviour impl that returns responses from a tape (process-dict based) |
+| `RLM.Replay.FallbackLLM` | LLM behaviour impl that tries tape first, then falls back to a live LLM module |
 | `RLM.Application` | OTP application; starts unified supervision tree (core + web) |
 
 ### Filesystem Tools
@@ -262,6 +266,7 @@ Read-only Phoenix LiveView dashboard. Serves on `http://localhost:4000`.
 | `enable_otel` | `false` | Enable OpenTelemetry integration |
 | `enable_event_log` | `true` | Enable per-run EventLog trace agents |
 | `event_log_capture_full_stdout` | `false` | Store full stdout in traces (vs truncated) |
+| `enable_replay_recording` | `false` | Record full LLM responses for deterministic replay |
 | `llm_module` | `RLM.LLM` | Swappable for `RLM.Test.MockLLM` |
 
 ## Testing Conventions
diff --git a/README.md b/README.md
index e25a14f..8a98374 100644
--- a/README.md
+++ b/README.md
@@ -27,6 +27,7 @@ runs in a persistent REPL, with recursive sub-agent spawning and built-in filesy
   - [Interactive sessions](#interactive-sessions)
   - [IEx helpers](#iex-helpers)
   - [Configuration overrides](#configuration-overrides)
+  - [Deterministic replay](#deterministic-replay)
   - [Sandbox API](#sandbox-api)
 - [Architecture](#architecture)
   - [OTP supervision tree](#otp-supervision-tree)
@@ -143,6 +144,31 @@ watch(session) # attach a live telemetry stream
 )
 ```
 
+### Deterministic replay
+
+Record a run and replay it later — useful for debugging, regression testing, and cost
+optimization (replay re-evaluates all code without making LLM calls).
+
+```elixir
+# Step 1: Record a run (enable_replay_recording captures full LLM responses)
+{:ok, answer, run_id} = RLM.run(context, query, enable_replay_recording: true)
+
+# Step 2: Replay — same code runs, same result, no LLM calls
+{:ok, same_answer, replay_run_id} = RLM.replay(run_id)
+
+# Step 3: Replay with a patch — try different code at a specific iteration
+{:ok, new_answer, _} = RLM.replay(run_id,
+  patch: %{0 => ~s(final_answer = "patched result")}
+)
+
+# Step 4: Patch + fallback — if the patch causes extra iterations,
+# fall back to live LLM calls instead of erroring
+{:ok, answer, _} = RLM.replay(run_id,
+  patch: %{0 => "x = 42"},
+  fallback: :live
+)
+```
+
 ### Sandbox API
 
 Functions available inside the LLM's eval'd code:
diff --git a/lib/rlm.ex b/lib/rlm.ex
index 196534e..a061618 100644
--- a/lib/rlm.ex
+++ b/lib/rlm.ex
@@ -11,6 +11,10 @@ defmodule RLM do
       Span,
       IEx,
       Node,
+      Replay,
+      Replay.Tape,
+      Replay.LLM,
+      Replay.FallbackLLM,
       Telemetry,
       Telemetry.PubSub,
       Tool,
@@ -209,6 +213,31 @@ defmodule RLM do
     :exit, _ -> {:error, :not_found}
   end
 
+  # ---------------------------------------------------------------------------
+  # Replay API
+  # ---------------------------------------------------------------------------
+
+  @doc """
+  Replay a previously recorded run.
+
+  Requires the original run to have been executed with `enable_replay_recording: true`.
+  The replay re-executes all eval'd code using the recorded LLM responses.
+
+  ## Options
+
+    * `:patch` — `%{iteration => code}` map to replace code at specific iterations
+    * `:fallback` — `:error` (default) or `:live` to switch to live LLM calls
+      when the tape runs out (e.g., because a patch caused extra iterations)
+    * `:config` — config overrides for the replay run (set `llm_module` here
+      to control which module handles live fallback calls)
+
+  Returns `{:ok, answer, replay_run_id}` or `{:error, reason}`.
+  """
+  @spec replay(String.t(), keyword()) :: {:ok, any(), String.t()} | {:error, any()}
+  def replay(run_id, opts \\ []) do
+    RLM.Replay.replay(run_id, opts)
+  end
+
   # ---------------------------------------------------------------------------
   # Run management
   # ---------------------------------------------------------------------------
diff --git a/lib/rlm/config.ex b/lib/rlm/config.ex
index 1b60440..e627717 100644
--- a/lib/rlm/config.ex
+++ b/lib/rlm/config.ex
@@ -26,6 +26,7 @@ defmodule RLM.Config do
     :enable_otel,
     :enable_event_log,
     :event_log_capture_full_stdout,
+    :enable_replay_recording,
     :llm_module
   ]
 
@@ -57,6 +58,7 @@ defmodule RLM.Config do
       enable_otel: get(overrides, :enable_otel, false),
       enable_event_log: get(overrides, :enable_event_log, true),
       event_log_capture_full_stdout: get(overrides, :event_log_capture_full_stdout, false),
+      enable_replay_recording: get(overrides, :enable_replay_recording, false),
       llm_module: get(overrides, :llm_module, RLM.LLM)
     }
   end
diff --git a/lib/rlm/replay.ex b/lib/rlm/replay.ex
new file mode 100644
index 0000000..97d8fdf
--- /dev/null
+++ b/lib/rlm/replay.ex
@@ -0,0 +1,109 @@
+defmodule RLM.Replay do
+  @moduledoc """
+  Replay a previously recorded RLM run.
+
+  Replays use the recorded LLM responses but re-execute all eval'd code.
+  This verifies that the same code still works against the current environment.
+
+  ## Options
+
+    * `:patch` — `%{iteration => code}` map. At the specified iteration, the
+      patched code replaces the LLM's code before eval. The LLM response is
+      still consumed from the tape (to maintain iteration alignment).
+    * `:fallback` — what to do when the tape runs out:
+      - `:error` (default) — return an error
+      - `:live` — switch to live LLM calls for remaining iterations
+    * `:config` — config overrides applied to the replay run. When using
+      `fallback: :live`, set `llm_module` here to control which module
+      handles the live calls (defaults to `RLM.LLM`).
+  """
+
+  @spec replay(String.t(), keyword()) :: {:ok, any(), String.t()} | {:error, any()}
+  def replay(run_id, opts \\ []) do
+    patches = Keyword.get(opts, :patch, %{})
+    config_overrides = Keyword.get(opts, :config, [])
+    fallback = Keyword.get(opts, :fallback, :error)
+
+    with {:ok, tape} <- RLM.Replay.Tape.from_events(run_id) do
+      # Resolve the LLM module: tape-only or tape-with-fallback
+      {llm_module, fallback_module} =
+        case fallback do
+          :live ->
+            # The fallback module comes from config overrides, or the default
+            user_config = RLM.Config.load(config_overrides)
+            {RLM.Replay.FallbackLLM, user_config.llm_module}
+
+          :error ->
+            {RLM.Replay.LLM, nil}
+        end
+
+      config =
+        RLM.Config.load(
+          Keyword.merge(config_overrides,
+            llm_module: llm_module,
+            enable_replay_recording: false
+          )
+        )
+
+      # Extract original context and query from the tape
+      context = tape.context || ""
+      query = tape.query || context
+
+      replay_run_id = RLM.Span.generate_run_id()
+      span_id = RLM.Span.generate_id()
+
+      run_opts = [run_id: replay_run_id, config: config]
+
+      case DynamicSupervisor.start_child(RLM.RunSup, {RLM.Run, run_opts}) do
+        {:ok, run_pid} ->
+          worker_opts = [
+            span_id: span_id,
+            run_id: replay_run_id,
+            context: context,
+            query: query,
+            config: config,
+            depth: 0,
+            model: config.model_large,
+            caller: self(),
+            replay_tape: tape,
+            replay_patches: patches,
+            replay_fallback_module: fallback_module
+          ]
+
+          case RLM.Run.start_worker(run_pid, worker_opts) do
+            {:ok, pid} ->
+              ref = Process.monitor(pid)
+              total_timeout = config.eval_timeout * 2
+
+              receive do
+                {:rlm_result, ^span_id, {:ok, answer}} ->
+                  Process.demonitor(ref, [:flush])
+                  {:ok, answer, replay_run_id}
+
+                {:rlm_result, ^span_id, {:error, reason}} ->
+                  Process.demonitor(ref, [:flush])
+                  {:error, reason}
+
+                {:DOWN, ^ref, :process, ^pid, :normal} ->
+                  {:error, "Replay worker exited without result"}
+
+                {:DOWN, ^ref, :process, ^pid, reason} ->
+                  {:error, "Replay worker crashed: #{inspect(reason)}"}
+              after
+                total_timeout ->
+                  Process.demonitor(ref, [:flush])
+                  RLM.terminate_run(run_pid)
+                  {:error, "Replay timed out after #{total_timeout}ms"}
+              end
+
+            {:error, reason} ->
+              RLM.terminate_run(run_pid)
+              {:error, "Failed to start replay worker: #{inspect(reason)}"}
+          end
+
+        {:error, reason} ->
+          {:error, "Failed to start replay run: #{inspect(reason)}"}
+      end
+    end
+  end
+end
diff --git a/lib/rlm/replay/fallback_llm.ex b/lib/rlm/replay/fallback_llm.ex
new file mode 100644
index 0000000..71ad642
--- /dev/null
+++ b/lib/rlm/replay/fallback_llm.ex
@@ -0,0 +1,42 @@
+defmodule RLM.Replay.FallbackLLM do
+  @moduledoc """
+  An LLM module that replays from a tape, falling back to a live LLM
+  module when the tape is exhausted.
+
+  Used by `RLM.Replay.replay/2` with `fallback: :live`. The fallback
+  module is stored in the process dictionary alongside the tape entries.
+  """
+
+  @behaviour RLM.LLM
+
+  @impl true
+  def chat(messages, model, config, opts \\ []) do
+    case pop_entry() do
+      nil ->
+        fallback_module = Process.get(:rlm_replay_fallback_module, RLM.LLM)
+        fallback_module.chat(messages, model, config, opts)
+
+      entry ->
+        {:ok, entry.response, entry.usage}
+    end
+  end
+
+  @doc "Initialize the replay state with tape entries and a fallback module."
+  @spec load_tape(RLM.Replay.Tape.t(), module()) :: :ok
+  def load_tape(%RLM.Replay.Tape{entries: entries}, fallback_module) do
+    Process.put(:rlm_replay_entries, entries)
+    Process.put(:rlm_replay_fallback_module, fallback_module)
+    :ok
+  end
+
+  defp pop_entry do
+    case Process.get(:rlm_replay_entries, []) do
+      [] ->
+        nil
+
+      [entry | rest] ->
+        Process.put(:rlm_replay_entries, rest)
+        entry
+    end
+  end
+end
diff --git a/lib/rlm/replay/llm.ex b/lib/rlm/replay/llm.ex
new file mode 100644
index 0000000..cdde2af
--- /dev/null
+++ b/lib/rlm/replay/llm.ex
@@ -0,0 +1,47 @@
+defmodule RLM.Replay.LLM do
+  @moduledoc """
+  An LLM module that replays responses from a recorded tape.
+  Used by `RLM.Replay.replay/2` to re-execute a run deterministically.
+
+  Responses are consumed in order. If the tape runs out, returns an error.
+
+  ## Process dictionary
+
+  This uses the process dictionary because the `RLM.LLM` behaviour's `chat/4`
+  callback doesn't have a slot for replay state. Since Worker calls `chat/4`
+  synchronously from its own process, the tape state lives in the Worker's
+  process dict. This matches the existing pattern where `RLM.Eval` uses
+  process dict for `worker_pid`, `cwd`, etc.
+  """
+
+  @behaviour RLM.LLM
+
+  @impl true
+  def chat(_messages, _model, _config, _opts \\ []) do
+    case pop_entry() do
+      nil ->
+        {:error, "Replay tape exhausted — no more recorded responses"}
+
+      entry ->
+        {:ok, entry.response, entry.usage}
+    end
+  end
+
+  @doc "Initialize the replay state for the current process."
+  @spec load_tape(RLM.Replay.Tape.t()) :: :ok
+  def load_tape(%RLM.Replay.Tape{entries: entries}) do
+    Process.put(:rlm_replay_entries, entries)
+    :ok
+  end
+
+  defp pop_entry do
+    case Process.get(:rlm_replay_entries, []) do
+      [] ->
+        nil
+
+      [entry | rest] ->
+        Process.put(:rlm_replay_entries, rest)
+        entry
+    end
+  end
+end
diff --git a/lib/rlm/replay/tape.ex b/lib/rlm/replay/tape.ex
new file mode 100644
index 0000000..7e21333
--- /dev/null
+++ b/lib/rlm/replay/tape.ex
@@ -0,0 +1,65 @@
+defmodule RLM.Replay.Tape do
+  @moduledoc """
+  A replay tape: an ordered sequence of LLM responses from a recorded run.
+  Built from EventLog events, consumed by the replay LLM module.
+  """
+
+  defstruct [:run_id, :context, :query, entries: []]
+
+  @type entry :: %{
+          iteration: non_neg_integer(),
+          span_id: String.t(),
+          response: String.t(),
+          usage: map()
+        }
+
+  @type t :: %__MODULE__{
+          run_id: String.t(),
+          context: String.t() | nil,
+          query: String.t() | nil,
+          entries: [entry()]
+        }
+
+  @doc "Build a tape from EventLog events for a given run."
+  @spec from_events(String.t()) :: {:ok, t()} | {:error, :no_events | :no_responses}
+  def from_events(run_id) do
+    events = get_events(run_id)
+
+    if events == [] do
+      {:error, :no_events}
+    else
+      node_start = Enum.find(events, &(&1.type == :node_start && &1[:depth] == 0))
+
+      responses =
+        events
+        |> Enum.filter(&(&1.type == :llm_response))
+        |> Enum.sort_by(& &1.iteration)
+        |> Enum.map(fn e ->
+          %{iteration: e.iteration, span_id: e.span_id, response: e.response, usage: e.usage}
+        end)
+
+      if responses == [] do
+        {:error, :no_responses}
+      else
+        {:ok,
+         %__MODULE__{
+           run_id: run_id,
+           context: node_start[:original_context],
+           query: node_start[:original_query],
+           entries: responses
+         }}
+      end
+    end
+  end
+
+  # EventLog.get_events/1 raises an exit when no Agent exists for the run_id.
+  # Catch that and fall back to TraceStore.
+  defp get_events(run_id) do
+    case RLM.EventLog.get_events(run_id) do
+      [] -> RLM.EventLog.get_events_from_store(run_id)
+      events -> events
+    end
+  catch
+    :exit, _ -> RLM.EventLog.get_events_from_store(run_id)
+  end
+end
diff --git a/lib/rlm/telemetry/event_log_handler.ex b/lib/rlm/telemetry/event_log_handler.ex
index 62205d3..859e7ed 100644
--- a/lib/rlm/telemetry/event_log_handler.ex
+++ b/lib/rlm/telemetry/event_log_handler.ex
@@ -13,6 +13,8 @@ defmodule RLM.Telemetry.EventLogHandler do
       depth: metadata.depth,
       model: metadata.model,
       context_bytes: metadata[:context_bytes],
+      original_context: metadata[:original_context],
+      original_query: metadata[:original_query],
       timestamp_us: System.system_time(:microsecond)
     }
 
@@ -64,6 +66,20 @@ defmodule RLM.Telemetry.EventLogHandler do
     RLM.TraceStore.put_event(metadata.run_id, event)
   end
 
+  def handle_event([:rlm, :llm, :response, :recorded], _measurements, metadata, _config) do
+    event = %{
+      type: :llm_response,
+      span_id: metadata.span_id,
+      iteration: metadata[:iteration],
+      response: metadata[:response],
+      usage: metadata[:usage],
+      timestamp_us: System.system_time(:microsecond)
+    }
+
+    RLM.EventLog.append(metadata.run_id, event)
+    RLM.TraceStore.put_event(metadata.run_id, event)
+  end
+
   def handle_event([:rlm, :subcall, :spawn], _measurements, metadata, _config) do
     event = %{
       type: :subcall_spawn,
diff --git a/lib/rlm/telemetry/telemetry.ex b/lib/rlm/telemetry/telemetry.ex
index 93987b1..866ee1b 100644
--- a/lib/rlm/telemetry/telemetry.ex
+++ b/lib/rlm/telemetry/telemetry.ex
@@ -21,6 +21,7 @@ defmodule RLM.Telemetry do
       [:rlm, :subcall, :result],
       [:rlm, :direct_query, :start],
       [:rlm, :direct_query, :stop],
+      [:rlm, :llm, :response, :recorded],
       [:rlm, :compaction, :run],
       [:rlm, :turn, :complete]
     ]
diff --git a/lib/rlm/worker.ex b/lib/rlm/worker.ex
index accc9f6..74c7ed2 100644
--- a/lib/rlm/worker.ex
+++ b/lib/rlm/worker.ex
@@ -50,7 +50,9 @@ defmodule RLM.Worker do
     :eval_sup,
     pending_subcalls: %{},
     # Maps monitor ref → query_id for direct query crash detection
-    direct_query_monitors: %{}
+    direct_query_monitors: %{},
+    # Replay: code patches by iteration number (empty map = no patches)
+    replay_patches: %{}
   ]
 
   # -- Public API --
@@ -83,6 +85,18 @@ defmodule RLM.Worker do
     run_pid = Keyword.get(opts, :run_pid)
     eval_sup = Keyword.get(opts, :eval_sup)
 
+    replay_tape = Keyword.get(opts, :replay_tape)
+    replay_patches = Keyword.get(opts, :replay_patches, %{})
+    replay_fallback_module = Keyword.get(opts, :replay_fallback_module)
+
+    if replay_tape do
+      if replay_fallback_module do
+        RLM.Replay.FallbackLLM.load_tape(replay_tape, replay_fallback_module)
+      else
+        RLM.Replay.LLM.load_tape(replay_tape)
+      end
+    end
+
     if keep_alive do
       # Keep-alive mode: start idle, wait for send_message
       system_msg = RLM.Prompt.build_system_message()
@@ -153,12 +167,15 @@ defmodule RLM.Worker do
         cwd: cwd,
         pending_from: nil,
         run_pid: run_pid,
-        eval_sup: eval_sup
+        eval_sup: eval_sup,
+        replay_patches: replay_patches
       }
 
       emit_telemetry([:rlm, :node, :start], %{}, state, %{
         context_bytes: context_bytes,
-        query_preview: String.slice(query, 0, 200)
+        query_preview: String.slice(query, 0, 200),
+        original_context: if(depth == 0, do: context),
+        original_query: if(depth == 0, do: query)
       })
 
       send(self(), :iterate)
@@ -191,6 +208,15 @@ defmodule RLM.Worker do
       {:ok, response, usage} ->
         llm_duration = System.monotonic_time(:millisecond) - llm_start
 
+        # Record the full LLM response for replay (gated by config flag)
+        if state.config.enable_replay_recording do
+          emit_telemetry([:rlm, :llm, :response, :recorded], %{}, state, %{
+            iteration: state.iteration,
+            response: response,
+            usage: usage
+          })
+        end
+
         # Step 2: Parse structured JSON response
         case RLM.LLM.extract_structured(response) do
           {:ok, %{reasoning: reasoning, code: code}} ->
@@ -210,6 +236,9 @@ defmodule RLM.Worker do
               }
             )
 
+            # Apply replay patches if any
+            code = Map.get(state.replay_patches, state.iteration, code)
+
             if code != "" do
               start_async_eval(
                 state,
diff --git a/test/rlm/replay_test.exs b/test/rlm/replay_test.exs
new file mode 100644
index 0000000..abdc4ab
--- /dev/null
+++ b/test/rlm/replay_test.exs
@@ -0,0 +1,430 @@
+defmodule RLM.ReplayTest do
+  @moduledoc """
+  Tests for the deterministic replay feature.
+
+  Covers: recording LLM responses, building tapes, replaying runs,
+  and patching iterations during replay.
+  """
+  use ExUnit.Case, async: false
+
+  alias RLM.Test.MockLLM
+  import RLM.Test.Helpers
+
+  # ── Phase 1: Recording ──────────────────────────────────────────────
+
+  describe "recording LLM responses" do
+    test "records llm_response events when enable_replay_recording is true" do
+      MockLLM.program_responses([
+        MockLLM.mock_response(~s(final_answer = "hello"))
+      ])
+
+      config = RLM.Config.load(llm_module: MockLLM, enable_replay_recording: true)
+      run_id = RLM.Span.generate_run_id()
+      span_id = RLM.Span.generate_id()
+
+      %{run_pid: run_pid} = start_test_run(run_id: run_id, config: config)
+
+      worker_opts = [
+        span_id: span_id,
+        run_id: run_id,
+        context: "test context",
+        query: "say hello",
+        config: config,
+        depth: 0,
+        model: config.model_large,
+        caller: self()
+      ]
+
+      {:ok, _pid} = RLM.Run.start_worker(run_pid, worker_opts)
+
+      receive do
+        {:rlm_result, ^span_id, {:ok, "hello"}} -> :ok
+      after
+        5000 -> flunk("Worker did not complete")
+      end
+
+      # Give events time to propagate
+      Process.sleep(100)
+
+      events = RLM.EventLog.get_events(run_id)
+      llm_responses = Enum.filter(events, &(&1.type == :llm_response))
+
+      assert length(llm_responses) == 1
+      [resp] = llm_responses
+      assert resp.iteration == 0
+      assert is_binary(resp.response)
+      assert is_map(resp.usage)
+    end
+
+    test "does NOT record llm_response events when enable_replay_recording is false" do
+      MockLLM.program_responses([
+        MockLLM.mock_response(~s(final_answer = "hello"))
+      ])
+
+      config = RLM.Config.load(llm_module: MockLLM, enable_replay_recording: false)
+      run_id = RLM.Span.generate_run_id()
+      span_id = RLM.Span.generate_id()
+
+      %{run_pid: run_pid} = start_test_run(run_id: run_id, config: config)
+
+      worker_opts = [
+        span_id: span_id,
+        run_id: run_id,
+        context: "test context",
+        query: "say hello",
+        config: config,
+        depth: 0,
+        model: config.model_large,
+        caller: self()
+      ]
+
+      {:ok, _pid} = RLM.Run.start_worker(run_pid, worker_opts)
+
+      receive do
+        {:rlm_result, ^span_id, {:ok, "hello"}} -> :ok
+      after
+        5000 -> flunk("Worker did not complete")
+      end
+
+      Process.sleep(100)
+
+      events = RLM.EventLog.get_events(run_id)
+      llm_responses = Enum.filter(events, &(&1.type == :llm_response))
+      assert llm_responses == []
+    end
+
+    test "records multi-iteration LLM responses" do
+      MockLLM.program_responses([
+        MockLLM.mock_response("x = 1 + 2"),
+        MockLLM.mock_response(~s(final_answer = "result: \#{x}"))
+      ])
+
+      config = RLM.Config.load(llm_module: MockLLM, enable_replay_recording: true)
+      run_id = RLM.Span.generate_run_id()
+      span_id = RLM.Span.generate_id()
+
+      %{run_pid: run_pid} = start_test_run(run_id: run_id, config: config)
+
+      worker_opts = [
+        span_id: span_id,
+        run_id: run_id,
+        context: "test",
+        query: "compute",
+        config: config,
+        depth: 0,
+        model: config.model_large,
+        caller: self()
+      ]
+
+      {:ok, _pid} = RLM.Run.start_worker(run_pid, worker_opts)
+
+      receive do
+        {:rlm_result, ^span_id, {:ok, _}} -> :ok
+      after
+        5000 -> flunk("Worker did not complete")
+      end
+
+      Process.sleep(100)
+
+      events = RLM.EventLog.get_events(run_id)
+      llm_responses = Enum.filter(events, &(&1.type == :llm_response))
+
+      assert length(llm_responses) == 2
+      assert Enum.map(llm_responses, & &1.iteration) == [0, 1]
+    end
+
+    test "records original context and query in node_start event for depth-0 workers" do
+      MockLLM.program_responses([
+        MockLLM.mock_response(~s(final_answer = "done"))
+      ])
+
+      config = RLM.Config.load(llm_module: MockLLM, enable_replay_recording: true)
+      run_id = RLM.Span.generate_run_id()
+      span_id = RLM.Span.generate_id()
+
+      %{run_pid: run_pid} = start_test_run(run_id: run_id, config: config)
+
+      worker_opts = [
+        span_id: span_id,
+        run_id: run_id,
+        context: "my special context",
+        query: "my special query",
+        config: config,
+        depth: 0,
+        model: config.model_large,
+        caller: self()
+      ]
+
+      {:ok, _pid} = RLM.Run.start_worker(run_pid, worker_opts)
+
+      receive do
+        {:rlm_result, ^span_id, {:ok, "done"}} -> :ok
+      after
+        5000 -> flunk("Worker did not complete")
+      end
+
+      Process.sleep(100)
+
+      events = RLM.EventLog.get_events(run_id)
+      node_start = Enum.find(events, &(&1.type == :node_start))
+
+      assert node_start.original_context == "my special context"
+      assert node_start.original_query == "my special query"
+    end
+  end
+
+  # ── Phase 2: Tape ───────────────────────────────────────────────────
+
+  describe "Tape.from_events/1" do
+    test "builds a tape from recorded events" do
+      MockLLM.program_responses([
+        MockLLM.mock_response(~s(final_answer = "hello"))
+      ])
+
+      {:ok, "hello", run_id} =
+        RLM.run("ctx", "say hello",
+          llm_module: MockLLM,
+          enable_replay_recording: true
+        )
+
+      Process.sleep(100)
+
+      assert {:ok, tape} = RLM.Replay.Tape.from_events(run_id)
+      assert tape.run_id == run_id
+      assert length(tape.entries) == 1
+
+      [entry] = tape.entries
+      assert entry.iteration == 0
+      assert is_binary(entry.response)
+    end
+
+    test "returns error for nonexistent run" do
+      assert {:error, :no_events} = RLM.Replay.Tape.from_events("nonexistent_run")
+    end
+
+    test "returns error when no llm_response events exist" do
+      # Run without recording enabled — events exist but no llm_response events
+      MockLLM.program_responses([
+        MockLLM.mock_response(~s(final_answer = "hello"))
+      ])
+
+      {:ok, "hello", run_id} =
+        RLM.run("ctx", "say hello",
+          llm_module: MockLLM,
+          enable_replay_recording: false
+        )
+
+      Process.sleep(100)
+
+      assert {:error, :no_responses} = RLM.Replay.Tape.from_events(run_id)
+    end
+  end
+
+  # ── Phase 2: Replay LLM ─────────────────────────────────────────────
+
+  describe "Replay.LLM" do
+    test "returns responses in order from tape" do
+      entries = [
+        %{
+          iteration: 0,
+          span_id: "s1",
+          response: ~s({"reasoning":"r0","code":"x=1"}),
+          usage: %{
+            prompt_tokens: 10,
+            completion_tokens: 5,
+            total_tokens: 15,
+            cache_creation_input_tokens: nil,
+            cache_read_input_tokens: nil
+          }
+        },
+        %{
+          iteration: 1,
+          span_id: "s2",
+          response: ~s({"reasoning":"r1","code":"final_answer=x"}),
+          usage: %{
+            prompt_tokens: 20,
+            completion_tokens: 10,
+            total_tokens: 30,
+            cache_creation_input_tokens: nil,
+            cache_read_input_tokens: nil
+          }
+        }
+      ]
+
+      tape = %RLM.Replay.Tape{run_id: "test", entries: entries}
+      RLM.Replay.LLM.load_tape(tape)
+
+      config = RLM.Config.load()
+      {:ok, resp1, usage1} = RLM.Replay.LLM.chat([], "model", config)
+      assert resp1 == ~s({"reasoning":"r0","code":"x=1"})
+      assert usage1.prompt_tokens == 10
+
+      {:ok, resp2, _usage2} = RLM.Replay.LLM.chat([], "model", config)
+      assert resp2 == ~s({"reasoning":"r1","code":"final_answer=x"})
+    end
+
+    test "returns error when tape is exhausted" do
+      tape = %RLM.Replay.Tape{run_id: "test", entries: []}
+      RLM.Replay.LLM.load_tape(tape)
+
+      config = RLM.Config.load()
+      assert {:error, _} = RLM.Replay.LLM.chat([], "model", config)
+    end
+  end
+
+  # ── Phase 3: Replay Orchestrator ────────────────────────────────────
+
+  describe "replay/2" do
+    test "replays a recorded run and produces the same result" do
+      MockLLM.program_responses([
+        MockLLM.mock_response(~s(final_answer = "hello"))
+      ])
+
+      {:ok, "hello", run_id} =
+        RLM.run("test context", "say hello",
+          llm_module: MockLLM,
+          enable_replay_recording: true
+        )
+
+      Process.sleep(200)
+
+      assert {:ok, "hello", replay_run_id} = RLM.Replay.replay(run_id)
+      assert replay_run_id != run_id
+    end
+
+    test "replays a multi-iteration run" do
+      MockLLM.program_responses([
+        MockLLM.mock_response("x = 10 + 5"),
+        MockLLM.mock_response(~s(final_answer = "sum is \#{x}"))
+      ])
+
+      {:ok, "sum is 15", run_id} =
+        RLM.run("test", "compute",
+          llm_module: MockLLM,
+          enable_replay_recording: true
+        )
+
+      Process.sleep(200)
+
+      assert {:ok, "sum is 15", _replay_run_id} = RLM.Replay.replay(run_id)
+    end
+
+    test "replay with patch modifies one iteration's code" do
+      MockLLM.program_responses([
+        MockLLM.mock_response(~s(final_answer = "original"))
+      ])
+
+      {:ok, "original", run_id} =
+        RLM.run("ctx", "task",
+          llm_module: MockLLM,
+          enable_replay_recording: true
+        )
+
+      Process.sleep(200)
+
+      assert {:ok, "patched", _replay_run_id} =
+               RLM.Replay.replay(run_id,
+                 patch: %{0 => ~s(final_answer = "patched")}
+               )
+    end
+
+    test "replay returns error when no recorded events exist" do
+      assert {:error, :no_events} = RLM.Replay.replay("nonexistent_run_id")
+    end
+  end
+
+  # ── Fallback ─────────────────────────────────────────────────────────
+
+  describe "replay with fallback: :live" do
+    test "falls back to live LLM when tape is exhausted by a patch" do
+      # Record a 1-iteration run
+      MockLLM.program_responses([
+        MockLLM.mock_response(~s(final_answer = "original"))
+      ])
+
+      {:ok, "original", run_id} =
+        RLM.run("ctx", "task",
+          llm_module: MockLLM,
+          enable_replay_recording: true
+        )
+
+      Process.sleep(200)
+
+      # Patch iteration 0 to NOT set final_answer → tape exhausts after 1 entry
+      # The fallback MockLLM provides the response for iteration 1
+      MockLLM.program_responses([
+        MockLLM.mock_response(~s(final_answer = "from_fallback"))
+      ])
+
+      assert {:ok, "from_fallback", _} =
+               RLM.Replay.replay(run_id,
+                 patch: %{0 => "x = 42"},
+                 fallback: :live,
+                 config: [llm_module: MockLLM]
+               )
+    end
+
+    test "tape entries are consumed before falling back" do
+      # Record a 2-iteration run
+      MockLLM.program_responses([
+        MockLLM.mock_response("x = 10"),
+        MockLLM.mock_response(~s(final_answer = "got \#{x}"))
+      ])
+
+      {:ok, "got 10", run_id} =
+        RLM.run("ctx", "task",
+          llm_module: MockLLM,
+          enable_replay_recording: true
+        )
+
+      Process.sleep(200)
+
+      # Straight replay with fallback: :live — tape has enough entries,
+      # so fallback is never used. No need to program MockLLM responses.
+      assert {:ok, "got 10", _} =
+               RLM.Replay.replay(run_id,
+                 fallback: :live,
+                 config: [llm_module: MockLLM]
+               )
+    end
+
+    test "fallback: :error (default) returns error when tape exhausted" do
+      MockLLM.program_responses([
+        MockLLM.mock_response(~s(final_answer = "original"))
+      ])
+
+      {:ok, "original", run_id} =
+        RLM.run("ctx", "task",
+          llm_module: MockLLM,
+          enable_replay_recording: true
+        )
+
+      Process.sleep(200)
+
+      # Patch removes final_answer, causing iteration 1 to need an LLM response
+      # With default fallback: :error, this should fail
+      assert {:error, _reason} =
+               RLM.Replay.replay(run_id, patch: %{0 => "x = 42"})
+    end
+  end
+
+  # ── Phase 4: Public API ─────────────────────────────────────────────
+
+  describe "RLM.replay/2 public API" do
+    test "delegates to RLM.Replay.replay/2" do
+      MockLLM.program_responses([
+        MockLLM.mock_response(~s(final_answer = "via_api"))
+      ])
+
+      {:ok, "via_api", run_id} =
+        RLM.run("ctx", "query",
+          llm_module: MockLLM,
+          enable_replay_recording: true
+        )
+
+      Process.sleep(200)
+
+      assert {:ok, "via_api", _replay_run_id} = RLM.replay(run_id)
+    end
+  end
+end
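Reviewer note: the tape-alignment rule in this change (a patched iteration still consumes its recorded LLM response) can be illustrated in isolation. The sketch below is not code from this PR. `TapeSketch`, its `:tape_sketch_entries` key, and the `code` field on each entry are hypothetical names chosen for illustration; the only thing borrowed from the diff is the pattern of keeping the remaining entries in the process dictionary and popping one per call.

```elixir
defmodule TapeSketch do
  # Hypothetical stand-in for the tape-backed LLM modules in the diff;
  # it is not part of the RLM codebase.
  @key :tape_sketch_entries

  # Store the remaining entries in the calling process's dictionary.
  def load(entries) when is_list(entries) do
    Process.put(@key, entries)
    :ok
  end

  # Pop the next recorded entry, or report that the tape is exhausted.
  def next do
    case Process.get(@key, []) do
      [] ->
        {:error, :tape_exhausted}

      [entry | rest] ->
        Process.put(@key, rest)
        {:ok, entry}
    end
  end

  # One entry is consumed per iteration. A patch overrides the code that
  # would run, but the entry is popped either way, so later iterations
  # stay aligned with the recording.
  def code_for(iteration, patches) do
    with {:ok, entry} <- next() do
      {:ok, Map.get(patches, iteration, entry.code)}
    end
  end
end

TapeSketch.load([%{code: "x = 1"}, %{code: "final_answer = x"}])
patches = %{0 => "x = 42"}

# Iteration 0 runs the patched code; iteration 1 runs the recorded code.
{:ok, "x = 42"} = TapeSketch.code_for(0, patches)
{:ok, "final_answer = x"} = TapeSketch.code_for(1, patches)

# A third iteration finds the tape empty. This is the point where the
# `fallback: :error` and `fallback: :live` options in the diff diverge.
{:error, :tape_exhausted} = TapeSketch.code_for(2, patches)
```

Because the state lives in the process dictionary, the sketch only behaves this way when `load/1` and `code_for/2` are called from the same process, which mirrors the constraint described in the `RLM.Replay.LLM` moduledoc.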