diff --git a/package.json b/package.json index 013b96f..ac67fdd 100644 --- a/package.json +++ b/package.json @@ -4,14 +4,14 @@ "type": "module", "private": true, "scripts": { - "build": "bun build --compile --outfile loop src/loop.ts", + "build": "bun build --compile --outfile loop src/cli.ts", "install:global": "bun run build && bun run src/install.ts", "release:patch": "npm version patch && git push --follow-tags", "check": "ultracite check", "fix": "ultracite fix" }, "bin": { - "loop": "./src/loop.ts" + "loop": "./src/cli.ts" }, "devDependencies": { "@biomejs/biome": "^2.4.0", diff --git a/src/loop.ts b/src/cli.ts similarity index 100% rename from src/loop.ts rename to src/cli.ts diff --git a/src/loop/args.ts b/src/loop/args.ts index e35e93f..09548f3 100644 --- a/src/loop/args.ts +++ b/src/loop/args.ts @@ -95,6 +95,14 @@ const applyValueFlag = ( opts.model = trimmed; return; } + if (flag === "session") { + const trimmed = value.trim(); + if (!trimmed) { + throw new Error("Invalid --session value: cannot be empty"); + } + opts.sessionId = trimmed; + return; + } opts.format = parseFormat(value); }; diff --git a/src/loop/claude-sdk-server.ts b/src/loop/claude-sdk-server.ts index 5cccb62..28cfb79 100644 --- a/src/loop/claude-sdk-server.ts +++ b/src/loop/claude-sdk-server.ts @@ -1,4 +1,10 @@ -import { type Server, type ServerWebSocket, serve, spawn } from "bun"; +import { + type Server, + type ServerWebSocket, + serve, + spawn, + spawnSync, +} from "bun"; import { DEFAULT_CLAUDE_MODEL } from "./constants"; import { findFreePort } from "./ports"; import { DETACH_CHILD_PROCESS, killChildProcess } from "./process"; @@ -6,6 +12,7 @@ import type { Options, RunResult } from "./types"; type ExitSignal = "SIGINT" | "SIGTERM"; type Callback = (text: string) => void; +type ServeFn = (...args: Parameters) => ReturnType; type SpawnFn = (...args: Parameters) => ReturnType; type WSRole = "claude" | "frontend"; interface WSData { @@ -40,7 +47,9 @@ interface NdjsonMessage { } interface TurnState { + backgroundTaskSeen: boolean; combined: string; + drainingBackground: boolean; hasStreamed: boolean; onDelta: Callback; onParsed: Callback; @@ -52,8 +61,53 @@ interface TurnState { const CLAUDE_SDK_BASE_PORT = 8765; const CLAUDE_SDK_PORT_RANGE = 100; +const BACKGROUND_TASK_CONTINUATION = + "Background tasks are complete. Continue with the task."; +const DEFAULT_CHILD_POLL_INTERVAL_MS = 2000; const START_TIMEOUT_MS = 60_000; -const WAIT_TIMEOUT_MS = 600_000; +const DEFAULT_WAIT_TIMEOUT_MS = 600_000; + +let childPollIntervalMs = DEFAULT_CHILD_POLL_INTERVAL_MS; +let waitTimeoutMs = DEFAULT_WAIT_TIMEOUT_MS; + +type CountChildProcessesFn = (pid: number) => number; + +const asRecord = (value: unknown): Record => { + if (typeof value === "object" && value !== null) { + return value as Record; + } + return {}; +}; + +const wait = async (ms: number): Promise => { + await new Promise((resolve) => setTimeout(resolve, ms)); +}; + +const countChildProcesses = (pid: number): number => { + if (process.platform === "win32" || !Number.isInteger(pid) || pid <= 0) { + return 0; + } + try { + const proc = spawnSync({ + cmd: ["pgrep", "-g", String(pid)], + stderr: "ignore", + stdout: "pipe", + }); + const output = new TextDecoder().decode(proc.stdout).trim(); + if (!output) { + return 0; + } + return output + .split("\n") + .map((rawPid) => Number.parseInt(rawPid.trim(), 10)) + .filter((childPid) => Number.isInteger(childPid) && childPid > 0) + .filter((childPid) => childPid !== pid).length; + } catch { + return 0; + } +}; + +let countChildProcessesFn: CountChildProcessesFn = countChildProcesses; const drainStream = (stream: ReadableStream): void => { const reader = stream.getReader(); @@ -109,22 +163,51 @@ const isValidNdjson = (text: string): boolean => { }; let spawnFn: SpawnFn = spawn; +let serveFn: ServeFn = serve; export const claudeSdkInternals = { + BACKGROUND_TASK_CONTINUATION, + countChildProcesses, restoreSpawnFn(): void { spawnFn = spawn; }, setSpawnFn(next: SpawnFn): void { spawnFn = next; }, + restoreServeFn(): void { + serveFn = serve; + }, + setServeFn(next: ServeFn): void { + serveFn = next; + }, + restoreCountChildProcessesFn(): void { + countChildProcessesFn = countChildProcesses; + }, + setCountChildProcessesFn(next: CountChildProcessesFn): void { + countChildProcessesFn = next; + }, + restoreChildPollIntervalMs(): void { + childPollIntervalMs = DEFAULT_CHILD_POLL_INTERVAL_MS; + }, + setChildPollIntervalMs(next: number): void { + childPollIntervalMs = next; + }, + restoreWaitTimeoutMs(): void { + waitTimeoutMs = DEFAULT_WAIT_TIMEOUT_MS; + }, + setWaitTimeoutMs(next: number): void { + waitTimeoutMs = next; + }, }; class ClaudeSdkClient { private child: ReturnType | undefined; private closed = false; + private lastSessionId = ""; private lock: Promise = Promise.resolve(); private port = 0; private ready = false; + private resumeId = ""; private server: Server | undefined; private sessionId = ""; private started = false; @@ -143,6 +226,14 @@ class ClaudeSdkClient { return this.child !== undefined; } + getLastSessionId(): string { + return this.lastSessionId; + } + + setResumeId(id: string): void { + this.resumeId = id; + } + async start(): Promise { if (this.started) { return; @@ -163,6 +254,8 @@ class ClaudeSdkClient { }); const url = `ws://localhost:${this.port}`; + const resumeArgs = this.resumeId ? ["--resume", this.resumeId] : []; + this.resumeId = ""; this.child = spawnFn( [ @@ -179,6 +272,7 @@ class ClaudeSdkClient { "--dangerously-skip-permissions", "--sdk-url", url, + ...resumeArgs, ], { detached: DETACH_CHILD_PROCESS, @@ -257,7 +351,7 @@ class ClaudeSdkClient { private createServer(): void { const self = this; - this.server = serve({ + this.server = serveFn({ port: this.port, fetch(req, server) { const path = new URL(req.url).pathname; @@ -350,6 +444,10 @@ class ClaudeSdkClient { case "system": if (msg.subtype === "init") { this.sessionId = msg.session_id || ""; + this.lastSessionId = this.sessionId || this.lastSessionId; + if (this.sessionId) { + console.error(`[loop] claude session: ${this.sessionId}`); + } } return; case "control_response": @@ -424,6 +522,16 @@ class ClaudeSdkClient { return; } const state = this.turn; + if (state.backgroundTaskSeen) { + if (state.drainingBackground) { + return; + } + state.drainingBackground = true; + this.drainAndContinue(state).catch(() => { + // timeout and close handlers reject turn state; ignore drain errors + }); + return; + } this.turn = undefined; state.resolve({ combined: state.combined, @@ -436,6 +544,14 @@ class ClaudeSdkClient { if (!(this.ws && msg.request_id)) { return; } + const input = asRecord(msg.request?.input); + if ( + this.turn && + msg.request?.tool_name === "Task" && + input.run_in_background === true + ) { + this.turn.backgroundTaskSeen = true; + } if (msg.request?.subtype === "can_use_tool") { this.sendJson({ type: "control_response", @@ -451,10 +567,37 @@ class ClaudeSdkClient { } } + private async drainAndContinue(state: TurnState): Promise { + while (this.turn === state) { + const pid = this.child?.pid; + const remaining = + typeof pid === "number" ? countChildProcessesFn(pid) : 0; + if (remaining <= 0) { + if (this.turn !== state) { + return; + } + state.backgroundTaskSeen = false; + state.drainingBackground = false; + this.sendUserMessage(BACKGROUND_TASK_CONTINUATION); + return; + } + await wait(childPollIntervalMs); + } + } + private sendJson(data: Record): void { this.ws?.send(`${JSON.stringify(data)}\n`); } + private sendUserMessage(content: string): void { + this.sendJson({ + type: "user", + message: { role: "user", content }, + parent_tool_use_id: null, + session_id: this.sessionId, + }); + } + private async runTurnExclusive( prompt: string, _opts: Options, @@ -475,10 +618,12 @@ class ClaudeSdkClient { this.turn = undefined; reject(new Error("claude sdk turn timed out")); } - }, WAIT_TIMEOUT_MS); + }, waitTimeoutMs); this.turn = { + backgroundTaskSeen: false, combined: "", + drainingBackground: false, hasStreamed: false, onDelta, onParsed, @@ -494,12 +639,7 @@ class ClaudeSdkClient { }, }; - this.sendJson({ - type: "user", - message: { role: "user", content: prompt }, - parent_tool_use_id: null, - session_id: this.sessionId, - }); + this.sendUserMessage(prompt); }); // Claude SDK session state is process-bound, so restart per turn to force a @@ -566,8 +706,14 @@ const getClient = (): ClaudeSdkClient => { return singleton; }; -export const startClaudeSdk = async (): Promise => { - await getClient().start(); +export const startClaudeSdk = async ( + resumeSessionId?: string +): Promise => { + const client = getClient(); + if (resumeSessionId) { + client.setResumeId(resumeSessionId); + } + await client.start(); }; export const runClaudeTurn = ( @@ -590,6 +736,9 @@ export const interruptClaudeSdk = (signal: ExitSignal): void => { export const hasClaudeSdkProcess = (): boolean => getClient().hasProcess(); +export const getLastClaudeSessionId = (): string => + singleton?.getLastSessionId() ?? ""; + export const closeClaudeSdk = async (): Promise => { if (!singleton) { return; diff --git a/src/loop/codex-app-server.ts b/src/loop/codex-app-server.ts index 8f4937e..2644805 100644 --- a/src/loop/codex-app-server.ts +++ b/src/loop/codex-app-server.ts @@ -232,6 +232,7 @@ class AppServerClient { private child: ReturnType | undefined; private ws: import("./ws-client").WsClient | undefined; private closed = false; + private lastThreadId = ""; private started = false; private ready = false; private requestId = 1; @@ -243,6 +244,10 @@ class AppServerClient { return this.child; } + getLastThreadId(): string { + return this.lastThreadId; + } + hasProcess(): boolean { return this.child !== undefined; } @@ -345,10 +350,11 @@ class AppServerClient { prompt: string, opts: Options, onParsed: Callback, - onRaw: Callback + onRaw: Callback, + resumeThreadId?: string ): Promise { const task = this.lock.then(() => - this.runTurnExclusive(prompt, opts, onParsed, onRaw) + this.runTurnExclusive(prompt, opts, onParsed, onRaw, resumeThreadId) ); this.lock = task.then( () => undefined, @@ -385,7 +391,14 @@ class AppServerClient { this.started = false; } - private async ensureThread(model: string): Promise { + private async ensureThread( + model: string, + resumeThreadId?: string + ): Promise { + if (resumeThreadId) { + this.lastThreadId = resumeThreadId; + return resumeThreadId; + } const response = await this.sendRequest(METHOD_THREAD_START, { model, approvalPolicy: "never", @@ -398,6 +411,7 @@ class AppServerClient { "codex app-server returned thread/start without thread id" ); } + this.lastThreadId = thread.id; return thread.id; } @@ -405,7 +419,8 @@ class AppServerClient { prompt: string, opts: Options, onParsed: Callback, - onRaw: Callback + onRaw: Callback, + resumeThreadId?: string ): Promise { if (!(this.child && this.ready)) { await this.start(); @@ -414,7 +429,7 @@ class AppServerClient { throw new CodexAppServerFallbackError("codex app-server not running"); } - const threadId = await this.ensureThread(opts.model); + const threadId = await this.ensureThread(opts.model, resumeThreadId); const response = await this.sendRequest(METHOD_TURN_START, { threadId, input: buildInput(prompt), @@ -533,10 +548,7 @@ class AppServerClient { ): TurnState | undefined { const turnId = extractTurnId(payload) || extractThreadId(payload); if (turnId) { - const byTurn = this.turns.get(turnId); - if (byTurn) { - return byTurn; - } + return this.turns.get(turnId); } return this.turns.size === 1 ? [...this.turns.values()][0] : undefined; } @@ -703,12 +715,16 @@ class AppServerClient { } if (turnId && this.turns.has(turnId)) { const activeState = this.turns.get(turnId); - if (activeState) { - this.turns.delete(turnId); - activeState.reject( - new Error(parseErrorText(params) || `turn ${turnId} failed`) - ); + if (!activeState) { + return; } + this.turns.delete(turnId); + activeState.reject( + new Error(parseErrorText(params) || `turn ${turnId} failed`) + ); + return; + } + if (turnId) { return; } if (this.turns.size !== 1) { @@ -723,7 +739,14 @@ class AppServerClient { state: TurnState | undefined, params: Record ): void { - if (!state && this.turns.size === 1) { + const turnId = extractTurnId(params) || extractThreadId(params); + if (!state) { + if (turnId) { + return; + } + if (this.turns.size !== 1) { + return; + } const first = [...this.turns.values()][0]; if (!first) { return; @@ -731,9 +754,6 @@ class AppServerClient { this.resolveTurnState(first, params); return; } - if (!state) { - return; - } this.resolveTurnState(state, params); } @@ -888,13 +908,15 @@ export const runCodexTurn = ( prompt: string, opts: Options, // Some callers render directly from raw events and intentionally skip parsed callbacks. - callbacks: RunCodexTurnCallbacks + callbacks: RunCodexTurnCallbacks, + resumeThreadId?: string ): Promise => { return getClient().runTurn( prompt, opts, callbacks.onParsed ?? NOOP_CALLBACK, - callbacks.onRaw + callbacks.onRaw, + resumeThreadId ); }; @@ -904,6 +926,9 @@ export const interruptAppServer = (signal: ExitSignal): void => { export const hasAppServerProcess = (): boolean => getClient().hasProcess(); +export const getLastCodexThreadId = (): string => + singleton?.getLastThreadId() ?? ""; + export const closeAppServer = async (): Promise => { if (!singleton) { return; diff --git a/src/loop/constants.ts b/src/loop/constants.ts index b91850e..2ada431 100644 --- a/src/loop/constants.ts +++ b/src/loop/constants.ts @@ -10,25 +10,26 @@ export const HELP = ` loop - v${LOOP_VERSION} - meta agent loop runner Usage: - loop Open live panel for running claude/codex instances + loop Open live panel for running claude/codex instances loop [options] [prompt] - loop update Check for updates and apply if available - loop upgrade Alias for update + loop update Check for updates and apply if available + loop upgrade Alias for update Options: - -a, --agent Agent CLI to run (default: codex) - -p, --prompt Prompt text or path to a .md prompt file - -m, --max-iterations Max loops (default: infinite) - -d, --done Done signal (default: DONE) - --proof Proof requirements for task completion - --codex-model Override codex model (default: ${DEFAULT_CODEX_MODEL}) - --format Log format (default: pretty) - --review [claude|codex|claudex] Review on done (default: claudex) + -a, --agent Agent CLI to run (default: codex) + -p, --prompt Prompt text or path to a .md prompt file + -m, --max-iterations Max loops (default: infinite) + -d, --done Done signal (default: DONE) + --proof Proof requirements for task completion + --codex-model Override codex model (default: ${DEFAULT_CODEX_MODEL}) + --format Log format (default: pretty) + --review [claude|codex|claudex] Review on done (default: claudex) --review-plan [other|claude|codex|none] Review PLAN.md after plain-text planning (default: other) - --tmux Run in a detached tmux session (name: repo-loop-X) - --worktree Create and run in a fresh git worktree (name: repo-loop-X) - -v, --version Show loop version - -h, --help Show this help + --session Resume from a previous session/thread ID + --tmux Run in a detached tmux session (name: repo-loop-X) + --worktree Create and run in a fresh git worktree (name: repo-loop-X) + -v, --version Show loop version + -h, --help Show this help Auto-update: Updates are checked automatically on startup and applied on the next run. @@ -51,4 +52,5 @@ export const VALUE_FLAGS: Record = { "--proof": "proof", "--codex-model": "codexModel", "--format": "format", + "--session": "session", }; diff --git a/src/loop/iteration.ts b/src/loop/iteration.ts new file mode 100644 index 0000000..599e204 --- /dev/null +++ b/src/loop/iteration.ts @@ -0,0 +1,82 @@ +import { getLastClaudeSessionId } from "./claude-sdk-server"; +import { getLastCodexThreadId } from "./codex-app-server"; +import { runAgent } from "./runner"; +import type { Agent, Options, ReviewResult, RunResult } from "./types"; + +const DEFAULT_ITERATION_COOLDOWN_MS = 30_000; +const parseIterationCooldownMs = (): number => { + const raw = process.env.LOOP_COOLDOWN_MS; + if (raw === undefined) { + return DEFAULT_ITERATION_COOLDOWN_MS; + } + const parsed = Number.parseInt(raw, 10); + if (!Number.isInteger(parsed) || parsed < 0) { + return DEFAULT_ITERATION_COOLDOWN_MS; + } + return parsed; +}; +const ITERATION_COOLDOWN_MS = parseIterationCooldownMs(); +const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms)); + +export const iterationCooldown = (i: number): Promise => + i > 1 ? sleep(ITERATION_COOLDOWN_MS) : Promise.resolve(); + +const lastSession = (agent: Agent): string => + agent === "claude" ? getLastClaudeSessionId() : getLastCodexThreadId(); + +export const doneText = (s: string): string => `done signal "${s}"`; + +export const logSessionHint = (agent: Agent): void => { + const sid = lastSession(agent); + if (sid) { + console.error(`[loop] to resume: loop --session ${sid}`); + } +}; + +export const logIterationHeader = ( + i: number, + maxIterations: number, + agent: Agent +): void => { + const tag = Number.isFinite(maxIterations) ? `/${maxIterations}` : ""; + const sid = lastSession(agent); + const sidTag = sid ? ` (session: ${sid})` : ""; + console.log(`\n[loop] iteration ${i}${tag}${sidTag}`); +}; + +export const tryRunAgent = async ( + agent: Agent, + prompt: string, + opts: Options, + sessionId?: string +): Promise => { + try { + return await runAgent(agent, prompt, opts, sessionId); + } catch (error) { + const msg = error instanceof Error ? error.message : String(error); + console.error(`\n[loop] ${agent} error: ${msg}`); + logSessionHint(agent); + return undefined; + } +}; + +export const formatFollowUp = ( + review: ReviewResult +): { notes: string; log: string } => { + if (review.failureCount > 1) { + const header = review.consensusFail + ? "Both reviewers requested changes. Decide for each comment whether to address it now. If you skip one, explain why briefly. If both reviews found the same issue, it might be worth addressing." + : "Multiple reviewers requested changes. Decide for each comment whether to address it now. If you skip one, explain why briefly."; + return { + notes: review.notes ? `${header}\n\n${review.notes}` : "", + log: review.consensusFail + ? "\n[loop] both reviewers requested changes. deciding what to address." + : "\n[loop] multiple reviewers requested changes. deciding what to address.", + }; + } + + return { + notes: review.notes, + log: "\n[loop] one reviewer requested changes. continuing loop.", + }; +}; diff --git a/src/loop/main.ts b/src/loop/main.ts index bb671bf..9aaf689 100644 --- a/src/loop/main.ts +++ b/src/loop/main.ts @@ -1,56 +1,47 @@ import { createInterface } from "node:readline/promises"; +import { + doneText, + formatFollowUp, + iterationCooldown, + logIterationHeader, + logSessionHint, + tryRunAgent, +} from "./iteration"; import { runDraftPrStep } from "./pr"; import { buildWorkPrompt } from "./prompts"; import { resolveReviewers, runReview } from "./review"; -import { runAgent } from "./runner"; -import type { Options, ReviewResult } from "./types"; +import type { Options } from "./types"; import { hasSignal } from "./utils"; -const doneText = (s: string) => `done signal "${s}"`; - -const formatFollowUp = (review: ReviewResult) => { - if (review.failureCount > 1) { - const header = review.consensusFail - ? "Both reviewers requested changes. Decide for each comment whether to address it now. If you skip one, explain why briefly. If both reviews found the same issue, it might be worth addressing." - : "Multiple reviewers requested changes. Decide for each comment whether to address it now. If you skip one, explain why briefly."; - return { - notes: review.notes ? `${header}\n\n${review.notes}` : "", - log: review.consensusFail - ? "\n[loop] both reviewers requested changes. deciding what to address." - : "\n[loop] multiple reviewers requested changes. deciding what to address.", - }; - } - - return { - notes: review.notes, - log: "\n[loop] one reviewer requested changes. continuing loop.", - }; -}; - const runIterations = async ( task: string, opts: Options, reviewers: string[] ) => { let reviewNotes = ""; + let sessionId = opts.sessionId; const shouldReview = reviewers.length > 0; const { doneSignal, maxIterations } = opts; console.log(`\n[loop] PLAN.md:\n\n${task}`); for (let i = 1; i <= maxIterations; i++) { - const tag = Number.isFinite(maxIterations) ? `/${maxIterations}` : ""; - console.log(`\n[loop] iteration ${i}${tag}`); + await iterationCooldown(i); + logIterationHeader(i, maxIterations, opts.agent); const prompt = buildWorkPrompt(task, doneSignal, opts.proof, reviewNotes); reviewNotes = ""; - const result = await runAgent(opts.agent, prompt, opts); - const output = `${result.parsed}\n${result.combined}`; - const done = hasSignal(output, doneSignal); + const result = await tryRunAgent(opts.agent, prompt, opts, sessionId); + sessionId = undefined; + if (!result) { + continue; + } if (result.exitCode !== 0) { - const hint = done ? ` (${doneText(doneSignal)} seen)` : ""; - throw new Error( - `[loop] ${opts.agent} exited with code ${result.exitCode}${hint}` + console.error( + `\n[loop] ${opts.agent} exited with code ${result.exitCode}` ); + logSessionHint(opts.agent); + continue; } - if (!done) { + const output = `${result.parsed}\n${result.combined}`; + if (!hasSignal(output, doneSignal)) { continue; } if (!shouldReview) { diff --git a/src/loop/review.ts b/src/loop/review.ts index 5af178c..c7ba64f 100644 --- a/src/loop/review.ts +++ b/src/loop/review.ts @@ -40,8 +40,8 @@ interface ReviewSignalSummary { lines: string[]; } -const cleanOutput = (result: RunResult): string => - `${result.parsed}\n${result.combined}`.replace(/\r/g, "").trimEnd(); +const cleanOutputText = (text: string): string => + text.replace(/\r/g, "").trimEnd(); const parseSignal = (line: string): ReviewSignal | undefined => { const trimmed = line.trim(); @@ -56,6 +56,27 @@ const parseSignal = (line: string): ReviewSignal | undefined => { const splitOutputLines = (output: string): string[] => output.split(NEWLINE_RE); +const hasExplicitSignal = (output: string): boolean => + splitOutputLines(output).some((line) => parseSignal(line) !== undefined); + +const cleanOutput = (result: RunResult): string => { + const parsed = cleanOutputText(result.parsed); + const combined = cleanOutputText(result.combined); + if (!parsed) { + return combined; + } + if (!combined) { + return parsed; + } + if (hasExplicitSignal(parsed)) { + return parsed; + } + if (hasExplicitSignal(combined)) { + return combined; + } + return parsed; +}; + const getFinalNonEmptyLine = ( lines: string[] ): { finalLine: string | undefined; finalLineIndex: number | undefined } => { diff --git a/src/loop/runner.ts b/src/loop/runner.ts index fcc15cb..9a27e66 100644 --- a/src/loop/runner.ts +++ b/src/loop/runner.ts @@ -29,7 +29,8 @@ interface SpawnConfig { type LegacyAgentRunner = ( agent: Agent, prompt: string, - opts: Options + opts: Options, + sessionId?: string ) => Promise; interface RunnerState { runLegacyAgent: LegacyAgentRunner; @@ -47,7 +48,8 @@ let activeClaudeSdkRuns = 0; let watchingSignals = false; let fallbackWarned = false; const runnerState: RunnerState = { - runLegacyAgent: (agent, prompt, opts) => runLegacyAgent(agent, prompt, opts), + runLegacyAgent: (agent, prompt, opts, sessionId) => + runLegacyAgent(agent, prompt, opts, sessionId), useAppServer: () => useAppServer(), }; @@ -105,22 +107,24 @@ const syncSignalHandlers = (): void => { export const buildCommand = ( agent: Agent, prompt: string, - model: string + model: string, + sessionId?: string ): SpawnConfig => { if (agent === "claude") { - return { - args: [ - "-p", - prompt, - "--dangerously-skip-permissions", - "--output-format", - "stream-json", - "--verbose", - "--model", - DEFAULT_CLAUDE_MODEL, - ], - cmd: "claude", - }; + const args = [ + "-p", + prompt, + "--dangerously-skip-permissions", + "--output-format", + "stream-json", + "--verbose", + "--model", + DEFAULT_CLAUDE_MODEL, + ]; + if (sessionId) { + args.push("--resume", sessionId); + } + return { args, cmd: "claude" }; } return { @@ -243,7 +247,8 @@ const appendParsedLine = ( const runCodexAgent = async ( prompt: string, - opts: Options + opts: Options, + sessionId?: string ): Promise => { if (!runnerState.useAppServer()) { return runnerState.runLegacyAgent("codex", prompt, opts); @@ -270,9 +275,12 @@ const runCodexAgent = async ( ); } - const result = await runCodexTurn(prompt, opts, { - onRaw: renderer.onRawLine, - }); + const result = await runCodexTurn( + prompt, + opts, + { onRaw: renderer.onRawLine }, + sessionId + ); const finalParsed = result.parsed || renderer.getParsed(); if ( opts.format === "pretty" && @@ -305,9 +313,10 @@ const runCodexAgent = async ( const runLegacyAgent = async ( agent: Agent, prompt: string, - opts: Options + opts: Options, + sessionId?: string ): Promise => { - const { args, cmd } = buildCommand(agent, prompt, opts.model); + const { args, cmd } = buildCommand(agent, prompt, opts.model, sessionId); const proc = spawn([cmd, ...args], { detached: DETACH_CHILD_PROCESS, env: process.env, @@ -382,8 +391,9 @@ const runLegacyAgent = async ( const defaultRunLegacyAgent: LegacyAgentRunner = ( agent: Agent, prompt: string, - opts: Options -): Promise => runLegacyAgent(agent, prompt, opts); + opts: Options, + sessionId?: string +): Promise => runLegacyAgent(agent, prompt, opts, sessionId); export const runnerInternals = { reset(): void { @@ -400,7 +410,8 @@ export const runnerInternals = { const runClaudeAgent = async ( prompt: string, - opts: Options + opts: Options, + sessionId?: string ): Promise => { let parsed = ""; let state = { parsed: "", prettyCount: 0, lastMessage: "" }; @@ -422,7 +433,7 @@ const runClaudeAgent = async ( activeClaudeSdkRuns += 1; syncSignalHandlers(); try { - await startClaudeSdk(); + await startClaudeSdk(sessionId); const result = await runClaudeTurn(prompt, opts, { onDelta, onParsed, @@ -438,13 +449,14 @@ const runClaudeAgent = async ( export const runAgent = ( agent: Agent, prompt: string, - opts: Options + opts: Options, + sessionId?: string ): Promise => { if (agent === "codex") { - return runCodexAgent(prompt, opts); + return runCodexAgent(prompt, opts, sessionId); } if (agent === "claude") { - return runClaudeAgent(prompt, opts); + return runClaudeAgent(prompt, opts, sessionId); } - return runLegacyAgent(agent, prompt, opts); + return runLegacyAgent(agent, prompt, opts, sessionId); }; diff --git a/src/loop/types.ts b/src/loop/types.ts index 08ba9c2..2bacbee 100644 --- a/src/loop/types.ts +++ b/src/loop/types.ts @@ -9,7 +9,8 @@ export type ValueFlag = | "done" | "proof" | "codexModel" - | "format"; + | "format" + | "session"; export interface Options { agent: Agent; @@ -21,6 +22,7 @@ export interface Options { proof: string; review?: ReviewMode; reviewPlan?: PlanReviewMode; + sessionId?: string; tmux?: boolean; worktree?: boolean; } diff --git a/tests/loop.test.ts b/tests/loop.test.ts index 33f3339..8266067 100644 --- a/tests/loop.test.ts +++ b/tests/loop.test.ts @@ -74,7 +74,7 @@ const loadRunCli = async ( }, })); - const { runCli } = await import(`../src/loop?test=${Date.now()}`); + const { runCli } = await import(`../src/cli?test=${Date.now()}`); return { applyStagedMock, handleManualMock, diff --git a/tests/loop/claude-sdk-server.test.ts b/tests/loop/claude-sdk-server.test.ts new file mode 100644 index 0000000..fcbab7c --- /dev/null +++ b/tests/loop/claude-sdk-server.test.ts @@ -0,0 +1,312 @@ +import { afterEach, expect, test } from "bun:test"; +import type { serve, spawn } from "bun"; +import { + claudeSdkInternals, + closeClaudeSdk, + runClaudeTurn, +} from "../../src/loop/claude-sdk-server"; +import type { Options } from "../../src/loop/types"; + +type JsonRecord = Record; +type SendFrame = (frame: JsonRecord) => void; +type UserMessageHandler = (args: { + content: string; + index: number; + send: SendFrame; +}) => void; +interface WebsocketHandlers { + close: (ws: FakeSocket, code: number, reason: string) => void; + message: (ws: FakeSocket, data: string | ArrayBuffer) => void; + open: (ws: FakeSocket) => void; +} + +interface FakeSocket { + close: () => void; + data: { role: "claude" | "frontend" }; + send: (data: string) => void; +} + +const makeOptions = (): Options => ({ + agent: "claude", + doneSignal: "", + format: "raw", + maxIterations: 1, + model: "test-model", + proof: "proof", +}); + +const asRecord = (value: unknown): JsonRecord => { + if (typeof value === "object" && value !== null) { + return value as JsonRecord; + } + return {}; +}; + +const installHarness = (onUserMessage: UserMessageHandler): string[] => { + const userMessages: string[] = []; + let handlers: WebsocketHandlers | undefined; + let fakePid = 30_000; + + const serveMock = ((options: unknown): unknown => { + handlers = asRecord(asRecord(options).websocket) as WebsocketHandlers; + return { + stop: () => { + handlers = undefined; + }, + }; + }) as unknown as ( + ...args: Parameters + ) => ReturnType; + + const spawnMock = ((_command: unknown, _options: unknown): unknown => { + if (!handlers) { + throw new Error("expected websocket handlers before spawn"); + } + + const currentHandlers = handlers; + let stdoutController: + | ReadableStreamDefaultController + | undefined; + let stderrController: + | ReadableStreamDefaultController + | undefined; + const stdout = new ReadableStream({ + start(controller) { + stdoutController = controller; + }, + }); + const stderr = new ReadableStream({ + start(controller) { + stderrController = controller; + }, + }); + + let exitedResolve = (_code: number): void => undefined; + const exited = new Promise((resolve) => { + exitedResolve = resolve; + }); + + let socketClosed = false; + let childClosed = false; + + const socket: FakeSocket = { + data: { role: "claude" }, + close: () => { + if (socketClosed) { + return; + } + socketClosed = true; + currentHandlers.close(socket, 1000, "closed"); + }, + send: (raw) => { + for (const line of raw.split("\n")) { + if (!line.trim()) { + continue; + } + const frame = asRecord(JSON.parse(line)); + const send: SendFrame = (out) => { + currentHandlers.message(socket, `${JSON.stringify(out)}\n`); + }; + if ( + frame.type === "control_request" && + asRecord(frame.request).subtype === "initialize" + ) { + send({ + type: "control_response", + response: { request_id: frame.request_id, subtype: "success" }, + }); + send({ type: "system", subtype: "init", session_id: "session-1" }); + continue; + } + if (frame.type === "user") { + const content = String(asRecord(frame.message).content ?? ""); + userMessages.push(content); + onUserMessage({ + content, + index: userMessages.length - 1, + send, + }); + } + } + }, + }; + + queueMicrotask(() => { + currentHandlers.open(socket); + }); + + const closeChild = (): void => { + if (childClosed) { + return; + } + childClosed = true; + socket.close(); + try { + stdoutController?.close(); + } catch { + // ignore close errors in tests + } + try { + stderrController?.close(); + } catch { + // ignore close errors in tests + } + exitedResolve(0); + }; + + return { + exited, + kill: (_signal?: string) => { + closeChild(); + return true; + }, + pid: fakePid++, + stderr, + stdout, + }; + }) as unknown as ( + ...args: Parameters + ) => ReturnType; + + claudeSdkInternals.setServeFn(serveMock); + claudeSdkInternals.setSpawnFn(spawnMock); + return userMessages; +}; + +afterEach(async () => { + await closeClaudeSdk(); + claudeSdkInternals.restoreSpawnFn(); + claudeSdkInternals.restoreServeFn(); + claudeSdkInternals.restoreCountChildProcessesFn(); + claudeSdkInternals.restoreChildPollIntervalMs(); + claudeSdkInternals.restoreWaitTimeoutMs(); +}); + +test("runClaudeTurn resolves immediately when no background task is detected", async () => { + const userMessages = installHarness(({ index, send }) => { + if (index !== 0) { + return; + } + send({ + type: "assistant", + message: { content: [{ type: "text", text: "done" }] }, + }); + send({ type: "result", is_error: false }); + }); + + const result = await runClaudeTurn("ship it", makeOptions(), { + onDelta: () => undefined, + onParsed: () => undefined, + onRaw: () => undefined, + }); + + expect(userMessages).toEqual(["ship it"]); + expect(result.exitCode).toBe(0); + expect(result.parsed).toBe("done"); +}); + +test("runClaudeTurn drains background Task workers then sends continuation", async () => { + let pollCount = 0; + claudeSdkInternals.setChildPollIntervalMs(1); + claudeSdkInternals.setCountChildProcessesFn(() => { + pollCount += 1; + return pollCount < 3 ? 1 : 0; + }); + + const userMessages = installHarness(({ index, send }) => { + if (index === 0) { + send({ + type: "control_request", + request_id: "tool-1", + request: { + subtype: "can_use_tool", + tool_name: "Task", + input: { run_in_background: true }, + }, + }); + send({ type: "result", is_error: false }); + return; + } + if (index === 1) { + send({ + type: "assistant", + message: { content: [{ type: "text", text: "final answer" }] }, + }); + send({ type: "result", is_error: false }); + } + }); + + const result = await runClaudeTurn("do work", makeOptions(), { + onDelta: () => undefined, + onParsed: () => undefined, + onRaw: () => undefined, + }); + + expect(pollCount).toBeGreaterThanOrEqual(3); + expect(userMessages[1]).toBe(claudeSdkInternals.BACKGROUND_TASK_CONTINUATION); + expect(result.parsed).toBe("final answer"); +}); + +test("non-Task tools with run_in_background do not trigger drain mode", async () => { + let pollCount = 0; + claudeSdkInternals.setChildPollIntervalMs(1); + claudeSdkInternals.setCountChildProcessesFn(() => { + pollCount += 1; + return 1; + }); + + const userMessages = installHarness(({ index, send }) => { + if (index !== 0) { + return; + } + send({ + type: "control_request", + request_id: "tool-2", + request: { + subtype: "can_use_tool", + tool_name: "Bash", + input: { run_in_background: true }, + }, + }); + send({ type: "result", is_error: false }); + }); + + await runClaudeTurn("do work", makeOptions(), { + onDelta: () => undefined, + onParsed: () => undefined, + onRaw: () => undefined, + }); + + expect(userMessages).toEqual(["do work"]); + expect(pollCount).toBe(0); +}); + +test("timeout rejects even while background workers are still present", async () => { + claudeSdkInternals.setWaitTimeoutMs(25); + claudeSdkInternals.setChildPollIntervalMs(1); + claudeSdkInternals.setCountChildProcessesFn(() => 1); + + installHarness(({ index, send }) => { + if (index !== 0) { + return; + } + send({ + type: "control_request", + request_id: "tool-3", + request: { + subtype: "can_use_tool", + tool_name: "Task", + input: { run_in_background: true }, + }, + }); + send({ type: "result", is_error: false }); + }); + + await expect( + runClaudeTurn("do work", makeOptions(), { + onDelta: () => undefined, + onParsed: () => undefined, + onRaw: () => undefined, + }) + ).rejects.toThrow("claude sdk turn timed out"); +}); diff --git a/tests/loop/codex-app-server.test.ts b/tests/loop/codex-app-server.test.ts index 700a6cb..4e2edf9 100644 --- a/tests/loop/codex-app-server.test.ts +++ b/tests/loop/codex-app-server.test.ts @@ -381,6 +381,66 @@ test("runCodexTurn parses successful deltas and completion", async () => { expect(rawLines.join(" ")).toContain("turn/completed"); }); +test("runCodexTurn ignores foreign subagent turn notifications", async () => { + const appServer = await getModule(); + currentHandler = (request, write) => { + if (request.method === "initialize") { + write({ id: request.id, result: {} }); + return; + } + if (request.method === "thread/start") { + write({ id: request.id, result: { thread: { id: "thread-1" } } }); + return; + } + if (request.method === "turn/start") { + write({ id: request.id, result: { turn: { id: "turn-1" } } }); + setTimeout(() => { + write({ + method: "error", + params: { + turnId: "sub-turn-1", + error: { message: "subagent failed" }, + }, + }); + write({ + method: "turn/completed", + params: { + turnId: "sub-turn-1", + turn: { + id: "sub-turn-1", + status: "failed", + error: { message: "subagent turn failed" }, + }, + }, + }); + write({ + method: "item/agentMessage/delta", + params: { + turnId: "turn-1", + delta: "parent turn finished", + }, + }); + write({ + method: "turn/completed", + params: { + turnId: "turn-1", + turn: { id: "turn-1", status: "completed" }, + }, + }); + }, 0); + } + }; + + const result = await appServer.runCodexTurn("say hi", makeOptions(), { + onParsed: () => undefined, + onRaw: () => undefined, + }); + + expect(result.exitCode).toBe(0); + expect(result.parsed).toContain("parent turn finished"); + expect(result.parsed).not.toContain("subagent failed"); +}); + test("runCodexTurn maps failed turns to non-zero exit", async () => { const appServer = await getModule(); currentHandler = (request, write) => { diff --git a/tests/loop/main.test.ts b/tests/loop/main.test.ts index a8aafc3..64d5fc1 100644 --- a/tests/loop/main.test.ts +++ b/tests/loop/main.test.ts @@ -1,6 +1,8 @@ import { afterEach, expect, mock, test } from "bun:test"; import type { Options, ReviewResult, RunResult } from "../../src/loop/types"; +process.env.LOOP_COOLDOWN_MS = "0"; + const makeOptions = (overrides: Partial = {}): Options => ({ agent: "codex", doneSignal: "", @@ -93,15 +95,15 @@ test("runLoop stops immediately on done signal when review is disabled", async ( expect(runDraftPrStep).not.toHaveBeenCalled(); }); -test("runLoop throws when agent exits non-zero even with done signal (no review)", async () => { - const { runLoop } = await loadRunLoop({ +test("runLoop continues on non-zero exit code instead of throwing", async () => { + const { runLoop, runAgent } = await loadRunLoop({ resolveReviewers: () => [], runAgent: async () => makeRunResult("", "", 1), }); - await expect( - runLoop("Ship feature", makeOptions({ review: undefined })) - ).rejects.toThrow("exited with code 1"); + await runLoop("Ship feature", makeOptions({ review: undefined })); + + expect(runAgent).toHaveBeenCalledTimes(2); }); test("runLoop creates draft PR when done signal is reviewed and approved", async () => { @@ -124,15 +126,15 @@ test("runLoop creates draft PR when done signal is reviewed and approved", async expect(runDraftPrStep).toHaveBeenNthCalledWith(1, "Ship feature", opts); }); -test("runLoop throws when agent exits non-zero even with done signal (with review)", async () => { - const { runLoop, runReview, runDraftPrStep } = await loadRunLoop({ +test("runLoop skips review when agent exits non-zero even with done signal", async () => { + const { runLoop, runAgent, runReview, runDraftPrStep } = await loadRunLoop({ resolveReviewers: () => ["codex", "claude"], runAgent: async () => makeRunResult("", "", 1), }); - await expect(runLoop("Ship feature", makeOptions())).rejects.toThrow( - "exited with code 1" - ); + await runLoop("Ship feature", makeOptions()); + + expect(runAgent).toHaveBeenCalledTimes(2); expect(runReview).not.toHaveBeenCalled(); expect(runDraftPrStep).not.toHaveBeenCalled(); }); diff --git a/tests/loop/review-run.test.ts b/tests/loop/review-run.test.ts index 4f1c638..9563960 100644 --- a/tests/loop/review-run.test.ts +++ b/tests/loop/review-run.test.ts @@ -42,6 +42,24 @@ test("runReview approves when all reviewers pass", async () => { expect(runAgentMock).toHaveBeenCalledTimes(2); }); +test("runReview ignores transport noise in combined when parsed has final pass signal", async () => { + const { runReview } = makeRunReview(async () => ({ + combined: + '{"method":"item/agentMessage/delta","params":{"delta":"thinking"}}\n{"method":"turn/completed","params":{"status":"completed"}}', + exitCode: 0, + parsed: `Looks good.\n${REVIEW_PASS}`, + })); + + const result = await runReview(["codex"], "ship task", makeOptions()); + + expect(result).toEqual({ + approved: true, + consensusFail: false, + failureCount: 0, + notes: "", + }); +}); + test("runReview treats mixed PASS and FAIL as failure", async () => { const { runReview } = makeRunReview(async () => ({ combined: "", diff --git a/tests/loop/tmux.test.ts b/tests/loop/tmux.test.ts index b01cc7d..e9ec70d 100644 --- a/tests/loop/tmux.test.ts +++ b/tests/loop/tmux.test.ts @@ -35,7 +35,7 @@ test("runInTmux starts detached session and strips --tmux", () => { const attaches: string[] = []; const logs: string[] = []; const command = - "'env' 'LOOP_RUN_BASE=repo' 'LOOP_RUN_ID=1' 'bun' '/repo/src/loop.ts' '--proof' 'verify' 'fix bug'"; + "'env' 'LOOP_RUN_BASE=repo' 'LOOP_RUN_ID=1' 'bun' '/repo/src/cli.ts' '--proof' 'verify' 'fix bug'"; const delegated = runInTmux(["--tmux", "--proof", "verify", "fix bug"], { attach: (session: string) => { @@ -45,7 +45,7 @@ test("runInTmux starts detached session and strips --tmux", () => { env: {}, findBinary: () => true, isInteractive: () => true, - launchArgv: ["bun", "/repo/src/loop.ts"], + launchArgv: ["bun", "/repo/src/cli.ts"], log: (line: string) => { logs.push(line); }, @@ -168,10 +168,10 @@ test("tmux internals strip --tmux from forwarded args", () => { test("tmux internals build launch argv from exec path", () => { expect( tmuxInternals.buildLaunchArgv( - ["/usr/local/bin/bun", "src/loop.ts", "--tmux", "--proof", "verify"], + ["/usr/local/bin/bun", "src/cli.ts", "--tmux", "--proof", "verify"], "/usr/local/bin/bun" ) - ).toEqual(["/usr/local/bin/bun", `${process.cwd()}/src/loop.ts`]); + ).toEqual(["/usr/local/bin/bun", `${process.cwd()}/src/cli.ts`]); }); test("tmux internals build launch argv for bun-compiled binary", () => {