diff --git a/biome.jsonc b/biome.jsonc index a39804b..66733b3 100644 --- a/biome.jsonc +++ b/biome.jsonc @@ -1,4 +1,19 @@ { "$schema": "./node_modules/@biomejs/biome/configuration_schema.json", - "extends": ["ultracite/biome/core"] + "extends": ["ultracite/biome/core"], + "overrides": [ + { + "includes": ["src/loop/ws-client.ts"], + "linter": { + "rules": { + "suspicious": { + "noBitwiseOperators": "off" + }, + "complexity": { + "noExcessiveCognitiveComplexity": "off" + } + } + } + } + ] } diff --git a/src/loop.ts b/src/loop.ts index c7d463b..a9ec5e9 100644 --- a/src/loop.ts +++ b/src/loop.ts @@ -1,4 +1,5 @@ #!/usr/bin/env bun +import { closeClaudeSdk } from "./loop/claude-sdk-server"; import { closeAppServer } from "./loop/codex-app-server"; import { cliDeps } from "./loop/deps"; import { updateDeps } from "./loop/update-deps"; @@ -28,7 +29,7 @@ export const runCli = async (argv: string[]): Promise => { const task = await cliDeps.resolveTask(opts); await cliDeps.runLoop(task, opts); } finally { - await closeAppServer(); + await Promise.all([closeAppServer(), closeClaudeSdk()]); } }; diff --git a/src/loop/claude-sdk-server.ts b/src/loop/claude-sdk-server.ts new file mode 100644 index 0000000..8d241ed --- /dev/null +++ b/src/loop/claude-sdk-server.ts @@ -0,0 +1,590 @@ +import { type Server, type ServerWebSocket, serve, spawn } from "bun"; +import { DEFAULT_CLAUDE_MODEL } from "./constants"; +import { findFreePort } from "./ports"; +import type { Options, RunResult } from "./types"; + +type ExitSignal = "SIGINT" | "SIGTERM"; +type Callback = (text: string) => void; +type SpawnFn = (...args: Parameters) => ReturnType; +type WSRole = "claude" | "frontend"; +interface WSData { + role: WSRole; +} + +interface ContentBlock { + text?: string; + type: string; +} + +interface StreamEvent { + delta?: { text?: string; type?: string }; + type?: string; +} + +interface NdjsonMessage { + event?: StreamEvent; + is_error?: boolean; + message?: { content?: ContentBlock[]; role?: string }; + request?: { + input?: Record; + subtype?: string; + tool_name?: string; + }; + request_id?: string; + response?: { request_id?: string; subtype?: string }; + result?: string; + session_id?: string; + subtype?: string; + type: string; +} + +interface TurnState { + combined: string; + hasStreamed: boolean; + onDelta: Callback; + onParsed: Callback; + onRaw: Callback; + parsed: string; + reject: (error: Error) => void; + resolve: (result: RunResult) => void; +} + +const CLAUDE_SDK_BASE_PORT = 8765; +const CLAUDE_SDK_PORT_RANGE = 100; +const START_TIMEOUT_MS = 60_000; +const WAIT_TIMEOUT_MS = 600_000; + +const drainStream = (stream: ReadableStream): void => { + const reader = stream.getReader(); + const pump = (): void => { + reader + .read() + .then(({ done }) => { + if (!done) { + pump(); + } + }) + .catch(() => { + // ignore read errors after process exit + }); + }; + pump(); +}; + +const pipeToStderr = (stream: ReadableStream): void => { + const reader = stream.getReader(); + const pump = (): void => { + reader + .read() + .then(({ done, value }) => { + if (done) { + return; + } + if (value) { + process.stderr.write(value); + } + pump(); + }) + .catch(() => { + // ignore read errors after process exit + }); + }; + pump(); +}; + +let spawnFn: SpawnFn = spawn; + +export const claudeSdkInternals = { + restoreSpawnFn(): void { + spawnFn = spawn; + }, + setSpawnFn(next: SpawnFn): void { + spawnFn = next; + }, +}; + +class ClaudeSdkClient { + private child: ReturnType | undefined; + private closed = false; + private lock: Promise = Promise.resolve(); + private port = 0; + private ready = false; + private server: Server | undefined; + private sessionId = ""; + private started = false; + private turn: TurnState | undefined; + private initRequestId = ""; + private waitingForConnection: (() => void) | undefined; + private waitingForInitialize: (() => void) | undefined; + private ws: ServerWebSocket | undefined; + private readonly frontends = new Set>(); + + get process(): ReturnType | undefined { + return this.child; + } + + hasProcess(): boolean { + return this.child !== undefined; + } + + async start(): Promise { + if (this.started) { + return; + } + this.started = true; + try { + this.port = await findFreePort( + CLAUDE_SDK_BASE_PORT, + CLAUDE_SDK_PORT_RANGE + ); + this.createServer(); + + const connected = new Promise((r) => { + this.waitingForConnection = r; + }); + const initialized = new Promise((r) => { + this.waitingForInitialize = r; + }); + + const url = `ws://localhost:${this.port}`; + + this.child = spawnFn( + [ + "claude", + "-p", + "placeholder", + "--output-format", + "stream-json", + "--input-format", + "stream-json", + "--verbose", + "--model", + DEFAULT_CLAUDE_MODEL, + "--dangerously-skip-permissions", + "--sdk-url", + url, + ], + { + env: process.env, + stderr: "pipe", + stdout: "pipe", + } + ); + + drainStream(this.child.stdout); + pipeToStderr(this.child.stderr); + + const timeout = new Promise((_, reject) => { + setTimeout( + () => reject(new Error("claude sdk startup timed out")), + START_TIMEOUT_MS + ); + }); + const exited = this.child.exited.then((code) => { + throw new Error(`claude exited with code ${code} during startup`); + }); + + await Promise.race([connected.then(() => initialized), timeout, exited]); + this.ready = true; + } catch (error) { + await this.cleanup(); + throw error; + } + } + + runTurn( + prompt: string, + opts: Options, + onParsed: Callback, + onRaw: Callback, + onDelta: Callback + ): Promise { + const task = this.lock.then(() => + this.runTurnExclusive(prompt, opts, onParsed, onRaw, onDelta) + ); + this.lock = task.then( + () => undefined, + () => undefined + ); + return task; + } + + interrupt(signal: ExitSignal): void { + this.child?.kill(signal); + } + + async close(): Promise { + this.closed = true; + if (this.turn) { + this.turn.reject(new Error("claude sdk server closed")); + this.turn = undefined; + } + await this.cleanup(); + } + + private broadcastToFrontends(data: string): void { + for (const ws of this.frontends) { + ws.send(data); + } + } + + private handleFrontendMessage( + _ws: ServerWebSocket, + text: string + ): void { + try { + const msg = JSON.parse(text); + if ( + msg.type === "message" && + typeof msg.content === "string" && + this.ws + ) { + this.sendJson({ + type: "user", + message: { role: "user", content: msg.content }, + parent_tool_use_id: null, + session_id: this.sessionId, + }); + } + } catch { + // ignore parse errors from frontends + } + } + + private createServer(): void { + const self = this; + this.server = serve({ + port: this.port, + fetch(req, server) { + const path = new URL(req.url).pathname; + const role: WSRole = path === "/ws" ? "frontend" : "claude"; + if (server.upgrade(req, { data: { role } as WSData })) { + return undefined; + } + return new Response("loop", { status: 200 }); + }, + websocket: { + idleTimeout: 0, + perMessageDeflate: false, + close(ws, _code, _reason) { + const d = ws.data as WSData; + if (d.role === "frontend") { + self.frontends.delete(ws as ServerWebSocket); + return; + } + if (!self.closed) { + self.handleUnexpectedClose(); + } + }, + message(ws, data) { + const d = ws.data as WSData; + const text = + typeof data === "string" + ? data + : new TextDecoder().decode(data as ArrayBuffer); + + if (d.role === "frontend") { + self.handleFrontendMessage(ws as ServerWebSocket, text); + return; + } + + for (const line of text.split("\n")) { + if (!line.trim()) { + continue; + } + try { + self.handleMessage(JSON.parse(line) as NdjsonMessage, line); + } catch { + // ignore parse errors + } + } + }, + open(ws) { + const d = ws.data as WSData; + if (d.role === "frontend") { + self.frontends.add(ws as ServerWebSocket); + // send current status + if (self.ready) { + ws.send( + `${JSON.stringify({ + type: "status", + text: "claude code is connected", + })}\n` + ); + } + return; + } + + self.ws = ws as ServerWebSocket; + self.waitingForConnection?.(); + self.waitingForConnection = undefined; + + // Send initialize control_request per SDK protocol + self.initRequestId = crypto.randomUUID(); + ws.send( + `${JSON.stringify({ + type: "control_request", + request_id: self.initRequestId, + request: { subtype: "initialize" }, + })}\n` + ); + }, + }, + }); + } + + private handleMessage(msg: NdjsonMessage, raw: string): void { + // broadcast to frontend observers + this.broadcastToFrontends(`${raw}\n`); + + if (this.turn) { + this.turn.combined += `${raw}\n`; + this.turn.onRaw(raw); + } + + switch (msg.type) { + case "system": + if (msg.subtype === "init") { + this.sessionId = msg.session_id || ""; + } + return; + case "control_response": + this.handleControlResponse(msg); + return; + case "stream_event": + this.handleStreamEvent(msg); + return; + case "assistant": + this.handleAssistant(msg); + return; + case "result": + this.handleResult(msg); + return; + case "control_request": + this.handleControlRequest(msg); + return; + default: + return; + } + } + + private handleControlResponse(msg: NdjsonMessage): void { + if ( + msg.response?.request_id === this.initRequestId && + msg.response?.subtype === "success" + ) { + this.waitingForInitialize?.(); + this.waitingForInitialize = undefined; + } + } + + private handleStreamEvent(msg: NdjsonMessage): void { + if (!this.turn) { + return; + } + const event = msg.event; + if ( + event?.type === "content_block_delta" && + event.delta?.type === "text_delta" && + event.delta.text + ) { + this.turn.hasStreamed = true; + this.turn.onDelta(event.delta.text); + } + } + + private handleAssistant(msg: NdjsonMessage): void { + if (!this.turn) { + return; + } + const content = msg.message?.content; + if (!Array.isArray(content)) { + return; + } + const text = content + .filter((b) => b.type === "text" && b.text) + .map((b) => b.text) + .join("") + .trim(); + if (!text) { + return; + } + this.turn.parsed = this.turn.parsed ? `${this.turn.parsed}\n${text}` : text; + if (!this.turn.hasStreamed) { + this.turn.onParsed(text); + } + } + + private handleResult(msg: NdjsonMessage): void { + if (!this.turn) { + return; + } + const state = this.turn; + this.turn = undefined; + state.resolve({ + combined: state.combined, + exitCode: msg.is_error ? 1 : 0, + parsed: state.parsed, + }); + } + + private handleControlRequest(msg: NdjsonMessage): void { + if (!(this.ws && msg.request_id)) { + return; + } + if (msg.request?.subtype === "can_use_tool") { + this.sendJson({ + type: "control_response", + response: { + subtype: "success", + request_id: msg.request_id, + response: { + behavior: "allow", + updatedInput: msg.request.input || {}, + }, + }, + }); + } + } + + private sendJson(data: Record): void { + this.ws?.send(`${JSON.stringify(data)}\n`); + } + + private async runTurnExclusive( + prompt: string, + _opts: Options, + onParsed: Callback, + onRaw: Callback, + onDelta: Callback + ): Promise { + if (!(this.child && this.ready)) { + await this.start(); + } + if (!this.ws) { + throw new Error("claude sdk server not connected"); + } + + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + if (this.turn) { + this.turn = undefined; + reject(new Error("claude sdk turn timed out")); + } + }, WAIT_TIMEOUT_MS); + + this.turn = { + combined: "", + hasStreamed: false, + onDelta, + onParsed, + onRaw, + parsed: "", + resolve: (result) => { + clearTimeout(timeout); + resolve(result); + }, + reject: (error) => { + clearTimeout(timeout); + reject(error); + }, + }; + + this.sendJson({ + type: "user", + message: { role: "user", content: prompt }, + parent_tool_use_id: null, + session_id: this.sessionId, + }); + }); + } + + private async cleanup(): Promise { + for (const ws of this.frontends) { + try { + ws.close(); + } catch { + // ignore close errors + } + } + this.frontends.clear(); + if (this.ws) { + try { + this.ws.close(); + } catch { + // ignore close errors + } + this.ws = undefined; + } + if (this.server) { + this.server.stop(); + this.server = undefined; + } + if (this.child) { + this.child.kill("SIGTERM"); + await this.child.exited; + this.child = undefined; + } + this.ready = false; + this.started = false; + this.sessionId = ""; + } + + private handleUnexpectedClose(): void { + const state = this.turn; + this.turn = undefined; + this.broadcastToFrontends( + `${JSON.stringify({ type: "status", text: "claude code disconnected" })}\n` + ); + state?.reject(new Error("claude sdk connection closed unexpectedly")); + this.ws = undefined; + this.cleanup().catch(() => { + // ignore cleanup errors after unexpected websocket close + }); + } +} + +let singleton: ClaudeSdkClient | undefined; + +process.on("exit", () => { + singleton?.process?.kill("SIGKILL"); +}); + +const getClient = (): ClaudeSdkClient => { + if (!singleton) { + singleton = new ClaudeSdkClient(); + } + return singleton; +}; + +export const startClaudeSdk = async (): Promise => { + await getClient().start(); +}; + +export const runClaudeTurn = ( + prompt: string, + opts: Options, + callbacks: { onDelta: Callback; onParsed: Callback; onRaw: Callback } +): Promise => { + return getClient().runTurn( + prompt, + opts, + callbacks.onParsed, + callbacks.onRaw, + callbacks.onDelta + ); +}; + +export const interruptClaudeSdk = (signal: ExitSignal): void => { + getClient().interrupt(signal); +}; + +export const hasClaudeSdkProcess = (): boolean => getClient().hasProcess(); + +export const closeClaudeSdk = async (): Promise => { + if (!singleton) { + return; + } + await singleton.close(); + singleton = undefined; +}; diff --git a/src/loop/codex-app-server.ts b/src/loop/codex-app-server.ts index 65f6e85..6167ff8 100644 --- a/src/loop/codex-app-server.ts +++ b/src/loop/codex-app-server.ts @@ -1,4 +1,5 @@ import { spawn } from "bun"; +import { findFreePort } from "./ports"; import type { Options, RunResult } from "./types"; type ExitSignal = "SIGINT" | "SIGTERM"; @@ -31,7 +32,10 @@ interface TurnState { } const APP_SERVER_CMD = "codex"; -const APP_SERVER_ARGS = ["app-server"]; +const APP_SERVER_BASE_PORT = 4500; +const APP_SERVER_PORT_RANGE = 100; +const WS_CONNECT_ATTEMPTS = 40; +const WS_CONNECT_DELAY_MS = 150; const USER_INPUT_TEXT_ELEMENTS = "text_elements"; const WAIT_TIMEOUT_MS = 600_000; @@ -62,7 +66,14 @@ const METHODS_TRIGGERING_FALLBACK = new Set([ ]); type SpawnFn = (...args: Parameters) => ReturnType; +type ConnectWsFn = (url: string) => Promise; + let spawnFn: SpawnFn = spawn; +const defaultConnectWs: ConnectWsFn = async (url) => { + const { connectWs } = await import("./ws-client"); + return connectWs(url); +}; +let connectWsFn: ConnectWsFn = defaultConnectWs; const isString = (value: unknown): value is string => typeof value === "string" && value.length > 0; @@ -203,10 +214,17 @@ export const codexAppServerInternals = { restoreSpawnFn: (): void => { spawnFn = spawn; }, + setConnectWsFn: (next: ConnectWsFn): void => { + connectWsFn = next; + }, + restoreConnectWsFn: (): void => { + connectWsFn = defaultConnectWs; + }, }; class AppServerClient { private child: ReturnType | undefined; + private ws: import("./ws-client").WsClient | undefined; private closed = false; private started = false; private ready = false; @@ -230,18 +248,38 @@ class AppServerClient { } this.started = true; try { - const child = spawnFn([APP_SERVER_CMD, ...APP_SERVER_ARGS], { - env: process.env, - stderr: "pipe", - stdin: "pipe", - stdout: "pipe", - }); + const port = await this.findPort(); + const listenUrl = `ws://0.0.0.0:${port}`; + const connectUrl = `ws://127.0.0.1:${port}`; + const child = spawnFn( + [APP_SERVER_CMD, "app-server", "--listen", listenUrl], + { + env: process.env, + stderr: "pipe", + stdin: "pipe", + stdout: "pipe", + } + ); this.child = child; this.consumeFrames(child).finally(() => { if (!this.closed) { this.handleUnexpectedExit(); } }); + const ws = await this.connectWebSocket(connectUrl); + this.ws = ws; + ws.onmessage = (data) => { + for (const line of data.split("\n")) { + if (line.trim()) { + this.handleStdoutLine(line); + } + } + }; + ws.onclose = () => { + if (!this.closed) { + this.handleUnexpectedExit(); + } + }; await this.sendRequest(METHOD_INITIALIZE, { clientInfo: { name: "loop", @@ -252,6 +290,15 @@ class AppServerClient { }); this.ready = true; } catch (error) { + const ws = this.ws; + this.ws = undefined; + if (ws) { + try { + ws.close(); + } catch { + // ignore close errors + } + } if (this.child) { this.child.kill("SIGTERM"); this.child = undefined; @@ -265,6 +312,30 @@ class AppServerClient { } } + private findPort(): Promise { + return findFreePort(APP_SERVER_BASE_PORT, APP_SERVER_PORT_RANGE); + } + + private async connectWebSocket( + url: string + ): Promise { + for (let i = 0; i < WS_CONNECT_ATTEMPTS; i++) { + try { + return await connectWsFn(url); + } catch { + if (i === WS_CONNECT_ATTEMPTS - 1) { + throw new CodexAppServerFallbackError( + "failed to connect to codex app-server WebSocket" + ); + } + await new Promise((resolve) => + setTimeout(resolve, WS_CONNECT_DELAY_MS) + ); + } + } + throw new CodexAppServerFallbackError("unreachable"); + } + runTurn( prompt: string, opts: Options, @@ -288,6 +359,15 @@ class AppServerClient { async close(): Promise { this.closed = true; this.failAll(new Error("codex app-server closed")); + const ws = this.ws; + this.ws = undefined; + if (ws) { + try { + ws.close(); + } catch { + // ignore close errors + } + } if (!this.child) { this.started = false; this.ready = false; @@ -300,12 +380,12 @@ class AppServerClient { this.started = false; } - private async ensureThread(): Promise { + private async ensureThread(model: string): Promise { if (this.threadId) { return this.threadId; } const response = await this.sendRequest(METHOD_THREAD_START, { - model: null, + model, approvalPolicy: "never", experimentalRawEvents: true, persistExtendedHistory: true, @@ -333,7 +413,7 @@ class AppServerClient { throw new CodexAppServerFallbackError("codex app-server not running"); } - const threadId = await this.ensureThread(); + const threadId = await this.ensureThread(opts.model); const response = await this.sendRequest(METHOD_TURN_START, { threadId, input: buildInput(prompt), @@ -379,11 +459,22 @@ class AppServerClient { private async consumeFrames(proc: ReturnType): Promise { await Promise.all([ - this.consumeStream(proc.stdout, this.handleStdoutLine), + this.drainStream(proc.stdout), this.consumeStream(proc.stderr, this.handleStdErrLine), ]); } + private async drainStream(stream: ReadableStream): Promise { + const reader = stream.getReader(); + try { + while (!(await reader.read()).done) { + // drain only — all JSON-RPC goes through WebSocket + } + } finally { + reader.releaseLock(); + } + } + private async consumeStream( stream: ReadableStream, handler: (line: string) => void @@ -687,7 +778,7 @@ class AppServerClient { } private sendRequest(method: string, params: unknown): Promise { - if (!this.child || this.closed) { + if (!(this.ws || this.child) || this.closed) { return Promise.reject( new CodexAppServerFallbackError("codex app-server not initialized") ); @@ -699,7 +790,7 @@ class AppServerClient { params, }; try { - this.child.stdin.write(`${JSON.stringify(payload)}\n`); + this.sendFrame(payload); } catch (error) { throw new CodexAppServerFallbackError( `codex app-server request "${method}" failed to write: ${ @@ -717,19 +808,28 @@ class AppServerClient { }); } + private sendFrame(payload: Record): void { + const data = `${JSON.stringify(payload)}\n`; + if (this.ws) { + this.ws.send(data); + } else if (this.child) { + this.child.stdin.write(data); + } + } + private sendResponse( requestId: string, result: unknown, error: unknown ): void { - if (!this.child) { + if (!(this.ws || this.child)) { return; } const payload = error === undefined ? { id: requestId, result, jsonrpc: "2.0" } : { id: requestId, error, jsonrpc: "2.0" }; - this.child.stdin.write(`${JSON.stringify(payload)}\n`); + this.sendFrame(payload); } private failAll(error: Error): void { @@ -746,6 +846,15 @@ class AppServerClient { private handleUnexpectedExit(): void { this.child = undefined; + const ws = this.ws; + this.ws = undefined; + if (ws) { + try { + ws.close(); + } catch { + // ignore close errors + } + } this.started = false; this.ready = false; this.threadId = undefined; @@ -757,6 +866,10 @@ class AppServerClient { let singleton: AppServerClient | undefined; +process.on("exit", () => { + singleton?.process?.kill("SIGKILL"); +}); + const getClient = (): AppServerClient => { if (!singleton) { singleton = new AppServerClient(); diff --git a/src/loop/main.ts b/src/loop/main.ts index 39d8acf..bb671bf 100644 --- a/src/loop/main.ts +++ b/src/loop/main.ts @@ -3,111 +3,102 @@ import { runDraftPrStep } from "./pr"; import { buildWorkPrompt } from "./prompts"; import { resolveReviewers, runReview } from "./review"; import { runAgent } from "./runner"; -import type { Options } from "./types"; +import type { Options, ReviewResult } from "./types"; import { hasSignal } from "./utils"; -const doneSignalText = (doneSignal: string) => `done signal "${doneSignal}"`; -const doneSignalMissingText = (signal: string) => - `\n[loop] ${doneSignalText(signal)} detected, stopping.`; -const doneSignalPassedText = (signal: string) => - `\n[loop] ${doneSignalText(signal)} detected and review passed, stopping.`; -const doneSignalExitText = (doneSignal: string, exitCode: number) => - `[loop] ${doneSignalText(doneSignal)} seen despite exit code ${exitCode}.`; -const bothReviewersNotes = (notes: string): string => - "Both reviewers requested changes. Decide for each comment whether to address it now. " + - `If you skip one, explain why briefly. If both reviews found the same issue, it might be worth addressing.\n\n${notes}`; +const doneText = (s: string) => `done signal "${s}"`; + +const formatFollowUp = (review: ReviewResult) => { + if (review.failureCount > 1) { + const header = review.consensusFail + ? "Both reviewers requested changes. Decide for each comment whether to address it now. If you skip one, explain why briefly. If both reviews found the same issue, it might be worth addressing." + : "Multiple reviewers requested changes. Decide for each comment whether to address it now. If you skip one, explain why briefly."; + return { + notes: review.notes ? `${header}\n\n${review.notes}` : "", + log: review.consensusFail + ? "\n[loop] both reviewers requested changes. deciding what to address." + : "\n[loop] multiple reviewers requested changes. deciding what to address.", + }; + } + + return { + notes: review.notes, + log: "\n[loop] one reviewer requested changes. continuing loop.", + }; +}; const runIterations = async ( task: string, opts: Options, - reviewers: string[], - hasExistingPr = false -): Promise => { + reviewers: string[] +) => { let reviewNotes = ""; const shouldReview = reviewers.length > 0; - const doneSignal = opts.doneSignal; + const { doneSignal, maxIterations } = opts; console.log(`\n[loop] PLAN.md:\n\n${task}`); - for (let i = 1; i <= opts.maxIterations; i++) { - const tag = Number.isFinite(opts.maxIterations) - ? `/${opts.maxIterations}` - : ""; + for (let i = 1; i <= maxIterations; i++) { + const tag = Number.isFinite(maxIterations) ? `/${maxIterations}` : ""; console.log(`\n[loop] iteration ${i}${tag}`); - const prompt = buildWorkPrompt( - task, - opts.doneSignal, - opts.proof, - reviewNotes - ); + const prompt = buildWorkPrompt(task, doneSignal, opts.proof, reviewNotes); reviewNotes = ""; const result = await runAgent(opts.agent, prompt, opts); const output = `${result.parsed}\n${result.combined}`; const done = hasSignal(output, doneSignal); - if (!done && result.exitCode !== 0) { + if (result.exitCode !== 0) { + const hint = done ? ` (${doneText(doneSignal)} seen)` : ""; throw new Error( - `[loop] ${opts.agent} exited with code ${result.exitCode}` + `[loop] ${opts.agent} exited with code ${result.exitCode}${hint}` ); } if (!done) { continue; } - if (result.exitCode !== 0) { - console.log(doneSignalExitText(doneSignal, result.exitCode)); - } if (!shouldReview) { - console.log(doneSignalMissingText(doneSignal)); + console.log(`\n[loop] ${doneText(doneSignal)} detected, stopping.`); return true; } const review = await runReview(reviewers, task, opts); if (review.approved) { - await runDraftPrStep(task, opts, hasExistingPr); - console.log(doneSignalPassedText(opts.doneSignal)); - return true; - } - if (review.consensusFail) { - reviewNotes = bothReviewersNotes(review.notes); + await runDraftPrStep(task, opts); console.log( - "\n[loop] both reviews collected. original agent deciding what to address." + `\n[loop] ${doneText(doneSignal)} detected and review passed, stopping.` ); - continue; + return true; } - reviewNotes = review.notes || "Reviewer found more work to do."; - console.log("\n[loop] review found more work. continuing loop."); + const followUp = formatFollowUp(review); + reviewNotes = followUp.notes; + console.log(followUp.log); } return false; }; export const runLoop = async (task: string, opts: Options): Promise => { const reviewers = resolveReviewers(opts.review, opts.agent); - const interactive = process.stdin.isTTY; - const rl = interactive + const rl = process.stdin.isTTY ? createInterface({ input: process.stdin, output: process.stdout }) : undefined; - let hasExistingPr = false; let loopTask = task; - while (true) { - const done = await runIterations(loopTask, opts, reviewers, hasExistingPr); - if (reviewers.length > 0 && done) { - hasExistingPr = true; - } - if (!rl) { - if (!done) { - console.log( - `\n[loop] reached max iterations (${opts.maxIterations}), stopping.` - ); + try { + while (true) { + const done = await runIterations(loopTask, opts, reviewers); + if (done || !rl) { + if (!done) { + console.log( + `\n[loop] reached max iterations (${opts.maxIterations}), stopping.` + ); + } + return; } - return; - } - if (!done) { console.log(`\n[loop] reached max iterations (${opts.maxIterations}).`); + const answer = await rl.question( + "\n[loop] follow-up prompt (blank to exit): " + ); + if (!answer.trim()) { + return; + } + loopTask = `${loopTask}\n\nFollow-up:\n${answer.trim()}`; } - const answer = await rl.question( - "\n[loop] follow-up prompt (blank to exit): " - ); - const followUp = answer.trim() || null; - if (!followUp) { - rl.close(); - return; - } - loopTask = `${loopTask}\n\nFollow-up:\n${followUp}`; + } finally { + rl?.close(); } }; diff --git a/src/loop/ports.ts b/src/loop/ports.ts new file mode 100644 index 0000000..c6c2547 --- /dev/null +++ b/src/loop/ports.ts @@ -0,0 +1,44 @@ +import { connect } from "bun"; + +const PORT_PROBE_TIMEOUT_MS = 500; + +export const isPortTaken = (port: number): Promise => + new Promise((resolve) => { + const timer = setTimeout(() => resolve(false), PORT_PROBE_TIMEOUT_MS); + connect({ + hostname: "127.0.0.1", + port, + socket: { + open(sock) { + clearTimeout(timer); + sock.end(); + resolve(true); + }, + data() { + // probe only — no data expected + }, + close() { + // probe only — close is expected + }, + error() { + clearTimeout(timer); + resolve(false); + }, + }, + }).catch(() => { + clearTimeout(timer); + resolve(false); + }); + }); + +export const findFreePort = async ( + basePort: number, + range: number +): Promise => { + for (let port = basePort; port < basePort + range; port++) { + if (!(await isPortTaken(port))) { + return port; + } + } + throw new Error(`no free port in range ${basePort}-${basePort + range}`); +}; diff --git a/src/loop/prompts.ts b/src/loop/prompts.ts index e54bf7d..e840292 100644 --- a/src/loop/prompts.ts +++ b/src/loop/prompts.ts @@ -67,13 +67,16 @@ export const buildReviewPrompt = ( parts.push(`Proof requirements:\n${proof.trim()}`); parts.push( - `If more work is needed, explain what to change and end with "${REVIEW_FAIL}" on its own final line.` + `If review is needed, end your response with exactly "${REVIEW_FAIL}" on the final non-empty line. Nothing may follow this line.` ); parts.push( - `If the work is complete, end with "${REVIEW_PASS}" on its own final line.` + `If the work is complete, end with exactly "${REVIEW_PASS}" on the final non-empty line. No extra content after this line.` ); parts.push( - `${SPAWN_TEAM_WITH_WORKTREE_ISOLATION} Do not use "${doneSignal}" in your final line.` + "When reporting failures, include concrete file paths, commands, and code locations that must change." + ); + parts.push( + `${SPAWN_TEAM_WITH_WORKTREE_ISOLATION} The final line must be one of the two review signals on its own line, with no surrounding comments or markdown, and it must not include "${doneSignal}".` ); return parts.join("\n\n"); }; diff --git a/src/loop/review.ts b/src/loop/review.ts index a72637d..5af178c 100644 --- a/src/loop/review.ts +++ b/src/loop/review.ts @@ -1,4 +1,4 @@ -import { REVIEW_FAIL, REVIEW_PASS } from "./constants"; +import { NEWLINE_RE, REVIEW_FAIL, REVIEW_PASS } from "./constants"; import { buildReviewPrompt } from "./prompts"; import { runAgent } from "./runner"; import type { @@ -8,7 +8,163 @@ import type { ReviewResult, RunResult, } from "./types"; -import { hasSignal } from "./utils"; + +const REVIEW_SIGNAL_HELP = `Expected "${REVIEW_PASS}" or "${REVIEW_FAIL}" in output.`; +const REVIEW_FAILURE_FALLBACK = "Reviewer requested more work."; +const REVIEW_MISSING_SIGNAL = + "Reviewer output was missing a valid final review signal."; +const REVIEW_MALFORMED_SIGNAL = + "Reviewer output had a malformed final review signal."; +const REVIEW_MIXED_SIGNALS = + "Output contained both review pass and review fail signals."; +const REVIEW_TRAILING_SIGNAL = + "Reviewer output had content after the final review signal."; + +const QUOTED_REVIEW_PASS = `"${REVIEW_PASS}"`; +const QUOTED_REVIEW_FAIL = `"${REVIEW_FAIL}"`; + +type ReviewSignal = "pass" | "fail"; + +interface ReviewCheck { + reason: string; + status: "pass" | "fail"; +} + +interface ReviewSignalSummary { + finalLine: string | undefined; + finalLineIndex: number | undefined; + finalSignal: ReviewSignal | undefined; + hasFailSignal: boolean; + hasPassSignal: boolean; + lastSignalLineIndex: number | undefined; + lines: string[]; +} + +const cleanOutput = (result: RunResult): string => + `${result.parsed}\n${result.combined}`.replace(/\r/g, "").trimEnd(); + +const parseSignal = (line: string): ReviewSignal | undefined => { + const trimmed = line.trim(); + if (trimmed === REVIEW_PASS || trimmed === QUOTED_REVIEW_PASS) { + return "pass"; + } + if (trimmed === REVIEW_FAIL || trimmed === QUOTED_REVIEW_FAIL) { + return "fail"; + } + return undefined; +}; + +const splitOutputLines = (output: string): string[] => output.split(NEWLINE_RE); + +const getFinalNonEmptyLine = ( + lines: string[] +): { finalLine: string | undefined; finalLineIndex: number | undefined } => { + for (let index = lines.length - 1; index >= 0; index -= 1) { + if (lines[index].trim() !== "") { + return { finalLine: lines[index], finalLineIndex: index }; + } + } + + return { finalLine: undefined, finalLineIndex: undefined }; +}; + +const collectSignalPresence = ( + lines: string[] +): { + hasFailSignal: boolean; + hasPassSignal: boolean; + lastSignalLineIndex: number | undefined; +} => { + let hasFailSignal = false; + let hasPassSignal = false; + let lastSignalLineIndex: number | undefined; + + for (const [index, line] of lines.entries()) { + const signal = parseSignal(line); + if (signal === "fail") { + hasFailSignal = true; + lastSignalLineIndex = index; + continue; + } + if (signal === "pass") { + hasPassSignal = true; + lastSignalLineIndex = index; + } + } + + return { + hasFailSignal, + hasPassSignal, + lastSignalLineIndex, + }; +}; + +const lineContainsReviewSignalToken = (line: string): boolean => { + const trimmed = line.trim(); + return ( + trimmed.includes(REVIEW_PASS) || + trimmed.includes(REVIEW_FAIL) || + trimmed.includes(QUOTED_REVIEW_PASS) || + trimmed.includes(QUOTED_REVIEW_FAIL) + ); +}; + +const parseSignalSummary = (output: string): ReviewSignalSummary => { + const lines = splitOutputLines(output); + const { finalLine, finalLineIndex } = getFinalNonEmptyLine(lines); + const { hasPassSignal, hasFailSignal, lastSignalLineIndex } = + collectSignalPresence(lines); + const finalSignal = finalLine ? parseSignal(finalLine) : undefined; + + return { + finalLine, + finalLineIndex, + finalSignal, + hasFailSignal, + hasPassSignal, + lastSignalLineIndex, + lines, + }; +}; + +const reasonFromFailureOutput = ( + lines: string[], + finalLineIndex: number | undefined +): string => { + if (finalLineIndex === undefined) { + return REVIEW_FAILURE_FALLBACK; + } + + return ( + lines + .slice(0, finalLineIndex) + .filter((line) => line.trim()) + .join("\n") || REVIEW_FAILURE_FALLBACK + ); +}; + +const formatFailure = (reason: string): string => + `${reason} (${REVIEW_SIGNAL_HELP})`; + +const formatUnknownError = (reason: unknown): string => { + if (reason instanceof Error) { + return reason.message || "unknown error"; + } + + if (reason == null) { + return "unknown error"; + } + + if (typeof reason === "string") { + return reason; + } + + if (typeof reason === "boolean" || typeof reason === "number") { + return String(reason); + } + + return JSON.stringify(reason) || "unknown error"; +}; export const resolveReviewers = ( review: ReviewMode | undefined, @@ -38,63 +194,110 @@ const runReviewWith = async ( opts: Options ): Promise => { const prompt = buildReviewPrompt(task, opts.doneSignal, opts.proof); + const orderedReviewers = [...new Set(reviewers)]; const runOne = async (reviewer: Agent) => { console.log(`\n[loop] review with ${reviewer}`); return { result: await run(reviewer, prompt, opts), reviewer }; }; - const notes: string[] = []; - let failures = 0; + const reasonForFailure = (reviewer: Agent, reason: unknown): string => + `[loop] review ${reviewer} failed: ${formatUnknownError(reason)}`; - const addFailure = (reviewer: Agent, note: string): void => { - failures++; - notes.push( - `[${reviewer}] ${note.trim() || "Reviewer requested more work."}` - ); - }; - - const evaluateResult = ({ - result, - reviewer, - }: Awaited>): void => { + const evaluateOutput = (result: RunResult): ReviewCheck => { if (result.exitCode !== 0) { - addFailure( - reviewer, - `[loop] review ${reviewer} exited with code ${result.exitCode}` - ); - return; + return { + status: "fail", + reason: formatFailure( + `[loop] review exited with code ${result.exitCode}` + ), + }; + } + + const output = cleanOutput(result); + const signals = parseSignalSummary(output); + const final = signals.finalSignal; + const finalHasReviewToken = signals.finalLine + ? lineContainsReviewSignalToken(signals.finalLine) + : false; + + if ( + signals.lastSignalLineIndex !== undefined && + signals.lastSignalLineIndex !== signals.finalLineIndex + ) { + return { + status: "fail", + reason: formatFailure(REVIEW_TRAILING_SIGNAL), + }; + } + + if (!final) { + return { + status: "fail", + reason: formatFailure( + finalHasReviewToken ? REVIEW_MALFORMED_SIGNAL : REVIEW_MISSING_SIGNAL + ), + }; + } + + if (final === "pass" && signals.hasFailSignal) { + return { + status: "fail", + reason: formatFailure(REVIEW_MIXED_SIGNALS), + }; + } + + if (final === "fail" && signals.hasPassSignal) { + return { + status: "fail", + reason: formatFailure(REVIEW_MIXED_SIGNALS), + }; } - const text = `${result.parsed}\n${result.combined}`; - if (hasSignal(text, REVIEW_PASS) && !hasSignal(text, REVIEW_FAIL)) { - return; + if (final === "fail") { + return { + status: "fail", + reason: formatFailure( + reasonFromFailureOutput(signals.lines, signals.finalLineIndex) + ), + }; } - addFailure( - reviewer, - (result.parsed || result.combined).trim() || - "Reviewer requested more work." - ); + return { + status: "pass", + reason: "", + }; + }; + + const notes: string[] = []; + let failures = 0; + + const addFailure = (reviewer: Agent, reason: string): void => { + failures += 1; + notes.push(`[${reviewer}] ${reason.trim() || REVIEW_FAILURE_FALLBACK}`); }; - const settled = await Promise.allSettled(reviewers.map(runOne)); + const settled = await Promise.allSettled( + orderedReviewers.map((reviewer) => runOne(reviewer)) + ); for (const [index, outcome] of settled.entries()) { if (outcome.status === "fulfilled") { - evaluateResult(outcome.value); + const check = evaluateOutput(outcome.value.result); + if (check.status === "fail") { + addFailure(outcome.value.reviewer, check.reason); + } continue; } - const reason = - outcome.reason instanceof Error - ? outcome.reason.message - : String(outcome.reason); - addFailure(reviewers[index], reason || "Reviewer run failed."); + const reviewer = orderedReviewers[index]; + addFailure(reviewer, reasonForFailure(reviewer, outcome.reason)); } return { approved: failures === 0, - consensusFail: reviewers.length > 1 && failures === reviewers.length, + consensusFail: + orderedReviewers.length > 1 && failures === orderedReviewers.length, + failureCount: failures, notes: notes.join("\n\n"), }; }; diff --git a/src/loop/runner.ts b/src/loop/runner.ts index 73db7d2..c0ff382 100644 --- a/src/loop/runner.ts +++ b/src/loop/runner.ts @@ -1,4 +1,10 @@ import { spawn } from "bun"; +import { + hasClaudeSdkProcess, + interruptClaudeSdk, + runClaudeTurn, + startClaudeSdk, +} from "./claude-sdk-server"; import { CODEX_TRANSPORT_ENV, CODEX_TRANSPORT_EXEC, @@ -34,6 +40,7 @@ const SIGNAL_EXIT_CODES: Record = { const activeChildren = new Set>(); let activeAppServerRuns = 0; +let activeClaudeSdkRuns = 0; let watchingSignals = false; let fallbackWarned = false; const runnerState: RunnerState = { @@ -50,19 +57,26 @@ const killChildren = (signal: ExitSignal): void => { const onSigint = (): void => { killChildren("SIGINT"); interruptAppServer("SIGINT"); + interruptClaudeSdk("SIGINT"); process.exit(SIGNAL_EXIT_CODES.SIGINT); }; const onSigterm = (): void => { killChildren("SIGTERM"); interruptAppServer("SIGTERM"); + interruptClaudeSdk("SIGTERM"); process.exit(SIGNAL_EXIT_CODES.SIGTERM); }; const syncSignalHandlers = (): void => { const hasAppServerWork = hasAppServerProcess(); + const hasClaudeSdkWork = hasClaudeSdkProcess(); const hasWork = - activeChildren.size > 0 || activeAppServerRuns > 0 || hasAppServerWork; + activeChildren.size > 0 || + activeAppServerRuns > 0 || + activeClaudeSdkRuns > 0 || + hasAppServerWork || + hasClaudeSdkWork; if (hasWork && !watchingSignals) { process.on("SIGINT", onSigint); process.on("SIGTERM", onSigterm); @@ -370,6 +384,43 @@ export const runnerInternals = { }, }; +const runClaudeAgent = async ( + prompt: string, + opts: Options +): Promise => { + let parsed = ""; + let state = { parsed: "", prettyCount: 0, lastMessage: "" }; + const onParsed = (text: string): void => { + state = appendParsedLine(text, opts, state); + parsed = state.parsed; + }; + const onRaw = (text: string): void => { + if (opts.format === "raw") { + process.stdout.write(`${text}\n`); + } + }; + const onDelta = (text: string): void => { + if (opts.format === "pretty") { + process.stdout.write(text); + } + }; + + activeClaudeSdkRuns += 1; + syncSignalHandlers(); + try { + await startClaudeSdk(); + const result = await runClaudeTurn(prompt, opts, { + onDelta, + onParsed, + onRaw, + }); + return { ...result, parsed: result.parsed || parsed }; + } finally { + activeClaudeSdkRuns -= 1; + syncSignalHandlers(); + } +}; + export const runAgent = ( agent: Agent, prompt: string, @@ -378,5 +429,8 @@ export const runAgent = ( if (agent === "codex") { return runCodexAgent(prompt, opts); } + if (agent === "claude") { + return runClaudeAgent(prompt, opts); + } return runLegacyAgent(agent, prompt, opts); }; diff --git a/src/loop/types.ts b/src/loop/types.ts index 1a68694..138950f 100644 --- a/src/loop/types.ts +++ b/src/loop/types.ts @@ -32,5 +32,6 @@ export interface RunResult { export interface ReviewResult { approved: boolean; consensusFail: boolean; + failureCount: number; notes: string; } diff --git a/src/loop/ws-client.ts b/src/loop/ws-client.ts new file mode 100644 index 0000000..c6f1cb3 --- /dev/null +++ b/src/loop/ws-client.ts @@ -0,0 +1,254 @@ +/** + * Minimal WebSocket client over raw TCP using Bun.connect. + * Works around Bun's built-in WebSocket client incompatibility + * with the codex app-server. + */ +import { connect } from "bun"; + +type MessageHandler = (data: string) => void; +type CloseHandler = () => void; + +const WS_OPCODE_TEXT = 0x01; +const WS_OPCODE_CONTINUATION = 0x00; +const WS_OPCODE_CLOSE = 0x08; +const WS_OPCODE_PING = 0x09; +const WS_OPCODE_PONG = 0x0a; +const WS_FIN_BIT = 0x80; +const WS_MASK_BIT = 0x80; +export interface WsClient { + close(): void; + onclose: CloseHandler | undefined; + onmessage: MessageHandler | undefined; + send(data: string): void; +} + +const encodeFrame = (text: string): Uint8Array => { + const payload = new TextEncoder().encode(text); + const mask = crypto.getRandomValues(new Uint8Array(4)); + const len = payload.length; + + let header: Uint8Array; + if (len < 126) { + header = new Uint8Array([WS_FIN_BIT | WS_OPCODE_TEXT, WS_MASK_BIT | len]); + } else if (len < 65_536) { + header = new Uint8Array([ + WS_FIN_BIT | WS_OPCODE_TEXT, + WS_MASK_BIT | 126, + (len >> 8) & 0xff, + len & 0xff, + ]); + } else { + header = new Uint8Array(10); + header[0] = WS_FIN_BIT | WS_OPCODE_TEXT; + header[1] = WS_MASK_BIT | 127; + const view = new DataView(header.buffer); + view.setBigUint64(2, BigInt(len)); + } + + const masked = new Uint8Array(payload.length); + for (let i = 0; i < payload.length; i++) { + masked[i] = payload[i] ^ mask[i % 4]; + } + + const frame = new Uint8Array(header.length + 4 + masked.length); + frame.set(header, 0); + frame.set(mask, header.length); + frame.set(masked, header.length + 4); + return frame; +}; + +const encodeCloseFrame = (): Uint8Array => { + const mask = crypto.getRandomValues(new Uint8Array(4)); + return new Uint8Array([ + WS_FIN_BIT | WS_OPCODE_CLOSE, + WS_MASK_BIT | 0, + ...mask, + ]); +}; + +const encodePongFrame = (payload: Uint8Array): Uint8Array => { + const mask = crypto.getRandomValues(new Uint8Array(4)); + const len = payload.length; + const header = new Uint8Array([ + WS_FIN_BIT | WS_OPCODE_PONG, + WS_MASK_BIT | len, + ]); + const masked = new Uint8Array(len); + for (let i = 0; i < len; i++) { + masked[i] = payload[i] ^ mask[i % 4]; + } + const frame = new Uint8Array(header.length + 4 + masked.length); + frame.set(header, 0); + frame.set(mask, header.length); + frame.set(masked, header.length + 4); + return frame; +}; + +export const connectWs = (url: string): Promise => { + const parsed = new URL(url); + const hostname = parsed.hostname; + const port = Number(parsed.port) || 80; + const path = parsed.pathname || "/"; + + return new Promise((resolve, reject) => { + let handshakeDone = false; + let httpBuffer = ""; + let frameBuffer = new Uint8Array(0); + let closed = false; + + const client: WsClient = { + onmessage: undefined, + onclose: undefined, + send: () => { + // no-op until handshake completes + }, + close: () => { + // no-op until handshake completes + }, + }; + + const key = btoa( + String.fromCharCode(...crypto.getRandomValues(new Uint8Array(16))) + ); + + const append = (existing: Uint8Array, chunk: Uint8Array): Uint8Array => { + const merged = new Uint8Array(existing.length + chunk.length); + merged.set(existing, 0); + merged.set(chunk, existing.length); + return merged; + }; + + const processFrames = (): void => { + while (frameBuffer.length >= 2) { + const fin = (frameBuffer[0] & WS_FIN_BIT) !== 0; + const opcode = frameBuffer[0] & 0x0f; + const masked = (frameBuffer[1] & WS_MASK_BIT) !== 0; + let payloadLen = frameBuffer[1] & 0x7f; + let offset = 2; + + if (payloadLen === 126) { + if (frameBuffer.length < 4) { + return; + } + payloadLen = (frameBuffer[2] << 8) | frameBuffer[3]; + offset = 4; + } else if (payloadLen === 127) { + if (frameBuffer.length < 10) { + return; + } + const view = new DataView(frameBuffer.buffer, frameBuffer.byteOffset); + payloadLen = Number(view.getBigUint64(2)); + offset = 10; + } + + if (masked) { + offset += 4; + } + if (frameBuffer.length < offset + payloadLen) { + return; + } + + const payload = frameBuffer.slice(offset, offset + payloadLen); + if (masked) { + const maskKey = frameBuffer.slice(offset - 4, offset); + for (let i = 0; i < payload.length; i++) { + payload[i] ^= maskKey[i % 4]; + } + } + + frameBuffer = frameBuffer.slice(offset + payloadLen); + + if (!fin || opcode === WS_OPCODE_CONTINUATION) { + if (!closed) { + closed = true; + process.stderr.write( + "[loop] ws-client: fragmented frames are unsupported\n" + ); + socket?.end(encodeCloseFrame()); + } + return; + } + + if (opcode === WS_OPCODE_TEXT) { + client.onmessage?.(new TextDecoder().decode(payload)); + } else if (opcode === WS_OPCODE_CLOSE) { + socket?.end(encodeCloseFrame()); + } else if (opcode === WS_OPCODE_PING) { + socket?.write(encodePongFrame(payload)); + } + } + }; + + let socket: ReturnType extends Promise ? T : never; + + connect({ + hostname, + port, + socket: { + open(sock) { + socket = sock; + sock.write( + `GET ${path} HTTP/1.1\r\nHost: ${hostname}:${port}\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Key: ${key}\r\nSec-WebSocket-Version: 13\r\n\r\n` + ); + }, + data(_sock, data) { + const chunk = + data instanceof Uint8Array ? data : new Uint8Array(data); + + if (!handshakeDone) { + httpBuffer += new TextDecoder().decode(chunk); + const endIdx = httpBuffer.indexOf("\r\n\r\n"); + if (endIdx === -1) { + return; + } + if (!httpBuffer.startsWith("HTTP/1.1 101")) { + reject(new Error(`WebSocket upgrade failed: ${httpBuffer}`)); + return; + } + handshakeDone = true; + + client.send = (text: string) => { + if (!closed) { + socket.write(encodeFrame(text)); + } + }; + client.close = () => { + if (closed) { + return; + } + closed = true; + socket.end(encodeCloseFrame()); + }; + + const remaining = httpBuffer.slice(endIdx + 4); + if (remaining.length > 0) { + frameBuffer = append( + frameBuffer, + new TextEncoder().encode(remaining) + ); + processFrames(); + } + resolve(client); + return; + } + + frameBuffer = append(frameBuffer, chunk); + processFrames(); + }, + close() { + if (!handshakeDone) { + reject(new Error("WebSocket connection closed before handshake")); + return; + } + closed = true; + client.onclose?.(); + }, + error(_sock, err) { + if (!handshakeDone) { + reject(err); + } + }, + }, + }).catch(reject); + }); +}; diff --git a/tests/loop/codex-app-server.test.ts b/tests/loop/codex-app-server.test.ts index c6a6655..e155502 100644 --- a/tests/loop/codex-app-server.test.ts +++ b/tests/loop/codex-app-server.test.ts @@ -113,12 +113,43 @@ const installSpawn = (appServerModule: AppServerModule): void => { ); }; +const installConnectWs = (appServerModule: AppServerModule): void => { + appServerModule.codexAppServerInternals.setConnectWsFn(() => { + const client: import("../../src/loop/ws-client").WsClient = { + onmessage: undefined, + onclose: undefined, + send: (data: string) => { + for (const raw of data.split("\n")) { + if (!raw.trim()) { + continue; + } + const proc = processes.at(-1); + if (proc) { + proc.writes.push(raw); + } + const request = JSON.parse(raw) as RequestFrame; + makeStreamResponse(request, (frame) => { + queueMicrotask(() => { + client.onmessage?.(JSON.stringify(frame)); + }); + }); + } + }, + close: () => { + // noop for tests + }, + }; + return Promise.resolve(client); + }); +}; + const getModule = async (): Promise => { if (!modulePromise) { modulePromise = import("../../src/loop/codex-app-server"); } moduleExports = await modulePromise; installSpawn(moduleExports); + installConnectWs(moduleExports); return moduleExports; }; @@ -146,6 +177,7 @@ const resetState = async (): Promise => { processes = []; currentHandler = noopRequestHandler; appServer.codexAppServerInternals.restoreSpawnFn(); + appServer.codexAppServerInternals.restoreConnectWsFn(); }; afterEach(async () => { diff --git a/tests/loop/main.test.ts b/tests/loop/main.test.ts index a1b4c26..a8aafc3 100644 --- a/tests/loop/main.test.ts +++ b/tests/loop/main.test.ts @@ -25,6 +25,7 @@ const makeRunResult = ( const noopReview = async (): Promise => ({ approved: true, consensusFail: false, + failureCount: 0, notes: "", }); @@ -92,17 +93,15 @@ test("runLoop stops immediately on done signal when review is disabled", async ( expect(runDraftPrStep).not.toHaveBeenCalled(); }); -test("runLoop stops on done signal even if agent exits non-zero when review is disabled", async () => { - const { runLoop, runAgent, runReview, runDraftPrStep } = await loadRunLoop({ +test("runLoop throws when agent exits non-zero even with done signal (no review)", async () => { + const { runLoop } = await loadRunLoop({ resolveReviewers: () => [], runAgent: async () => makeRunResult("", "", 1), }); - await runLoop("Ship feature", makeOptions({ review: undefined })); - - expect(runAgent).toHaveBeenCalledTimes(1); - expect(runReview).not.toHaveBeenCalled(); - expect(runDraftPrStep).not.toHaveBeenCalled(); + await expect( + runLoop("Ship feature", makeOptions({ review: undefined })) + ).rejects.toThrow("exited with code 1"); }); test("runLoop creates draft PR when done signal is reviewed and approved", async () => { @@ -113,6 +112,7 @@ test("runLoop creates draft PR when done signal is reviewed and approved", async runReview: async () => ({ approved: true, consensusFail: false, + failureCount: 0, notes: "", }), }); @@ -121,50 +121,60 @@ test("runLoop creates draft PR when done signal is reviewed and approved", async expect(runAgent).toHaveBeenCalledTimes(1); expect(runReview).toHaveBeenCalledTimes(1); - expect(runDraftPrStep).toHaveBeenNthCalledWith( - 1, - "Ship feature", - opts, - false - ); + expect(runDraftPrStep).toHaveBeenNthCalledWith(1, "Ship feature", opts); }); -test("runLoop creates draft PR when done signal is reviewed and approved even if agent exits non-zero", async () => { - const opts = makeOptions({ review: "claudex" }); - const { runLoop, runAgent, runReview, runDraftPrStep } = await loadRunLoop({ +test("runLoop throws when agent exits non-zero even with done signal (with review)", async () => { + const { runLoop, runReview, runDraftPrStep } = await loadRunLoop({ resolveReviewers: () => ["codex", "claude"], runAgent: async () => makeRunResult("", "", 1), - runReview: async () => ({ - approved: true, - consensusFail: false, - notes: "", - }), }); - await runLoop("Ship feature", opts); - - expect(runAgent).toHaveBeenCalledTimes(1); - expect(runReview).toHaveBeenCalledTimes(1); - expect(runDraftPrStep).toHaveBeenCalledTimes(1); - expect(runDraftPrStep).toHaveBeenNthCalledWith( - 1, - "Ship feature", - opts, - false + await expect(runLoop("Ship feature", makeOptions())).rejects.toThrow( + "exited with code 1" ); + expect(runReview).not.toHaveBeenCalled(); + expect(runDraftPrStep).not.toHaveBeenCalled(); }); -test("runLoop uses follow-up commit prompt after a PR is already created", async () => { - const answers = ["Update docs", ""]; - const { runLoop, runAgent, runReview, runDraftPrStep } = await loadRunLoop({ - resolveReviewers: () => ["codex", "claude"], +test("runLoop prompts for follow-up in interactive mode on max iterations", async () => { + let callCount = 0; + const { runLoop, runAgent } = await loadRunLoop({ + resolveReviewers: () => [], + runAgent: () => { + callCount++; + return Promise.resolve( + callCount <= 2 ? makeRunResult("working") : makeRunResult("") + ); + }, + question: async () => (callCount <= 2 ? "Do more work" : ""), + }); + + const originalIsTty = process.stdin.isTTY; + Object.defineProperty(process.stdin, "isTTY", { + configurable: true, + value: true, + }); + try { + await runLoop( + "Ship feature", + makeOptions({ maxIterations: 2, review: undefined }) + ); + } finally { + Object.defineProperty(process.stdin, "isTTY", { + configurable: true, + value: originalIsTty, + }); + } + + expect(runAgent).toHaveBeenCalledTimes(3); +}); + +test("runLoop exits immediately on done signal in interactive mode", async () => { + const { runLoop, runAgent } = await loadRunLoop({ + resolveReviewers: () => [], runAgent: async () => makeRunResult(""), - runReview: async () => ({ - approved: true, - consensusFail: false, - notes: "", - }), - question: async () => answers.shift() ?? "", + question: async () => "should not be called", }); const originalIsTty = process.stdin.isTTY; @@ -173,7 +183,7 @@ test("runLoop uses follow-up commit prompt after a PR is already created", async value: true, }); try { - await runLoop("Ship feature", makeOptions({ review: "claudex" })); + await runLoop("Ship feature", makeOptions({ review: undefined })); } finally { Object.defineProperty(process.stdin, "isTTY", { configurable: true, @@ -181,20 +191,7 @@ test("runLoop uses follow-up commit prompt after a PR is already created", async }); } - expect(runAgent).toHaveBeenCalledTimes(2); - expect(runReview).toHaveBeenCalledTimes(2); - expect(runDraftPrStep).toHaveBeenNthCalledWith( - 1, - "Ship feature", - expect.any(Object), - false - ); - expect(runDraftPrStep).toHaveBeenNthCalledWith( - 2, - "Ship feature\n\nFollow-up:\nUpdate docs", - expect.any(Object), - true - ); + expect(runAgent).toHaveBeenCalledTimes(1); }); test("runLoop forwards consensus review notes into the next iteration prompt", async () => { @@ -221,6 +218,7 @@ test("runLoop forwards consensus review notes into the next iteration prompt", a runReview: async () => ({ approved: false, consensusFail: true, + failureCount: 2, notes: "[codex] Fix tests.\n\n[claude] Improve docs.", }), }); @@ -235,7 +233,7 @@ test("runLoop forwards consensus review notes into the next iteration prompt", a expect(runReview).toHaveBeenCalledTimes(1); }); -test("runLoop forwards single-review fallback notes into the next iteration prompt", async () => { +test("runLoop forwards single-review notes into the next iteration prompt", async () => { const promptNotes: string[] = []; let runCount = 0; @@ -259,7 +257,8 @@ test("runLoop forwards single-review fallback notes into the next iteration prom runReview: async () => ({ approved: false, consensusFail: false, - notes: "", + failureCount: 1, + notes: "[codex] Reviewer found more work to do.", }), }); @@ -267,7 +266,7 @@ test("runLoop forwards single-review fallback notes into the next iteration prom expect(buildWorkPrompt).toHaveBeenCalledTimes(2); expect(promptNotes[0]).toBe(""); - expect(promptNotes[1]).toBe("Reviewer found more work to do."); + expect(promptNotes[1]).toBe("[codex] Reviewer found more work to do."); }); test("runLoop stops after max iterations when done signal is never found", async () => { diff --git a/tests/loop/prompts.test.ts b/tests/loop/prompts.test.ts index bcce0c5..38a2f89 100644 --- a/tests/loop/prompts.test.ts +++ b/tests/loop/prompts.test.ts @@ -46,13 +46,27 @@ test("buildWorkPrompt keeps proof when only a substring appears in task", () => expect(prompt).toContain("Proof requirements:\ntest"); }); -test("buildReviewPrompt includes pass and fail instructions and verification", () => { +test("buildReviewPrompt includes strict review signal instructions", () => { const prompt = buildReviewPrompt(" do task ", "", "must pass ci"); expect(prompt).toContain("Task:\ndo task"); - expect(prompt).toContain(`end with "${REVIEW_FAIL}" on its own final line`); - expect(prompt).toContain(`end with "${REVIEW_PASS}" on its own final line`); + expect(prompt).toContain( + `If review is needed, end your response with exactly "${REVIEW_FAIL}"` + ); + expect(prompt).toContain( + `If the work is complete, end with exactly "${REVIEW_PASS}"` + ); + expect(prompt).toContain("final non-empty line"); + expect(prompt).toContain("Nothing may follow this line."); + expect(prompt).toContain("No extra content after this line."); + expect(prompt).toContain(`"${REVIEW_PASS}"`); + expect(prompt).toContain( + "concrete file paths, commands, and code locations that must change." + ); expect(prompt).toContain("Proof requirements:\nmust pass ci"); expect(prompt).toContain("worktree isolation"); - expect(prompt).toContain('Do not use "" in your final line.'); + expect(prompt).toContain("must not include"); + expect(prompt).toContain( + "The final line must be one of the two review signals" + ); }); diff --git a/tests/loop/review-run.test.ts b/tests/loop/review-run.test.ts index e37913f..4f1c638 100644 --- a/tests/loop/review-run.test.ts +++ b/tests/loop/review-run.test.ts @@ -33,7 +33,12 @@ test("runReview approves when all reviewers pass", async () => { makeOptions() ); - expect(result).toEqual({ approved: true, consensusFail: false, notes: "" }); + expect(result).toEqual({ + approved: true, + consensusFail: false, + failureCount: 0, + notes: "", + }); expect(runAgentMock).toHaveBeenCalledTimes(2); }); @@ -41,15 +46,92 @@ test("runReview treats mixed PASS and FAIL as failure", async () => { const { runReview } = makeRunReview(async () => ({ combined: "", exitCode: 0, - parsed: `${REVIEW_PASS}\n${REVIEW_FAIL}\nNeeds one more fix.`, + parsed: `Note this issue.\n${REVIEW_PASS}\n${REVIEW_FAIL}`, })); const result = await runReview(["codex"], "ship task", makeOptions()); expect(result.approved).toBe(false); expect(result.consensusFail).toBe(false); + expect(result.failureCount).toBe(1); expect(result.notes).toContain("[codex]"); - expect(result.notes).toContain("Needs one more fix."); + expect(result.notes).toContain("both"); +}); + +test("runReview rejects missing final review signal", async () => { + const { runReview } = makeRunReview(async () => ({ + combined: "", + exitCode: 0, + parsed: "Needs one more fix.", + })); + + const result = await runReview(["codex"], "ship task", makeOptions()); + + expect(result.approved).toBe(false); + expect(result.consensusFail).toBe(false); + expect(result.failureCount).toBe(1); + expect(result.notes).toContain("missing a valid final review signal"); +}); + +test("runReview rejects malformed final review signal", async () => { + const { runReview } = makeRunReview(async () => ({ + combined: "", + exitCode: 0, + parsed: `${REVIEW_PASS} please`, + })); + + const result = await runReview(["codex"], "ship task", makeOptions()); + + expect(result.approved).toBe(false); + expect(result.consensusFail).toBe(false); + expect(result.failureCount).toBe(1); + expect(result.notes).toContain("malformed final review signal"); +}); + +test("runReview rejects trailing content after final review signal", async () => { + const { runReview } = makeRunReview(async () => ({ + combined: "", + exitCode: 0, + parsed: `${REVIEW_FAIL}\nPlease address this next`, + })); + + const result = await runReview(["codex"], "ship task", makeOptions()); + + expect(result.approved).toBe(false); + expect(result.consensusFail).toBe(false); + expect(result.failureCount).toBe(1); + expect(result.notes).toContain("content after the final review signal"); +}); + +test("runReview accepts quoted final review signal", async () => { + const { runReview } = makeRunReview(async () => ({ + combined: "", + exitCode: 0, + parsed: `Looks good.\n"${REVIEW_PASS}"`, + })); + + const result = await runReview(["codex"], "ship task", makeOptions()); + + expect(result).toEqual({ + approved: true, + consensusFail: false, + failureCount: 0, + notes: "", + }); +}); + +test("runReview uses fallback message when fail output has no body", async () => { + const { runReview } = makeRunReview(async () => ({ + combined: "", + exitCode: 0, + parsed: `${REVIEW_FAIL}`, + })); + + const result = await runReview(["codex"], "ship task", makeOptions()); + + expect(result.approved).toBe(false); + expect(result.failureCount).toBe(1); + expect(result.notes).toContain("Reviewer requested more work."); }); test("runReview marks consensus failure when every reviewer fails", async () => { @@ -77,10 +159,46 @@ test("runReview marks consensus failure when every reviewer fails", async () => expect(result.approved).toBe(false); expect(result.consensusFail).toBe(true); + expect(result.failureCount).toBe(2); expect(result.notes).toContain("[codex]"); expect(result.notes).toContain("[claude]"); }); +test("runReview accepts pass signal from combined output with parsed body", async () => { + const { runReview } = makeRunReview(() => ({ + combined: `\n\t${REVIEW_PASS}\n`, + exitCode: 0, + parsed: "Changed files:\n- src/loop/review.ts", + })); + + const result = await runReview( + ["codex", "claude"], + "ship task", + makeOptions() + ); + + expect(result).toEqual({ + approved: true, + consensusFail: false, + failureCount: 0, + notes: "", + }); +}); + +test("runReview reads fail signal from combined output with body", async () => { + const { runReview } = makeRunReview(() => ({ + combined: `Address one more edge case.\n${REVIEW_FAIL}`, + exitCode: 0, + parsed: "", + })); + + const result = await runReview(["codex"], "ship task", makeOptions()); + + expect(result.approved).toBe(false); + expect(result.failureCount).toBe(1); + expect(result.notes).toContain("Address one more edge case."); +}); + test("runReview is not consensus failure when only one reviewer fails", async () => { const { runReview } = makeRunReview((reviewer) => { if (reviewer === "codex") { @@ -106,22 +224,11 @@ test("runReview is not consensus failure when only one reviewer fails", async () expect(result.approved).toBe(false); expect(result.consensusFail).toBe(false); + expect(result.failureCount).toBe(1); expect(result.notes).toContain("[claude]"); expect(result.notes).not.toContain("[codex]"); }); -test("runReview falls back to default note when reviewer output is empty", async () => { - const { runReview } = makeRunReview(async () => ({ - combined: "", - exitCode: 0, - parsed: "", - })); - - const result = await runReview(["codex"], "ship task", makeOptions()); - - expect(result.notes).toContain("Reviewer requested more work."); -}); - test("runReview keeps parallel results when one reviewer exits non-zero", async () => { const { runReview } = makeRunReview((reviewer) => { if (reviewer === "codex") { @@ -147,6 +254,7 @@ test("runReview keeps parallel results when one reviewer exits non-zero", async expect(result.approved).toBe(false); expect(result.consensusFail).toBe(false); + expect(result.failureCount).toBe(1); expect(result.notes).toContain("[claude]"); expect(result.notes).toContain("exited with code 2"); expect(result.notes).not.toContain("[codex]"); @@ -162,6 +270,78 @@ test("runReview handles non-zero exit for a single reviewer as failure", async ( const result = await runReview(["codex"], "ship task", makeOptions()); expect(result.approved).toBe(false); expect(result.consensusFail).toBe(false); + expect(result.failureCount).toBe(1); expect(result.notes).toContain("[codex]"); expect(result.notes).toContain("exited with code 2"); }); + +test("runReview supports one failing reviewer through rejection and keeps deterministic notes", async () => { + const { runReview } = makeRunReview((reviewer) => { + if (reviewer === "codex") { + return Promise.resolve({ + combined: "", + exitCode: 0, + parsed: REVIEW_PASS, + }); + } + + return Promise.reject(new Error("reviewer timed out")); + }); + + const result = await runReview( + ["codex", "claude"], + "ship task", + makeOptions() + ); + + expect(result.approved).toBe(false); + expect(result.consensusFail).toBe(false); + expect(result.failureCount).toBe(1); + expect(result.notes).toBe( + "[claude] [loop] review claude failed: reviewer timed out" + ); +}); + +test("runReview keeps deterministic note ordering regardless completion order", async () => { + const { runReview } = makeRunReview(async (reviewer) => { + if (reviewer === "codex") { + await new Promise((resolve) => setTimeout(resolve, 20)); + return { + combined: "", + exitCode: 0, + parsed: `codex failed.\n${REVIEW_FAIL}`, + }; + } + await new Promise((resolve) => setTimeout(resolve, 1)); + return { + combined: "", + exitCode: 0, + parsed: `claude failed.\n${REVIEW_FAIL}`, + }; + }); + + const result = await runReview( + ["codex", "claude"], + "ship task", + makeOptions() + ); + + expect(result.failureCount).toBe(2); + expect(result.notes).toBe( + '[codex] codex failed. (Expected "PASS" or "FAIL" in output.)\n\n' + + '[claude] claude failed. (Expected "PASS" or "FAIL" in output.)' + ); +}); + +test("runReview formats non-Error reviewer rejection deterministically", async () => { + const { runReview } = makeRunReview(() => + Promise.reject({ reason: "timeout", code: 9 }) + ); + const result = await runReview(["codex"], "ship task", makeOptions()); + + expect(result.approved).toBe(false); + expect(result.failureCount).toBe(1); + expect(result.notes).toBe( + '[codex] [loop] review codex failed: {"reason":"timeout","code":9}' + ); +}); diff --git a/tests/loop/review.test.ts b/tests/loop/review.test.ts index 8e277fa..d3907dd 100644 --- a/tests/loop/review.test.ts +++ b/tests/loop/review.test.ts @@ -1,5 +1,28 @@ import { expect, test } from "bun:test"; -import { resolveReviewers } from "../../src/loop/review"; +import { REVIEW_FAIL, REVIEW_PASS } from "../../src/loop/constants"; +import { createRunReview, resolveReviewers } from "../../src/loop/review"; +import type { Options, RunResult } from "../../src/loop/types"; + +const makeRunResult = ({ + parsed = "", + combined = "", + exitCode = 0, +}: Partial = {}): RunResult => ({ + combined, + exitCode, + parsed, +}); + +const makeOptions = (overrides: Partial = {}): Options => ({ + agent: "codex", + doneSignal: "", + proof: "verify with tests", + format: "raw", + maxIterations: 2, + model: "test-model", + review: "claudex", + ...overrides, +}); test("resolveReviewers returns empty list when review is not enabled", () => { expect(resolveReviewers(undefined, "codex")).toEqual([]); @@ -14,3 +37,303 @@ test("resolveReviewers returns the explicit reviewer", () => { expect(resolveReviewers("claude", "codex")).toEqual(["claude"]); expect(resolveReviewers("codex", "claude")).toEqual(["codex"]); }); + +test("runReview approves only when final line is a valid pass signal", async () => { + const runReview = createRunReview(() => + Promise.resolve(makeRunResult({ parsed: `x\n${REVIEW_PASS}\n\n` })) + ); + const result = await runReview(["codex"], "task", makeOptions()); + + expect(result).toEqual({ + approved: true, + consensusFail: false, + failureCount: 0, + notes: "", + }); +}); + +test("runReview accepts quoted final signal and ignores non-final review lines", async () => { + const runReview = createRunReview(() => + Promise.resolve( + makeRunResult({ parsed: `some context\n"${REVIEW_PASS}"\n\n ` }) + ) + ); + const result = await runReview(["codex"], "task", makeOptions()); + + expect(result).toEqual({ + approved: true, + consensusFail: false, + failureCount: 0, + notes: "", + }); +}); + +test("runReview accepts final signal with surrounding whitespace", async () => { + const runReview = createRunReview(() => + Promise.resolve( + makeRunResult({ parsed: `x\n \t${REVIEW_PASS} \n\n \n` }) + ) + ); + const result = await runReview(["codex"], "task", makeOptions()); + + expect(result).toEqual({ + approved: true, + consensusFail: false, + failureCount: 0, + notes: "", + }); +}); + +test("runReview accepts final signal from combined output with trailing blank lines", async () => { + const runReview = createRunReview(() => + Promise.resolve( + makeRunResult({ + parsed: "x", + combined: `\n ${REVIEW_PASS}\n\n \t\n`, + }) + ) + ); + const result = await runReview(["codex"], "task", makeOptions()); + + expect(result).toEqual({ + approved: true, + consensusFail: false, + failureCount: 0, + notes: "", + }); +}); + +test("runReview rejects output with missing final review signal", async () => { + const runReview = createRunReview(() => + Promise.resolve( + makeRunResult({ parsed: "Needs one more fix.\nAdditional notes follow." }) + ) + ); + const result = await runReview(["codex"], "task", makeOptions()); + + expect(result.approved).toBe(false); + expect(result.failureCount).toBe(1); + expect(result.notes).toContain("missing a valid final review signal"); +}); + +test("runReview accepts quoted final fail with whitespace-only body", async () => { + const runReview = createRunReview(() => + Promise.resolve( + makeRunResult({ + parsed: `\n \n"${REVIEW_FAIL}"\n \t \n`, + combined: "", + }) + ) + ); + const result = await runReview(["codex"], "task", makeOptions()); + + expect(result).toEqual({ + approved: false, + consensusFail: false, + failureCount: 1, + notes: + '[codex] Reviewer requested more work. (Expected "PASS" or "FAIL" in output.)', + }); +}); + +test("runReview rejects trailing token on quoted final signal", async () => { + const runReview = createRunReview(() => + Promise.resolve( + makeRunResult({ parsed: `please add tests\n"${REVIEW_PASS}" extra` }) + ) + ); + const result = await runReview(["codex"], "task", makeOptions()); + + expect(result.approved).toBe(false); + expect(result.failureCount).toBe(1); + expect(result.notes).toContain("final review signal"); +}); + +test("runReview accepts final signal when parsed is empty and combined contains body", async () => { + const runReview = createRunReview(() => + Promise.resolve( + makeRunResult({ + parsed: "", + combined: `Checks passed.\n${REVIEW_PASS}\n`, + }) + ) + ); + const result = await runReview(["codex"], "task", makeOptions()); + + expect(result).toEqual({ + approved: true, + consensusFail: false, + failureCount: 0, + notes: "", + }); +}); + +test("runReview rejects output with quoted final fail plus non-final pass", async () => { + const runReview = createRunReview(() => + Promise.resolve( + makeRunResult({ parsed: `"${REVIEW_FAIL}"\n${REVIEW_PASS}` }) + ) + ); + const result = await runReview(["codex"], "task", makeOptions()); + + expect(result.approved).toBe(false); + expect(result.failureCount).toBe(1); + expect(result.notes).toContain("both"); +}); + +test("runReview rejects output with final pass and earlier fail", async () => { + const runReview = createRunReview(() => + Promise.resolve(makeRunResult({ parsed: `${REVIEW_FAIL}\n${REVIEW_PASS}` })) + ); + const result = await runReview(["codex"], "task", makeOptions()); + + expect(result.approved).toBe(false); + expect(result.failureCount).toBe(1); + expect(result.notes).toContain("both"); +}); + +test("runReview rejects malformed final signal text", async () => { + const runReview = createRunReview(() => + Promise.resolve(makeRunResult({ parsed: `${REVIEW_PASS} please` })) + ); + const result = await runReview(["codex"], "task", makeOptions()); + + expect(result.approved).toBe(false); + expect(result.failureCount).toBe(1); + expect(result.notes).toContain("final review signal"); +}); + +test("runReview rejects output with extra non-empty content after final signal", async () => { + const runReview = createRunReview(() => + Promise.resolve( + makeRunResult({ parsed: `${REVIEW_FAIL}\naddressed later` }) + ) + ); + const result = await runReview(["codex"], "task", makeOptions()); + + expect(result.approved).toBe(false); + expect(result.failureCount).toBe(1); + expect(result.notes).toContain("final review signal"); +}); + +test("runReview rejects non-final or malformed review signals", async () => { + const runReview = createRunReview(() => + Promise.resolve(makeRunResult({ parsed: `${REVIEW_PASS} with notes` })) + ); + const result = await runReview(["codex"], "task", makeOptions()); + + expect(result.approved).toBe(false); + expect(result.failureCount).toBe(1); + expect(result.notes).toContain("final review signal"); +}); + +test("runReview requires deterministic output when both pass and fail are present", async () => { + const runReview = createRunReview(() => + Promise.resolve(makeRunResult({ parsed: `${REVIEW_PASS}\n${REVIEW_FAIL}` })) + ); + const result = await runReview(["codex"], "task", makeOptions()); + + expect(result.approved).toBe(false); + expect(result.failureCount).toBe(1); + expect(result.notes).toContain("both"); +}); + +test("runReview extracts fail reasons from body before final signal", async () => { + const runReview = createRunReview(() => + Promise.resolve( + makeRunResult({ parsed: `Fix this.\nAnd this.\n${REVIEW_FAIL}` }) + ) + ); + const result = await runReview(["codex"], "task", makeOptions()); + + expect(result.approved).toBe(false); + expect(result.failureCount).toBe(1); + expect(result.notes).toContain("Fix this."); + expect(result.notes).toContain("And this."); +}); + +test("runReview keeps deterministic output when both pass and fail are present", async () => { + const runReview = createRunReview(() => + Promise.resolve( + makeRunResult({ parsed: `${REVIEW_PASS}\n${REVIEW_FAIL}\n` }) + ) + ); + const result = await runReview(["codex"], "task", makeOptions()); + + expect(result.approved).toBe(false); + expect(result.failureCount).toBe(1); + expect(result.notes).toContain("both"); +}); + +test("runReview keeps deterministic output when one pass and one fail in output", async () => { + const runReview = createRunReview(() => + Promise.resolve( + makeRunResult({ + parsed: `${REVIEW_PASS}\nSome info\n${REVIEW_FAIL}\n`, + }) + ) + ); + const result = await runReview(["codex"], "task", makeOptions()); + + expect(result.approved).toBe(false); + expect(result.failureCount).toBe(1); + expect(result.notes).toContain("both"); +}); + +test("runReview keeps deterministic note ordering regardless completion order", async () => { + const runReview = createRunReview(async (reviewer) => { + if (reviewer === "codex") { + await new Promise((resolve) => setTimeout(resolve, 20)); + return makeRunResult({ parsed: `codex review failed.\n${REVIEW_FAIL}` }); + } + await new Promise((resolve) => setTimeout(resolve, 1)); + return makeRunResult({ parsed: `claude review failed.\n${REVIEW_FAIL}` }); + }); + + const result = await runReview(["codex", "claude"], "task", makeOptions()); + + expect(result.approved).toBe(false); + expect(result.failureCount).toBe(2); + expect(result.notes).toBe( + '[codex] codex review failed. (Expected "PASS" or "FAIL" in output.)\n\n[claude] claude review failed. (Expected "PASS" or "FAIL" in output.)' + ); +}); + +test("runReview handles non-zero exit code as deterministic reviewer failure", async () => { + const runReview = createRunReview(() => + Promise.resolve(makeRunResult({ exitCode: 3, parsed: "", combined: "x" })) + ); + const result = await runReview(["codex"], "task", makeOptions()); + + expect(result).toEqual({ + approved: false, + consensusFail: false, + failureCount: 1, + notes: + '[codex] [loop] review exited with code 3 (Expected "PASS" or "FAIL" in output.)', + }); +}); + +test("runReview handles reviewer runtime failures", async () => { + const runReview = createRunReview(() => Promise.reject("network glitch")); + const result = await runReview(["codex"], "task", makeOptions()); + + expect(result).toEqual({ + approved: false, + consensusFail: false, + failureCount: 1, + notes: "[codex] [loop] review codex failed: network glitch", + }); +}); + +test("runReview marks consensus failure when all reviewers fail", async () => { + const runReview = createRunReview(() => + Promise.resolve(makeRunResult({ parsed: REVIEW_FAIL })) + ); + const result = await runReview(["claude", "codex"], "task", makeOptions()); + + expect(result.approved).toBe(false); + expect(result.consensusFail).toBe(true); + expect(result.failureCount).toBe(2); + expect(result.notes).toContain("[claude]"); +});