diff --git a/README.md b/README.md index 9051798..95a5bc1 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,11 @@ This _is not_ an "agent harness" and the goal isn't to re-invent the wheel: `loo - Create a GitHub fine-grained personal access token - Once you are done, take a snapshot of your "golden image" (e.g. `lume clone`) - Now you can even set up Tailscale to SSH remotely to your sandbox +- By default, `loop` uses the Codex App Server transport (`codex app-server`) to keep a single session alive. +- Set `CODEX_TRANSPORT=exec` to force the legacy `codex exec --json` path. + - Use `exec` when app-server compatibility problems are expected. Otherwise, if the app-server transport fails, `loop` prints + ``[loop] codex app-server transport failed. Falling back to `codex exec --json`.`` once per run and continues with the legacy path automatically. + - To permanently force legacy mode for all runs, keep `CODEX_TRANSPORT=exec` set in your environment. ## Requirements diff --git a/src/loop.ts b/src/loop.ts index bea4f7b..c7d463b 100644 --- a/src/loop.ts +++ b/src/loop.ts @@ -1,30 +1,35 @@ #!/usr/bin/env bun +import { closeAppServer } from "./loop/codex-app-server"; import { cliDeps } from "./loop/deps"; import { updateDeps } from "./loop/update-deps"; const TMUX_DETACH_HINT = "[loop] detach with Ctrl-b d"; export const runCli = async (argv: string[]): Promise => { - await updateDeps.applyStagedUpdateOnStartup(); - if (await updateDeps.handleManualUpdateCommand(argv)) { - return; - } - updateDeps.startAutoUpdateCheck(); + try { + await updateDeps.applyStagedUpdateOnStartup(); + if (await updateDeps.handleManualUpdateCommand(argv)) { + return; + } + updateDeps.startAutoUpdateCheck(); - if (process.env.TMUX) { - console.log(TMUX_DETACH_HINT); - } - if (argv.length === 0) { - await cliDeps.runPanel(); - return; - } - const opts = cliDeps.parseArgs(argv); - if (opts.tmux && cliDeps.runInTmux(argv)) { - return; + if (process.env.TMUX) { + console.log(TMUX_DETACH_HINT); + } + if (argv.length === 0) { + await 
cliDeps.runPanel(); + return; + } + const opts = cliDeps.parseArgs(argv); + if (opts.tmux && cliDeps.runInTmux(argv)) { + return; + } + await cliDeps.maybeEnterWorktree(opts); + const task = await cliDeps.resolveTask(opts); + await cliDeps.runLoop(task, opts); + } finally { + await closeAppServer(); } - await cliDeps.maybeEnterWorktree(opts); - const task = await cliDeps.resolveTask(opts); - await cliDeps.runLoop(task, opts); }; const main = async (): Promise => { diff --git a/src/loop/codex-app-server.ts b/src/loop/codex-app-server.ts new file mode 100644 index 0000000..69e0424 --- /dev/null +++ b/src/loop/codex-app-server.ts @@ -0,0 +1,794 @@ +import { spawn } from "bun"; +import type { Options, RunResult } from "./types"; + +type ExitSignal = "SIGINT" | "SIGTERM"; +type TransportMode = "app-server" | "exec"; +type Callback = (text: string) => void; + +interface JsonFrame { + error?: unknown; + id?: unknown; + method?: string; + params?: unknown; + result?: unknown; +} + +interface PendingRequest { + method: string; + reject: (error: Error) => void; + resolve: (value: unknown) => void; + timeout: ReturnType; +} + +interface TurnState { + combined: string; + lastChunk: string; + onParsed: Callback; + onRaw: Callback; + parsed: string; + reject: (error: Error) => void; + resolve: (result: RunResult) => void; +} + +const APP_SERVER_CMD = "codex"; +const APP_SERVER_ARGS = ["app-server"]; +const USER_INPUT_TEXT_ELEMENTS = "text_elements"; +const WAIT_TIMEOUT_MS = 30_000; + +export const CODEX_TRANSPORT_APP_SERVER: TransportMode = "app-server"; +export const CODEX_TRANSPORT_EXEC: TransportMode = "exec"; +export const CODEX_TRANSPORT_ENV = "CODEX_TRANSPORT"; +export const DEFAULT_CODEX_TRANSPORT: TransportMode = + CODEX_TRANSPORT_APP_SERVER; + +const METHOD_INITIALIZE = "initialize"; +const METHOD_THREAD_START = "thread/start"; +const METHOD_TURN_START = "turn/start"; +const METHOD_TURN_COMPLETED = "turn/completed"; +const METHOD_ERROR = "error"; +const 
METHOD_ITEM_COMPLETED = "item/completed"; +const METHOD_ITEM_DELTA = "item/agentMessage/delta"; +const METHOD_COMMAND_APPROVAL = "item/commandExecution/requestApproval"; +const METHOD_FILE_CHANGE_APPROVAL = "item/fileChange/requestApproval"; +const METHOD_TOOL_USER_INPUT = "item/tool/requestUserInput"; +const METHOD_TOOL_CALL = "item/tool/call"; +const METHOD_APPLY_PATCH_APPROVAL = "applyPatchApproval"; +const METHOD_EXEC_COMMAND_APPROVAL = "execCommandApproval"; +const METHOD_AUTH_REFRESH = "account/chatgptAuthTokens/refresh"; +const METHODS_TRIGGERING_FALLBACK = new Set([ + METHOD_INITIALIZE, + METHOD_THREAD_START, + METHOD_TURN_START, +]); + +type SpawnFn = (...args: Parameters) => ReturnType; +let spawnFn: SpawnFn = spawn; + +const isString = (value: unknown): value is string => + typeof value === "string" && value.length > 0; +const asString = (value: unknown): string | undefined => + isString(value) ? value : undefined; +const isRecord = (value: unknown): value is Record => + typeof value === "object" && value !== null; +const asRecord = (value: unknown): Record => + isRecord(value) ? value : {}; +const asRequestId = (value: unknown): string | undefined => { + if (typeof value === "number" && Number.isInteger(value)) { + return String(value); + } + return isString(value) ? value : undefined; +}; + +const parseLine = (line: string): JsonFrame | undefined => { + const trimmed = line.trim(); + if (!trimmed.startsWith("{")) { + return undefined; + } + try { + const parsed = JSON.parse(trimmed); + return isRecord(parsed) ? 
(parsed as JsonFrame) : undefined; + } catch { + return undefined; + } +}; + +const collectText = (value: unknown, out: string[]): void => { + if (isString(value)) { + out.push(value); + return; + } + if (Array.isArray(value)) { + for (const item of value) { + collectText(item, out); + } + return; + } + if (!isRecord(value)) { + return; + } + const record = asRecord(value); + const direct = asString(record.text) || asString(record.delta); + if (direct) { + out.push(direct); + } + collectText(record.content, out); + collectText(record.item, out); + collectText(record.payload, out); +}; + +const parseText = (value: unknown): string | undefined => { + const parts: string[] = []; + collectText(value, parts); + return parts.length > 0 + ? parts + .map((part) => part.trim()) + .filter(Boolean) + .join("\n") + : undefined; +}; + +const isUnsupportedTransportError = (error: unknown): boolean => { + const message = parseErrorText(error)?.toLowerCase(); + return ( + !!message && + (message.includes("unsupported") || + message.includes("method not found") || + message.includes("unknown method") || + message.includes("unsupported transport")) + ); +}; + +const parseErrorText = (value: unknown): string | undefined => { + const record = asRecord(value); + const errorRecord = asRecord(record.error); + const turn = asRecord(record.turn); + const turnError = asRecord(turn.error); + return ( + asString(errorRecord.message) || + asString(record.message) || + asString(record.reason) || + asString(turnError.message) + ); +}; + +const extractTurnId = (value: unknown): string | undefined => { + const record = asRecord(value); + const fromValue = asString(record.turnId) ?? asString(record.turn_id); + if (fromValue) { + return fromValue; + } + const turn = asRecord(record.turn); + return asString(turn.id); +}; + +const extractThreadId = (value: unknown): string | undefined => { + const record = asRecord(value); + return asString(record.threadId) ?? 
asString(record.thread_id); +}; + +const extractThreadFromTurnStart = (value: unknown): { id?: string } => { + const record = asRecord(value); + const thread = asRecord(record.thread); + return { id: asString(thread.id) }; +}; + +const extractTurnFromStart = (value: unknown): { id?: string } => { + const record = asRecord(value); + const turn = asRecord(record.turn); + return { id: asString(turn.id) || asString(record.id) }; +}; + +const buildInput = (prompt: string): Record[] => [ + { + type: "text", + text: prompt, + [USER_INPUT_TEXT_ELEMENTS]: [], + }, +]; + +const toError = (value: Error | unknown): Error => + value instanceof Error ? value : new Error(String(value)); + +export class CodexAppServerFallbackError extends Error {} + +export const codexAppServerInternals = { + parseLine, + parseText, + extractTurnId, + extractThreadId, + parseErrorText, + setSpawnFn: (next: SpawnFn): void => { + spawnFn = next; + }, + restoreSpawnFn: (): void => { + spawnFn = spawn; + }, +}; + +class AppServerClient { + private child: ReturnType | undefined; + private closed = false; + private started = false; + private ready = false; + private requestId = 1; + private threadId: string | undefined; + private readonly pending = new Map(); + private readonly turns = new Map(); + private lock: Promise = Promise.resolve(); + + get process(): ReturnType | undefined { + return this.child; + } + + hasProcess(): boolean { + return this.child !== undefined; + } + + async start(): Promise { + if (this.started) { + return; + } + this.started = true; + try { + const child = spawnFn([APP_SERVER_CMD, ...APP_SERVER_ARGS], { + env: process.env, + stderr: "pipe", + stdin: "pipe", + stdout: "pipe", + }); + this.child = child; + this.consumeFrames(child).finally(() => { + if (!this.closed) { + this.handleUnexpectedExit(); + } + }); + await this.sendRequest(METHOD_INITIALIZE, { + clientInfo: { + name: "loop", + title: "loop", + version: "1.0.3", + }, + capabilities: { experimentalApi: true }, + }); + 
this.ready = true; + } catch (error) { + if (this.child) { + this.child.kill("SIGTERM"); + this.child = undefined; + } + this.ready = false; + this.started = false; + this.threadId = undefined; + throw new CodexAppServerFallbackError( + toError(error).message || "failed to start codex app-server" + ); + } + } + + runTurn( + prompt: string, + opts: Options, + onParsed: Callback, + onRaw: Callback + ): Promise { + const task = this.lock.then(() => + this.runTurnExclusive(prompt, opts, onParsed, onRaw) + ); + this.lock = task.then( + () => undefined, + () => undefined + ); + return task; + } + + interrupt(signal: ExitSignal): void { + this.child?.kill(signal); + } + + async close(): Promise { + this.closed = true; + this.failAll(new Error("codex app-server closed")); + if (!this.child) { + this.started = false; + this.ready = false; + return; + } + this.child.kill("SIGTERM"); + await this.child.exited; + this.child = undefined; + this.ready = false; + this.started = false; + } + + private async ensureThread(): Promise { + if (this.threadId) { + return this.threadId; + } + const response = await this.sendRequest(METHOD_THREAD_START, { + model: null, + approvalPolicy: "never", + experimentalRawEvents: true, + persistExtendedHistory: true, + }); + const thread = extractThreadFromTurnStart(response); + if (!thread.id) { + throw new CodexAppServerFallbackError( + "codex app-server returned thread/start without thread id" + ); + } + this.threadId = thread.id; + return thread.id; + } + + private async runTurnExclusive( + prompt: string, + opts: Options, + onParsed: Callback, + onRaw: Callback + ): Promise { + if (!(this.child && this.ready)) { + await this.start(); + } + if (!this.child) { + throw new CodexAppServerFallbackError("codex app-server not running"); + } + + const threadId = await this.ensureThread(); + const response = await this.sendRequest(METHOD_TURN_START, { + threadId, + input: buildInput(prompt), + model: opts.model, + effort: null, + cwd: null, + }); + 
const turn = extractTurnFromStart(response); + if (!turn.id) { + throw new CodexAppServerFallbackError( + "codex app-server returned turn/start without turn id" + ); + } + + const turnId = turn.id; + return new Promise((resolve, reject) => { + const state: TurnState = { + combined: "", + lastChunk: "", + onParsed, + onRaw, + parsed: "", + reject, + resolve, + }; + this.turns.set(turnId, state); + const timeout = setTimeout(() => { + if (this.turns.delete(turnId)) { + state.reject(new Error(`codex app-server turn ${turnId} timed out`)); + } + }, WAIT_TIMEOUT_MS); + const clear = () => clearTimeout(timeout); + state.resolve = (result) => { + clear(); + resolve(result); + }; + state.reject = (error) => { + clear(); + reject(error); + }; + }); + } + + private async consumeFrames(proc: ReturnType): Promise { + await Promise.all([ + this.consumeStream(proc.stdout, this.handleStdoutLine), + this.consumeStream(proc.stderr, this.handleStdErrLine), + ]); + } + + private async consumeStream( + stream: ReadableStream, + handler: (line: string) => void + ): Promise { + const decoder = new TextDecoder(); + let buffer = ""; + const reader = stream.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + if (!value) { + continue; + } + buffer += decoder.decode(value, { stream: true }); + let index = buffer.indexOf("\n"); + while (index !== -1) { + const line = buffer.slice(0, index); + handler(line); + buffer = buffer.slice(index + 1); + index = buffer.indexOf("\n"); + } + } + if (buffer.trim()) { + handler(buffer.trim()); + } + } finally { + reader.releaseLock(); + } + } + + private readonly handleStdoutLine = (line: string): void => { + for (const turn of this.turns.values()) { + turn.combined += `${line}\n`; + turn.onRaw(line); + } + const frame = parseLine(line); + if (!frame) { + return; + } + this.handleFrame(frame); + }; + + private readonly handleStdErrLine = (line: string): void => { + for (const turn of 
this.turns.values()) { + turn.combined += `${line}\n`; + turn.onRaw(line); + } + }; + + private selectTurnState( + payload: Record + ): TurnState | undefined { + const turnId = extractTurnId(payload) || extractThreadId(payload); + if (turnId) { + const byTurn = this.turns.get(turnId); + if (byTurn) { + return byTurn; + } + } + return this.turns.size === 1 ? [...this.turns.values()][0] : undefined; + } + + private handleFrame(frame: JsonFrame): void { + const method = asString(frame.method); + const requestId = asRequestId(frame.id); + + if (requestId && method) { + this.handleServerRequest(requestId, method); + return; + } + if (requestId) { + this.handleResponse(requestId, frame.result, frame.error); + return; + } + if (method) { + this.handleNotification(method, asRecord(frame.params)); + } + } + + private handleResponse( + requestId: string, + result: unknown, + err: unknown + ): void { + const request = this.pending.get(requestId); + if (!request) { + return; + } + clearTimeout(request.timeout); + this.pending.delete(requestId); + if (err !== undefined) { + const message = + parseErrorText(err) || + parseErrorText({ error: err }) || + `codex app-server request "${request.method}" failed`; + const shouldFallback = + isUnsupportedTransportError(err) && + METHODS_TRIGGERING_FALLBACK.has(request.method); + if (shouldFallback) { + request.reject(new CodexAppServerFallbackError(message)); + } else { + request.reject(new Error(message)); + } + return; + } + request.resolve(result); + } + + private handleServerRequest(requestId: string, method: string): void { + if ( + method === METHOD_COMMAND_APPROVAL || + method === METHOD_FILE_CHANGE_APPROVAL + ) { + this.sendResponse(requestId, { decision: "accept" }, undefined); + return; + } + if (method === METHOD_TOOL_USER_INPUT) { + this.sendResponse(requestId, { answers: {} }, undefined); + return; + } + if ( + method === METHOD_TOOL_CALL || + method === METHOD_APPLY_PATCH_APPROVAL || + method === 
METHOD_EXEC_COMMAND_APPROVAL || + method === METHOD_AUTH_REFRESH + ) { + this.sendResponse(requestId, undefined, { + code: -32_601, + message: "request unsupported by loop runner", + }); + return; + } + + this.sendResponse(requestId, undefined, { + code: -32_601, + message: `unsupported request method ${method}`, + }); + } + + private handleNotification( + method: string, + params: Record + ): void { + const state = this.selectTurnState(params); + switch (method) { + case METHOD_ITEM_DELTA: + this.handleItemDeltaNotification(state, params); + return; + case METHOD_ITEM_COMPLETED: + this.handleItemCompletedNotification(state, params); + return; + case METHOD_ERROR: + this.handleErrorNotification(state, params); + return; + case METHOD_TURN_COMPLETED: + this.handleTurnCompletedNotification(state, params); + return; + default: + return; + } + } + + private handleItemDeltaNotification( + state: TurnState | undefined, + params: Record + ): void { + if (!state) { + return; + } + const chunk = parseText(params.delta || params); + if (!chunk) { + return; + } + const text = chunk.trim(); + if (!text || text === state.lastChunk) { + return; + } + state.lastChunk = text; + state.parsed = `${state.parsed ? `${state.parsed}\n` : ""}${text}`; + state.onParsed(text); + } + + private handleItemCompletedNotification( + state: TurnState | undefined, + params: Record + ): void { + if (!state) { + return; + } + const item = asRecord(params.item); + const itemType = asString(item.type); + if (itemType !== "agentMessage" && itemType !== "agent_message") { + return; + } + const text = parseText(item); + if (!text) { + return; + } + const candidate = text.trim(); + if (!candidate || candidate === state.lastChunk) { + return; + } + state.lastChunk = candidate; + state.parsed = `${state.parsed ? 
`${state.parsed}\n` : ""}${candidate}`; + state.onParsed(candidate); + } + + private handleErrorNotification( + state: TurnState | undefined, + params: Record + ): void { + const turnId = extractTurnId(params) || extractThreadId(params); + if (state) { + for (const [turnId, current] of this.turns.entries()) { + if (current === state) { + this.turns.delete(turnId); + break; + } + } + state.reject(new Error(parseErrorText(params) || "codex turn failed")); + return; + } + if (turnId && this.turns.has(turnId)) { + const activeState = this.turns.get(turnId); + if (activeState) { + this.turns.delete(turnId); + activeState.reject( + new Error(parseErrorText(params) || `turn ${turnId} failed`) + ); + } + return; + } + if (this.turns.size !== 1) { + return; + } + const [first] = this.turns.values(); + this.turns.clear(); + first?.reject(new Error(parseErrorText(params) || "codex turn failed")); + } + + private handleTurnCompletedNotification( + state: TurnState | undefined, + params: Record + ): void { + if (!state && this.turns.size === 1) { + const first = [...this.turns.values()][0]; + if (!first) { + return; + } + this.resolveTurnState(first, params); + return; + } + if (!state) { + return; + } + this.resolveTurnState(state, params); + } + + private resolveTurnState( + state: TurnState, + params: Record + ): void { + const turn = asRecord(params.turn); + const status = asString(params.status) ?? asString(turn.status); + const exitCode = status === "failed" ? 1 : 0; + const turnId = extractTurnId(params) || asString(turn.id); + + if (turnId && this.turns.has(turnId)) { + this.turns.delete(turnId); + } else { + for (const [key, value] of this.turns.entries()) { + if (value === state) { + this.turns.delete(key); + break; + } + } + } + + if (exitCode === 1) { + const message = + parseErrorText(params) || parseErrorText(turn) || "codex turn failed"; + const nextParsed = message + ? `${state.parsed ? 
`${state.parsed}\n` : ""}${message}` + : state.parsed; + state.resolve({ + combined: state.combined, + exitCode, + parsed: nextParsed, + }); + return; + } + + state.resolve({ + combined: state.combined, + exitCode: 0, + parsed: state.parsed, + }); + } + + private sendRequest(method: string, params: unknown): Promise { + if (!this.child || this.closed) { + return Promise.reject( + new CodexAppServerFallbackError("codex app-server not initialized") + ); + } + const requestId = String(this.requestId++); + const payload: Record = { + id: requestId, + method, + params, + }; + try { + this.child.stdin.write(`${JSON.stringify(payload)}\n`); + } catch (error) { + throw new CodexAppServerFallbackError( + `codex app-server request "${method}" failed to write: ${ + toError(error).message + }` + ); + } + + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + this.pending.delete(requestId); + reject(new Error(`codex app-server request "${method}" timed out`)); + }, WAIT_TIMEOUT_MS); + this.pending.set(requestId, { method, resolve, reject, timeout }); + }); + } + + private sendResponse( + requestId: string, + result: unknown, + error: unknown + ): void { + if (!this.child) { + return; + } + const payload = + error === undefined + ? 
{ id: requestId, result, jsonrpc: "2.0" } + : { id: requestId, error, jsonrpc: "2.0" }; + this.child.stdin.write(`${JSON.stringify(payload)}\n`); + } + + private failAll(error: Error): void { + for (const state of this.turns.values()) { + state.reject(error); + } + this.turns.clear(); + for (const request of this.pending.values()) { + clearTimeout(request.timeout); + request.reject(error); + } + this.pending.clear(); + } + + private handleUnexpectedExit(): void { + this.child = undefined; + this.started = false; + this.ready = false; + this.threadId = undefined; + this.failAll( + new CodexAppServerFallbackError("codex app-server exited unexpectedly") + ); + } +} + +let singleton: AppServerClient | undefined; + +const getClient = (): AppServerClient => { + if (!singleton) { + singleton = new AppServerClient(); + } + return singleton; +}; + +export const useAppServer = (): boolean => + process.env[CODEX_TRANSPORT_ENV] !== CODEX_TRANSPORT_EXEC; + +export const startAppServer = async (): Promise => { + await getClient().start(); +}; + +export const runCodexTurn = ( + prompt: string, + opts: Options, + callbacks: { onParsed: Callback; onRaw: Callback } +): Promise => { + return getClient().runTurn(prompt, opts, callbacks.onParsed, callbacks.onRaw); +}; + +export const interruptAppServer = (signal: ExitSignal): void => { + getClient().interrupt(signal); +}; + +export const hasAppServerProcess = (): boolean => getClient().hasProcess(); + +export const closeAppServer = async (): Promise => { + if (!singleton) { + return; + } + await singleton.close(); + singleton = undefined; +}; diff --git a/src/loop/constants.ts b/src/loop/constants.ts index f64206e..495d5eb 100644 --- a/src/loop/constants.ts +++ b/src/loop/constants.ts @@ -20,6 +20,7 @@ Options: -p, --prompt Prompt text or path to a .md prompt file -m, --max-iterations . 
Max loops (default: infinite) -d, --done Done signal (default: DONE) + CODEX_TRANSPORT=app-server|exec Codex transport mode (default: app-server) --proof Proof requirements for task completion (required) --format Log format (default: pretty) --review [claude|codex|claudex] Review on done (default: claudex) diff --git a/src/loop/main.ts b/src/loop/main.ts index e1d89b3..2438ad3 100644 --- a/src/loop/main.ts +++ b/src/loop/main.ts @@ -13,6 +13,7 @@ const runIterations = async ( hasExistingPr = false ): Promise => { let reviewNotes = ""; + const stopMessage = `\n[loop] done signal "${opts.doneSignal}" detected`; console.log(`\n[loop] PLAN.md:\n\n${task}`); for (let i = 1; i <= opts.maxIterations; i++) { const tag = Number.isFinite(opts.maxIterations) @@ -32,22 +33,17 @@ const runIterations = async ( `[loop] ${opts.agent} exited with code ${result.exitCode}` ); } - const output = `${result.parsed}\n${result.combined}`; - if (!hasSignal(output, opts.doneSignal)) { + if (!hasSignal(`${result.parsed}\n${result.combined}`, opts.doneSignal)) { continue; } if (reviewers.length === 0) { - console.log( - `\n[loop] done signal "${opts.doneSignal}" detected, stopping.` - ); + console.log(`${stopMessage}, stopping.`); return true; } const review = await runReview(reviewers, task, opts); if (review.approved) { await runDraftPrStep(task, opts, hasExistingPr); - console.log( - `\n[loop] done signal "${opts.doneSignal}" detected and review passed, stopping.` - ); + console.log(`${stopMessage} and review passed, stopping.`); return true; } if (review.consensusFail) { diff --git a/src/loop/panel.ts b/src/loop/panel.ts index ce94ad5..147169a 100644 --- a/src/loop/panel.ts +++ b/src/loop/panel.ts @@ -71,6 +71,67 @@ const TRAILING_CR_RE = /\r$/; const TRAILING_LINE_BREAKS_RE = /[\r\n]+$/g; const LOOP_SESSION_RE = /-loop-(\d+)$/; +const isNodeOrBunToken = (token: string): boolean => + token === "node" || + token === "bun" || + token.endsWith("/node") || + token.endsWith("/bun"); + +const 
isCodexBinaryToken = (token: string): boolean => + token === "codex" || + token.includes("codex@") || + token.endsWith("/codex") || + token.endsWith("/bin/codex") || + token.endsWith("/codex.js") || + token.endsWith("/codex.mjs") || + token.endsWith("/codex-app-server") || + token.endsWith("/codex-app-server.js") || + token.includes("/openai/codex/") || + token.includes("/node_modules/@openai/codex") || + token.includes("/@openai/codex"); + +const isCodexAppServerToken = (token: string): boolean => + token === "app-server" || + token.includes("app-server/") || + token.endsWith("/app-server") || + token.endsWith("/app-server.js") || + token.endsWith("/app-server.mjs") || + token.includes("/codex-app-server"); + +const tokenizeCommand = (command: string): string[] => + command.split(TOKEN_RE).filter(Boolean); + +const commandBinaryTokens = (tokens: string[]): string[] => { + const first = tokens[0] ?? ""; + if (!isNodeOrBunToken(first)) { + return [first]; + } + + return [first, ...tokens.slice(1).filter((token) => !token.startsWith("-"))]; +}; + +const isCodexAppServerProcess = (tokens: string[]): boolean => { + const binaryTokens = commandBinaryTokens(tokens); + const hasAppServer = tokens.some(isCodexAppServerToken); + const hasCodexBinary = + binaryTokens.some(isCodexBinaryToken) || + binaryTokens.some(isCodexAppServerToken); + return hasAppServer && hasCodexBinary; +}; + +const isCodexAppServerWrapper = (tokens: string[]): boolean => { + const appServerIndex = tokens.findIndex(isCodexAppServerToken); + if (appServerIndex <= 0) { + return false; + } + return tokens + .slice(0, appServerIndex) + .some((token) => isCodexBinaryToken(token) || isNodeOrBunToken(token)); +}; + +const isCodexEngine = (command: string, tokens: string[]): boolean => + command.includes("/codex/codex") || isCodexAppServerProcess(tokens); + const sleep = async (ms: number): Promise => await new Promise((resolve) => setTimeout(resolve, ms)); @@ -108,30 +169,37 @@ const parseProcessList = 
(text: string): AgentProcess[] => { const codexEngineParents = new Set(); for (const row of rows) { - if (row.command.includes("/codex/codex")) { + if (isCodexEngine(row.command, tokenizeCommand(row.command))) { codexEngineParents.add(row.ppid); } } const processes: AgentProcess[] = []; + const codexEngineParentPids = new Set(); + + for (const row of rows) { + const tokens = tokenizeCommand(row.command); + if (isCodexEngine(row.command, tokens)) { + codexEngineParentPids.add(row.ppid); + } + } + for (const row of rows) { const command = row.command.trim(); - const firstToken = command.split(TOKEN_RE)[0] ?? ""; + const tokens = tokenizeCommand(command); + const firstToken = tokens[0] ?? ""; const isClaude = firstToken === "claude" || firstToken.endsWith("/claude"); - const isCodexEngine = command.includes("/codex/codex"); - const isCodexWrapper = - command.includes("/bin/codex") || - firstToken === "codex" || - firstToken.endsWith("/codex"); - if (isClaude) { - processes.push({ agent: "claude", pid: row.pid }); + const isCodex = isCodexEngine(command, tokens); + const isCodexWrapper = isCodexAppServerWrapper(tokens); + + if (isCodexWrapper && codexEngineParentPids.has(row.pid)) { continue; } - if (isCodexEngine) { - processes.push({ agent: "codex", pid: row.pid }); + if (isClaude) { + processes.push({ agent: "claude", pid: row.pid }); continue; } - if (isCodexWrapper && !codexEngineParents.has(row.pid)) { + if (isCodex) { processes.push({ agent: "codex", pid: row.pid }); } } @@ -747,12 +815,21 @@ const parseCodexHistoryRow = (path: string): DoneRow | undefined => { return undefined; } const first = parseObject(readFirstLine(path)); - const payload = + const firstPayload = typeof first?.payload === "object" && first.payload !== null ? (first.payload as Record) : undefined; - const session = str(payload, "id") || basename(path, ".jsonl"); - const cwd = str(payload, "cwd") || "-"; + const payload = + typeof firstPayload === "object" && firstPayload !== null + ? 
firstPayload + : undefined; + const session = + str(payload, "id") || + str(first, "id") || + str(first, "session") || + str(first, "sessionId") || + basename(path, ".jsonl"); + const cwd = str(payload, "cwd") || str(first, "cwd") || "-"; const last = parseObject(readLastLine(path)); const iso = str(last, "timestamp"); @@ -761,7 +838,7 @@ const parseCodexHistoryRow = (path: string): DoneRow | undefined => { ? (last.payload as Record) : undefined; const payloadType = str(lastPayload, "type"); - const eventBase = str(last, "type") || "unknown"; + const eventBase = str(last, "type") || str(lastPayload, "type") || "unknown"; const event = payloadType ? `${eventBase}/${payloadType}` : eventBase; const endedAtMs = Number.isFinite(parseTimestampMs(iso)) ? parseTimestampMs(iso) @@ -1101,4 +1178,5 @@ export const panelInternals = { projectKeyFromCwd, reconcileDoneRows, rowId, + parseCodexHistoryRow, }; diff --git a/src/loop/runner.ts b/src/loop/runner.ts index ce577fb..ed5ce93 100644 --- a/src/loop/runner.ts +++ b/src/loop/runner.ts @@ -1,8 +1,31 @@ import { spawn } from "bun"; +import { + CODEX_TRANSPORT_ENV, + CODEX_TRANSPORT_EXEC, + CodexAppServerFallbackError, + hasAppServerProcess, + interruptAppServer, + runCodexTurn, + startAppServer, + useAppServer, +} from "./codex-app-server"; import { DEFAULT_CLAUDE_MODEL, DEFAULT_CODEX_MODEL } from "./constants"; import type { Agent, Options, RunResult } from "./types"; type ExitSignal = "SIGINT" | "SIGTERM"; +interface SpawnConfig { + args: string[]; + cmd: string; +} +type LegacyAgentRunner = ( + agent: Agent, + prompt: string, + opts: Options +) => Promise; +interface RunnerState { + runLegacyAgent: LegacyAgentRunner; + useAppServer: () => boolean; +} const SIGNAL_EXIT_CODES: Record = { SIGINT: 130, @@ -10,7 +33,13 @@ const SIGNAL_EXIT_CODES: Record = { }; const activeChildren = new Set>(); +let activeAppServerRuns = 0; let watchingSignals = false; +let fallbackWarned = false; +const runnerState: RunnerState = { + 
runLegacyAgent: (agent, prompt, opts) => runLegacyAgent(agent, prompt, opts), + useAppServer: () => useAppServer(), +}; const killChildren = (signal: ExitSignal): void => { for (const child of activeChildren) { @@ -20,23 +49,28 @@ const killChildren = (signal: ExitSignal): void => { const onSigint = (): void => { killChildren("SIGINT"); + interruptAppServer("SIGINT"); process.exit(SIGNAL_EXIT_CODES.SIGINT); }; const onSigterm = (): void => { killChildren("SIGTERM"); + interruptAppServer("SIGTERM"); process.exit(SIGNAL_EXIT_CODES.SIGTERM); }; const syncSignalHandlers = (): void => { - if (activeChildren.size > 0 && !watchingSignals) { + const hasAppServerWork = hasAppServerProcess(); + const hasWork = + activeChildren.size > 0 || activeAppServerRuns > 0 || hasAppServerWork; + if (hasWork && !watchingSignals) { process.on("SIGINT", onSigint); process.on("SIGTERM", onSigterm); watchingSignals = true; return; } - if (activeChildren.size === 0 && watchingSignals) { + if (!hasWork && watchingSignals) { process.off("SIGINT", onSigint); process.off("SIGTERM", onSigterm); watchingSignals = false; @@ -47,7 +81,7 @@ const buildCommand = ( agent: Agent, prompt: string, model: string -): { args: string[]; cmd: string } => { +): SpawnConfig => { if (agent === "claude") { const claudeModel = model && model !== DEFAULT_CODEX_MODEL ? 
model : DEFAULT_CLAUDE_MODEL; @@ -156,7 +190,94 @@ const consume = async ( } }; -export const runAgent = async ( +const appendParsedLine = ( + text: string, + opts: Options, + state: { parsed: string; prettyCount: number; lastMessage: string } +): { parsed: string; prettyCount: number; lastMessage: string } => { + const trimmed = text.trim(); + if (!trimmed || (opts.format === "pretty" && trimmed === state.lastMessage)) { + return state; + } + if (opts.format === "pretty") { + if (state.prettyCount > 0) { + process.stdout.write("\n"); + } + process.stdout.write(`${trimmed}\n`); + return { + lastMessage: trimmed, + prettyCount: state.prettyCount + 1, + parsed: `${state.parsed ? `${state.parsed}\n` : ""}${trimmed}`, + }; + } + + return { + ...state, + lastMessage: trimmed, + parsed: `${state.parsed ? `${state.parsed}\n` : ""}${trimmed}`, + }; +}; + +const runCodexAgent = async ( + prompt: string, + opts: Options +): Promise => { + if (!runnerState.useAppServer()) { + return runnerState.runLegacyAgent("codex", prompt, opts); + } + + let parsed = ""; + let state = { parsed: "", prettyCount: 0, lastMessage: "" }; + const onParsed = (text: string): void => { + state = appendParsedLine(text, opts, state); + parsed = state.parsed; + }; + const onRaw = (text: string): void => { + if (opts.format === "raw") { + process.stdout.write(`${text}\n`); + } + }; + + activeAppServerRuns += 1; + syncSignalHandlers(); + try { + try { + await startAppServer(); + } catch (error) { + if (process.env[CODEX_TRANSPORT_ENV] === CODEX_TRANSPORT_EXEC) { + throw error; + } + throw new CodexAppServerFallbackError( + error instanceof Error ? 
error.message : String(error) + ); + } + + const result = await runCodexTurn(prompt, opts, { + onParsed, + onRaw, + }); + return { ...result, parsed: result.parsed || parsed }; + } catch (error) { + if ( + process.env[CODEX_TRANSPORT_ENV] !== CODEX_TRANSPORT_EXEC && + error instanceof CodexAppServerFallbackError + ) { + if (!fallbackWarned) { + fallbackWarned = true; + console.error( + "[loop] codex app-server transport failed. Falling back to `codex exec --json`." + ); + } + return runnerState.runLegacyAgent("codex", prompt, opts); + } + throw error; + } finally { + activeAppServerRuns -= 1; + syncSignalHandlers(); + } +}; + +const runLegacyAgent = async ( agent: Agent, prompt: string, opts: Options @@ -174,38 +295,26 @@ export const runAgent = async ( let stderr = ""; let parsed = ""; let pending = ""; - let lastMessage = ""; - let prettyCount = 0; - - const writePretty = (text: string): void => { - if (opts.format !== "pretty") { - return; - } - if (!text.trim()) { - return; - } - if (prettyCount > 0) { - process.stdout.write("\n"); - } - process.stdout.write(`${text}\n`); - prettyCount++; - }; + let state = { parsed: "", prettyCount: 0, lastMessage: "" }; const onLine = (line: string): void => { const message = eventMessage(line); if (message) { - if (message === lastMessage) { - return; - } - - lastMessage = message; - parsed += `${parsed ? 
"\n" : ""}${message}`; - writePretty(message); + state = appendParsedLine(message, opts, state); + parsed = state.parsed; return; } - if (!line.trim().startsWith("{")) { - writePretty(line); + if ( + !line.trim().startsWith("{") && + opts.format === "pretty" && + line.trim() + ) { + if (state.prettyCount > 0) { + process.stdout.write("\n"); + } + process.stdout.write(`${line}\n`); + state.prettyCount += 1; } }; @@ -243,3 +352,33 @@ export const runAgent = async ( syncSignalHandlers(); } }; + +const defaultRunLegacyAgent: LegacyAgentRunner = ( + agent: Agent, + prompt: string, + opts: Options +): Promise => runLegacyAgent(agent, prompt, opts); + +export const runnerInternals = { + reset(): void { + runnerState.useAppServer = () => useAppServer(); + runnerState.runLegacyAgent = defaultRunLegacyAgent; + }, + setUseAppServer(next: () => boolean): void { + runnerState.useAppServer = next; + }, + setLegacyAgent(next: LegacyAgentRunner): void { + runnerState.runLegacyAgent = next; + }, +}; + +export const runAgent = ( + agent: Agent, + prompt: string, + opts: Options +): Promise => { + if (agent === "codex") { + return runCodexAgent(prompt, opts); + } + return runLegacyAgent(agent, prompt, opts); +}; diff --git a/tests/loop/codex-app-server.test.ts b/tests/loop/codex-app-server.test.ts new file mode 100644 index 0000000..c6a6655 --- /dev/null +++ b/tests/loop/codex-app-server.test.ts @@ -0,0 +1,620 @@ +import { afterEach, expect, test } from "bun:test"; +import { codexAppServerInternals } from "../../src/loop/codex-app-server"; +import type { Options } from "../../src/loop/types"; + +interface RequestFrame { + error?: unknown; + id?: unknown; + method?: unknown; + params?: unknown; + result?: unknown; +} + +type AppServerModule = typeof import("../../src/loop/codex-app-server"); + +interface TestStream { + close: () => void; + enqueue: (line: string) => void; + stream: ReadableStream; +} + +interface TestProcess { + close: () => void; + writes: string[]; +} + +type 
ResponseWriter = (frame: Record) => void; +type RequestHandler = (request: RequestFrame, write: ResponseWriter) => void; + +const noopRequestHandler: RequestHandler = () => { + // noop +}; + +const createStream = (): TestStream => { + let controller: ReadableStreamDefaultController | undefined; + const stream = new ReadableStream({ + start(c) { + controller = c; + }, + }); + return { + close: () => { + controller?.close(); + }, + enqueue: (line) => { + controller?.enqueue(new TextEncoder().encode(line)); + }, + stream, + }; +}; + +let modulePromise: Promise | undefined; +let moduleExports: AppServerModule | undefined; +let processes: TestProcess[] = []; +let currentHandler: RequestHandler = noopRequestHandler; + +const makeStreamResponse = ( + request: RequestFrame, + write: ResponseWriter +): void => { + currentHandler(request, write); +}; + +const installSpawn = (appServerModule: AppServerModule): void => { + appServerModule.codexAppServerInternals.setSpawnFn( + (_command: unknown, _options: unknown): unknown => { + const writes: string[] = []; + const stdout = createStream(); + const stderr = createStream(); + let exitedResolve = () => undefined; + const exited = new Promise((resolve) => { + exitedResolve = resolve; + }); + let closed = false; + + const close = (): void => { + if (closed) { + return; + } + closed = true; + stdout.close(); + stderr.close(); + exitedResolve(0); + }; + + const emit = (frame: Record): void => { + stdout.enqueue(`${JSON.stringify(frame)}\n`); + }; + + const child = { + exited, + kill: () => { + close(); + }, + stdin: { + write: (chunk: string): void => { + const lines = chunk.split("\n"); + for (const raw of lines) { + if (!raw.trim()) { + continue; + } + writes.push(raw); + const request = JSON.parse(raw) as RequestFrame; + makeStreamResponse(request, emit); + } + }, + }, + stderr: stderr.stream, + stdout: stdout.stream, + }; + processes.push({ close, writes }); + return child; + } + ); +}; + +const getModule = async (): Promise => 
{ + if (!modulePromise) { + modulePromise = import("../../src/loop/codex-app-server"); + } + moduleExports = await modulePromise; + installSpawn(moduleExports); + return moduleExports; +}; + +const makeOptions = (): Options => ({ + agent: "codex", + doneSignal: "", + format: "raw", + maxIterations: 1, + model: "test-model", + proof: "proof", +}); + +const latestWrites = (): string[] => { + return processes.at(-1)?.writes ?? []; +}; + +const resetState = async (): Promise => { + const appServer = await getModule(); + await appServer.closeAppServer(); + for (const process of processes) { + process.close(); + } + moduleExports = undefined; + modulePromise = undefined; + processes = []; + currentHandler = noopRequestHandler; + appServer.codexAppServerInternals.restoreSpawnFn(); +}; + +afterEach(async () => { + await resetState(); +}); + +test("parseLine returns strict JSON frames only", () => { + expect(codexAppServerInternals.parseLine('{"ok":true}')).toEqual({ + ok: true, + }); + expect(codexAppServerInternals.parseLine("not-json")).toBeUndefined(); + expect(codexAppServerInternals.parseLine(' {"ok":1}\n')).toEqual({ ok: 1 }); +}); + +test("parseText flattens nested text payloads", () => { + expect( + codexAppServerInternals.parseText({ + content: [{ text: " one " }, { content: [{ text: "two" }, ""] }], + }) + ).toBe("one\ntwo"); +}); + +test("startAppServer fails fast when initialize returns an error", async () => { + const appServer = await getModule(); + currentHandler = (request, write) => { + if (request.method === "initialize") { + write({ id: request.id, error: { message: "unsupported transport" } }); + } + }; + + await expect(appServer.startAppServer()).rejects.toBeInstanceOf( + appServer.CodexAppServerFallbackError + ); + expect(latestWrites().length).toBeGreaterThan(0); +}); + +test("runCodexTurn promotes thread/start unsupported errors to fallback errors", async () => { + const appServer = await getModule(); + currentHandler = (request, write) => { + if 
(request.method === "initialize") { + write({ id: request.id, result: {} }); + return; + } + if (request.method === "thread/start") { + write({ id: request.id, error: { message: "method not found" } }); + } + }; + + await expect( + appServer.runCodexTurn("say hi", makeOptions(), { + onParsed: () => undefined, + onRaw: () => undefined, + }) + ).rejects.toBeInstanceOf(appServer.CodexAppServerFallbackError); +}); + +test("runCodexTurn promotes turn/start unsupported errors to fallback errors", async () => { + const appServer = await getModule(); + currentHandler = (request, write) => { + if (request.method === "initialize") { + write({ id: request.id, result: {} }); + return; + } + if (request.method === "thread/start") { + write({ id: request.id, result: { thread: { id: "thread-1" } } }); + return; + } + if (request.method === "turn/start") { + write({ id: request.id, error: { message: "unknown method" } }); + } + }; + + await expect( + appServer.runCodexTurn("say hi", makeOptions(), { + onParsed: () => undefined, + onRaw: () => undefined, + }) + ).rejects.toBeInstanceOf(appServer.CodexAppServerFallbackError); +}); + +test("runCodexTurn recovers after unexpected exit", async () => { + const appServer = await getModule(); + currentHandler = (request, write) => { + if (request.method === "initialize") { + write({ id: request.id, result: {} }); + return; + } + if (request.method === "thread/start") { + write({ id: request.id, result: { thread: { id: "thread-1" } } }); + return; + } + if (request.method === "turn/start") { + write({ id: request.id, result: { turn: { id: "turn-1" } } }); + processes.at(-1)?.close(); + return; + } + }; + + await expect( + appServer.runCodexTurn("say hi", makeOptions(), { + onParsed: () => undefined, + onRaw: () => undefined, + }) + ).rejects.toBeInstanceOf(appServer.CodexAppServerFallbackError); + + currentHandler = (request, write) => { + if (request.method === "initialize") { + write({ id: request.id, result: {} }); + return; + } + if 
(request.method === "thread/start") { + write({ id: request.id, result: { thread: { id: "thread-2" } } }); + return; + } + if (request.method === "turn/start") { + write({ id: request.id, result: { turn: { id: "turn-2" } } }); + setTimeout(() => { + write({ + method: "turn/completed", + params: { + turnId: "turn-2", + turn: { id: "turn-2", status: "completed" }, + }, + }); + }, 0); + } + }; + + const result = await appServer.runCodexTurn("say hi", makeOptions(), { + onParsed: () => undefined, + onRaw: () => undefined, + }); + + const frames = latestWrites().map((line) => JSON.parse(line)); + expect(result.exitCode).toBe(0); + expect(frames.some((frame) => frame.method === "initialize")).toBe(true); + expect(frames.some((frame) => frame.method === "thread/start")).toBe(true); +}); + +test("runCodexTurn parses successful deltas and completion", async () => { + const appServer = await getModule(); + currentHandler = (request, write) => { + if (request.method === "initialize") { + write({ id: request.id, result: {} }); + return; + } + if (request.method === "thread/start") { + write({ id: request.id, result: { thread: { id: "thread-1" } } }); + return; + } + if (request.method === "turn/start") { + write({ id: request.id, result: { turn: { id: "turn-1" } } }); + setTimeout(() => { + write({ + method: "item/agentMessage/delta", + params: { + turnId: "turn-1", + delta: "hello", + }, + }); + write({ + method: "item/completed", + params: { + item: { type: "agentMessage", content: [{ text: " there" }] }, + }, + }); + write({ + method: "turn/completed", + params: { + turnId: "turn-1", + turn: { id: "turn-1", status: "completed" }, + }, + }); + }, 0); + } + }; + + const parsedLines: string[] = []; + const rawLines: string[] = []; + const result = await appServer.runCodexTurn("say hi", makeOptions(), { + onParsed: (text) => parsedLines.push(text), + onRaw: (text) => rawLines.push(text), + }); + + expect(result.exitCode).toBe(0); + expect(result.parsed).toContain("hello"); + 
expect(result.parsed).toContain("there"); + expect(parsedLines).toContain("hello"); + expect(parsedLines.some((line) => line.includes("there"))).toBe(true); + expect(result.combined).toContain("item/agentMessage/delta"); + expect(rawLines.length).toBeGreaterThan(0); + expect(rawLines.join(" ")).toContain("item/agentMessage/delta"); + expect(rawLines.join(" ")).toContain("turn/completed"); +}); + +test("runCodexTurn maps failed turns to non-zero exit", async () => { + const appServer = await getModule(); + currentHandler = (request, write) => { + if (request.method === "initialize") { + write({ id: request.id, result: {} }); + return; + } + if (request.method === "thread/start") { + write({ id: request.id, result: { thread: { id: "thread-1" } } }); + return; + } + if (request.method === "turn/start") { + write({ id: request.id, result: { turn: { id: "turn-1" } } }); + setTimeout(() => { + write({ + method: "turn/completed", + params: { + turnId: "turn-1", + turn: { + id: "turn-1", + status: "failed", + error: { message: "policy blocked" }, + }, + }, + }); + }, 0); + } + }; + + const result = await appServer.runCodexTurn("say hi", makeOptions(), { + onParsed: () => undefined, + onRaw: () => undefined, + }); + + expect(result.exitCode).toBe(1); + expect(result.parsed).toContain("policy blocked"); +}); + +test("runCodexTurn maps root-level failed status to non-zero exit", async () => { + const appServer = await getModule(); + currentHandler = (request, write) => { + if (request.method === "initialize") { + write({ id: request.id, result: {} }); + return; + } + if (request.method === "thread/start") { + write({ id: request.id, result: { thread: { id: "thread-1" } } }); + return; + } + if (request.method === "turn/start") { + write({ id: request.id, result: { turn: { id: "turn-1" } } }); + setTimeout(() => { + write({ + method: "turn/completed", + params: { + turnId: "turn-1", + status: "failed", + error: { message: "root error" }, + }, + }); + }, 0); + } + }; + + const 
result = await appServer.runCodexTurn("say hi", makeOptions(), { + onParsed: () => undefined, + onRaw: () => undefined, + }); + + expect(result.exitCode).toBe(1); + expect(result.parsed).toContain("root error"); +}); + +test("runCodexTurn accepts snake_case agent message item type", async () => { + const appServer = await getModule(); + currentHandler = (request, write) => { + if (request.method === "initialize") { + write({ id: request.id, result: {} }); + return; + } + if (request.method === "thread/start") { + write({ id: request.id, result: { thread: { id: "thread-1" } } }); + return; + } + if (request.method === "turn/start") { + write({ id: request.id, result: { turn: { id: "turn-1" } } }); + setTimeout(() => { + write({ + method: "item/completed", + params: { + item: { + type: "agent_message", + content: [{ text: "works with snake_case" }], + }, + }, + }); + write({ + method: "turn/completed", + params: { + turnId: "turn-1", + turn: { id: "turn-1", status: "completed" }, + }, + }); + }, 0); + } + }; + + const result = await appServer.runCodexTurn("say hi", makeOptions(), { + onParsed: () => undefined, + onRaw: () => undefined, + }); + + expect(result.exitCode).toBe(0); + expect(result.parsed).toContain("works with snake_case"); +}); + +test("runCodexTurn responds to approval requests", async () => { + const appServer = await getModule(); + currentHandler = (request, write) => { + if (request.method === "initialize") { + write({ id: request.id, result: {} }); + return; + } + if (request.method === "thread/start") { + write({ id: request.id, result: { thread: { id: "thread-1" } } }); + return; + } + if (request.method === "turn/start") { + write({ id: request.id, result: { turn: { id: "turn-1" } } }); + setTimeout(() => { + write({ + id: 101, + method: "item/commandExecution/requestApproval", + params: { + command: "rm -rf /", + }, + }); + write({ + method: "turn/completed", + params: { + turnId: "turn-1", + turn: { id: "turn-1", status: "completed" }, + }, + 
}); + }, 0); + } + }; + + const result = await appServer.runCodexTurn( + "dangerous command", + makeOptions(), + { + onParsed: () => undefined, + onRaw: () => undefined, + } + ); + + const responses = latestWrites().map( + (line) => JSON.parse(line) as RequestFrame + ); + const accepted = responses.some( + (entry) => + (typeof entry.id === "number" || typeof entry.id === "string") && + String(entry.id) === "101" && + (entry.result as Record)?.decision === "accept" + ); + expect(accepted).toBe(true); + expect(result.exitCode).toBe(0); +}); + +test("runCodexTurn falls back to exec mode when thread/start is unsupported", async () => { + const appServer = await getModule(); + currentHandler = (request, write) => { + if (request.method === "initialize") { + write({ id: request.id, result: {} }); + return; + } + if (request.method === "thread/start") { + write({ + id: request.id, + error: { message: "method not found: thread/start" }, + }); + } + }; + + await expect( + appServer.runCodexTurn("say hi", makeOptions(), { + onParsed: () => undefined, + onRaw: () => undefined, + }) + ).rejects.toBeInstanceOf(appServer.CodexAppServerFallbackError); +}); + +test("runCodexTurn falls back to exec mode when turn/start is unsupported", async () => { + const appServer = await getModule(); + currentHandler = (request, write) => { + if (request.method === "initialize") { + write({ id: request.id, result: {} }); + return; + } + if (request.method === "thread/start") { + write({ id: request.id, result: { thread: { id: "thread-1" } } }); + return; + } + if (request.method === "turn/start") { + write({ + id: request.id, + error: { message: "unsupported: turn/start" }, + }); + } + }; + + await expect( + appServer.runCodexTurn("say hi", makeOptions(), { + onParsed: () => undefined, + onRaw: () => undefined, + }) + ).rejects.toBeInstanceOf(appServer.CodexAppServerFallbackError); +}); + +test("runCodexTurn recovers after an unexpected app-server exit and can restart", async () => { + const 
appServer = await getModule(); + currentHandler = (request, write) => { + if (request.method === "initialize") { + write({ id: request.id, result: {} }); + return; + } + if (request.method === "thread/start") { + write({ id: request.id, result: { thread: { id: "thread-1" } } }); + return; + } + if (request.method === "turn/start") { + write({ id: request.id, result: { turn: { id: "turn-1" } } }); + setTimeout(() => { + const latest = processes.at(-1); + latest?.close(); + }, 0); + } + }; + + await expect( + appServer.runCodexTurn("say hi", makeOptions(), { + onParsed: () => undefined, + onRaw: () => undefined, + }) + ).rejects.toBeInstanceOf(appServer.CodexAppServerFallbackError); + + currentHandler = (request, write) => { + if (request.method === "initialize") { + write({ id: request.id, result: {} }); + return; + } + if (request.method === "thread/start") { + write({ id: request.id, result: { thread: { id: "thread-2" } } }); + return; + } + if (request.method === "turn/start") { + write({ id: request.id, result: { turn: { id: "turn-2" } } }); + setTimeout(() => { + write({ + method: "turn/completed", + params: { + turnId: "turn-2", + turn: { id: "turn-2", status: "completed" }, + }, + }); + }, 0); + } + }; + + await appServer.startAppServer(); + const result = await appServer.runCodexTurn("say hi", makeOptions(), { + onParsed: () => undefined, + onRaw: () => undefined, + }); + expect(result.exitCode).toBe(0); +}); diff --git a/tests/loop/main.test.ts b/tests/loop/main.test.ts index c9e4d7d..cc77d51 100644 --- a/tests/loop/main.test.ts +++ b/tests/loop/main.test.ts @@ -40,6 +40,8 @@ const loadRunLoop = async (mocks: { runReview?: () => Promise; question?: () => Promise; }) => { + mock.restore(); + const realReview = await import("../../src/loop/review"); mock.module("node:readline/promises", () => ({ createInterface: mock(() => ({ close: mock(() => undefined), @@ -52,6 +54,7 @@ const loadRunLoop = async (mocks: { mock.module("../../src/loop/review", () => ({ 
resolveReviewers: mock(mocks.resolveReviewers ?? (() => [])), runReview: mock(mocks.runReview ?? noopReview), + createRunReview: realReview.createRunReview, })); mock.module("../../src/loop/runner", () => ({ runAgent: mock(mocks.runAgent ?? (async () => makeRunResult("working"))), @@ -126,7 +129,19 @@ test("runLoop uses follow-up commit prompt after a PR is already created", async question: async () => answers.shift() ?? "", }); - await runLoop("Ship feature", makeOptions({ review: "claudex" })); + const originalIsTty = process.stdin.isTTY; + Object.defineProperty(process.stdin, "isTTY", { + configurable: true, + value: true, + }); + try { + await runLoop("Ship feature", makeOptions({ review: "claudex" })); + } finally { + Object.defineProperty(process.stdin, "isTTY", { + configurable: true, + value: originalIsTty, + }); + } expect(runAgent).toHaveBeenCalledTimes(2); expect(runReview).toHaveBeenCalledTimes(2); diff --git a/tests/loop/panel.test.ts b/tests/loop/panel.test.ts index 2c228fa..d7f28ba 100644 --- a/tests/loop/panel.test.ts +++ b/tests/loop/panel.test.ts @@ -1,4 +1,7 @@ import { expect, test } from "bun:test"; +import { mkdtempSync, rmSync, writeFileSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; import { panelInternals } from "../../src/loop/panel"; test("parseProcessList keeps codex engine and deduplicates wrapper", () => { @@ -15,6 +18,85 @@ test("parseProcessList keeps codex engine and deduplicates wrapper", () => { ]); }); +test("parseProcessList deduplicates codex app-server wrapper and engine", () => { + const output = ` + PID PPID COMMAND + 66100 1200 codex app-server + 66101 66100 /usr/local/bin/node /Users/me/.nvm/.../node_modules/@openai/codex/dist/app-server.js + 77220 1201 claude + `.trim(); + const rows = panelInternals.parseProcessList(output); + expect(rows).toEqual([ + { agent: "codex", pid: 66_101 }, + { agent: "claude", pid: 77_220 }, + ]); +}); + +test("parseProcessList handles node 
app-server launches with runtime flags", () => { + const output = ` + PID PPID COMMAND + 66110 1200 node --no-warnings /Users/me/.nvm/.../node_modules/@openai/codex/dist/app-server.js + 77230 1201 claude + `.trim(); + const rows = panelInternals.parseProcessList(output); + expect(rows).toEqual([ + { agent: "codex", pid: 66_110 }, + { agent: "claude", pid: 77_230 }, + ]); +}); + +test("parseProcessList detects a direct codex app-server process", () => { + const output = ` + PID PPID COMMAND + 66200 1200 /usr/local/bin/codex-app-server run + 77221 1201 claude +`.trim(); + const rows = panelInternals.parseProcessList(output); + expect(rows).toEqual([ + { agent: "codex", pid: 66_200 }, + { agent: "claude", pid: 77_221 }, + ]); +}); + +test("parseProcessList detects a bare app-server process", () => { + const output = ` + PID PPID COMMAND + 66300 1200 app-server + 77231 1201 claude + `.trim(); + const rows = panelInternals.parseProcessList(output); + expect(rows).toEqual([ + { agent: "codex", pid: 66_300 }, + { agent: "claude", pid: 77_231 }, + ]); +}); + +test("parseProcessList detects a bun app-server invocation with flags", () => { + const output = ` + PID PPID COMMAND + 66201 1200 bun --inspect=0.0.0.0:9229 /Users/me/.nvm/versions/node/v24.9.0/bin/codex app-server + 77222 1201 claude +`.trim(); + const rows = panelInternals.parseProcessList(output); + expect(rows).toEqual([ + { agent: "codex", pid: 66_201 }, + { agent: "claude", pid: 77_222 }, + ]); +}); + +test("parseProcessList detects a standalone app-server invocation", () => { + const output = ` + PID PPID COMMAND + 66202 1200 app-server run + 77223 1201 claude +`.trim(); + const rows = panelInternals.parseProcessList(output); + expect(rows).toEqual([ + { agent: "codex", pid: 66_202 }, + { agent: "claude", pid: 77_223 }, + ]); +}); + test("parseLsofSnapshot extracts cwd and file names", () => { const output = ` p54666 @@ -143,3 +225,21 @@ test("buildLines uses stacked layout on narrow terminals", () => { 
expect(lines.some((line) => line.startsWith("session: "))).toBe(true); expect(lines.some((line) => line.startsWith("final: "))).toBe(true); }); + +test("parseCodexHistoryRow falls back to top-level session keys when payload is missing", () => { + const dir = mkdtempSync(join(tmpdir(), "loop-codex-history-")); + const path = join(dir, "session-123.jsonl"); + writeFileSync( + path, + `${JSON.stringify({ id: "session-123", cwd: "/repo/example" })}\n${JSON.stringify( + { + type: "turn/completed", + } + )}\n` + ); + const row = panelInternals.parseCodexHistoryRow(path); + rmSync(dir, { recursive: true, force: true }); + expect(row?.row.session).toBe("session-123"); + expect(row?.row.cwd).toBe("/repo/example"); + expect(row?.row.event).toBe("turn/completed"); +}); diff --git a/tests/loop/runner.test.ts b/tests/loop/runner.test.ts new file mode 100644 index 0000000..3ec52a8 --- /dev/null +++ b/tests/loop/runner.test.ts @@ -0,0 +1,230 @@ +import { afterAll, beforeEach, expect, mock, test } from "bun:test"; +import { resolve } from "node:path"; +import type { Options, RunResult } from "../../src/loop/types"; + +interface AppServerModule { + CodexAppServerFallbackError: typeof Error; +} + +const CODEX_TRANSPORT_ENV = "CODEX_TRANSPORT"; +const CODEX_TRANSPORT_EXEC = "exec"; +const projectRoot = process.cwd(); +const runnerPath = resolve(projectRoot, "src/loop/runner.ts"); +const runnerImportPath = `${runnerPath}?runner-test`; +const codexAppServerPath = resolve(projectRoot, "src/loop/codex-app-server.ts"); + +type MockFn unknown> = ReturnType< + typeof mock +>; + +const makeResult = (overrides: Partial = {}): RunResult => ({ + combined: "", + exitCode: 0, + parsed: "", + ...overrides, +}); + +const makeOptions = (opts: Partial = {}): Options => ({ + agent: "codex", + doneSignal: "", + format: "raw", + maxIterations: 1, + model: "test-model", + proof: "verify", + ...opts, +}); + +class RunnerCodexFallbackError extends Error {} +const appServerFallback: 
AppServerModule["CodexAppServerFallbackError"] = + RunnerCodexFallbackError; + +const hasAppServerProcess: MockFn<() => boolean> = mock(() => false); +const interruptAppServer: MockFn<(signal: "SIGINT" | "SIGTERM") => void> = mock( + () => undefined +); +const runCodexTurn: MockFn< + (_prompt: string, _opts: Options) => Promise +> = mock(async () => makeResult()); +const runLegacyAgent: MockFn< + (agent: string, prompt: string, opts: Options) => Promise +> = mock(async () => makeResult()); +let runAgent: ( + agent: string, + prompt: string, + opts: Options +) => Promise; +const startAppServer: MockFn<() => Promise> = mock(async () => undefined); +const useAppServer: MockFn<() => boolean> = mock( + () => process.env[CODEX_TRANSPORT_ENV] !== CODEX_TRANSPORT_EXEC +); +let runnerInternals: { + reset: () => void; + setLegacyAgent: ( + next: (agent: string, prompt: string, opts: Options) => Promise + ) => void; + setUseAppServer: (next: () => boolean) => void; +}; + +const installCodexServerMock = (): void => { + mock.module(codexAppServerPath, () => ({ + CODEX_TRANSPORT_ENV, + CODEX_TRANSPORT_EXEC, + CodexAppServerFallbackError: appServerFallback, + hasAppServerProcess, + interruptAppServer, + runCodexTurn, + runLegacyAgent, + startAppServer, + useAppServer, + })); +}; + +mock.restore(); +installCodexServerMock(); + +beforeEach(async () => { + mock.restore(); + installCodexServerMock(); + ({ runAgent, runnerInternals } = await import(runnerImportPath)); + process.env[CODEX_TRANSPORT_ENV] = ""; + startAppServer.mockReset(); + startAppServer.mockResolvedValue(undefined); + hasAppServerProcess.mockReset(); + hasAppServerProcess.mockReturnValue(false); + interruptAppServer.mockReset(); + runCodexTurn.mockReset(); + runCodexTurn.mockResolvedValue(makeResult()); + runLegacyAgent.mockReset(); + runLegacyAgent.mockResolvedValue(makeResult()); + useAppServer.mockReset(); + useAppServer.mockImplementation( + () => process.env[CODEX_TRANSPORT_ENV] !== CODEX_TRANSPORT_EXEC + ); + 
runnerInternals.reset(); + runnerInternals.setUseAppServer(() => useAppServer()); + runnerInternals.setLegacyAgent( + (agent: string, prompt: string, opts: Options) => + runLegacyAgent(agent, prompt, opts) + ); +}); + +afterAll(() => { + mock.restore(); +}); + +test("runAgent uses app-server transport by default", async () => { + const result = await runAgent("codex", "say hello", makeOptions()); + + expect(result.exitCode).toBe(0); + expect(startAppServer).toHaveBeenCalledTimes(1); + expect(runCodexTurn).toHaveBeenCalledTimes(1); + expect(runnerInternals).toBeDefined(); +}); + +test("runAgent honors CODEX_TRANSPORT=exec and uses legacy codex exec", async () => { + process.env[CODEX_TRANSPORT_ENV] = CODEX_TRANSPORT_EXEC; + runnerInternals.setUseAppServer(() => false); + runLegacyAgent.mockResolvedValue(makeResult({ parsed: "legacy done" })); + + const result = await runAgent("codex", "say hello", makeOptions()); + + expect(result.exitCode).toBe(0); + expect(result.parsed).toBe("legacy done"); + expect(runLegacyAgent).toHaveBeenCalledTimes(1); + expect(startAppServer).not.toHaveBeenCalled(); + expect(runCodexTurn).not.toHaveBeenCalled(); + expect(useAppServer).not.toHaveBeenCalled(); +}); + +test("runAgent propagates turn/completed success exit code", async () => { + runCodexTurn.mockResolvedValue(makeResult({ exitCode: 0, parsed: "done" })); + + const result = await runAgent("codex", "say hi", makeOptions()); + + expect(result.exitCode).toBe(0); +}); + +test("runAgent propagates turn/completed failure exit code", async () => { + runCodexTurn.mockResolvedValue(makeResult({ exitCode: 1, parsed: "failed" })); + + const result = await runAgent("codex", "say hi", makeOptions()); + + expect(result.exitCode).toBe(1); + expect(result.parsed).toBe("failed"); +}); + +test("runAgent only falls back to legacy once per process for app-server compatibility errors", async () => { + runCodexTurn.mockImplementation(() => { + throw new appServerFallback("app-server unsupported"); + 
}); + + const originalError = console.error; + const errorSpy = mock(() => undefined); + console.error = errorSpy; + + try { + const first = await runAgent("codex", "say hi", makeOptions()); + const second = await runAgent("codex", "say hi again", makeOptions()); + + expect(first.exitCode).toBe(0); + expect(second.exitCode).toBe(0); + expect(errorSpy).toHaveBeenCalledTimes(1); + expect(errorSpy).toHaveBeenCalledWith( + "[loop] codex app-server transport failed. Falling back to `codex exec --json`." + ); + expect(runLegacyAgent).toHaveBeenCalledTimes(2); + } finally { + console.error = originalError; + } +}); + +test("runAgent does not fallback on non-compatibility app-server failures", async () => { + runCodexTurn.mockImplementation(() => { + throw new Error("something else"); + }); + + await expect(runAgent("codex", "say hi", makeOptions())).rejects.toThrow( + "something else" + ); + expect(runLegacyAgent).not.toHaveBeenCalled(); +}); + +test("SIGINT forwards to interruptAppServer while app-server run is active", async () => { + let resolveTurn: ((result: RunResult) => void) | undefined; + runCodexTurn.mockImplementation( + () => + new Promise((resolve) => { + resolveTurn = resolve; + }) + ); + + const signalBase = process.listenerCount("SIGINT"); + const signalTermBase = process.listenerCount("SIGTERM"); + + const originalExit = process.exit; + const exitSpy = mock((code?: number): never => { + return code as never; + }); + (process as { exit: typeof exitSpy }).exit = exitSpy; + + try { + const pending = runAgent("codex", "say hi", makeOptions()); + + await Promise.resolve(); + expect(process.listenerCount("SIGINT")).toBe(signalBase + 1); + expect(process.listenerCount("SIGTERM")).toBe(signalTermBase + 1); + + process.emit("SIGINT"); + expect(interruptAppServer).toHaveBeenCalledTimes(1); + expect(interruptAppServer).toHaveBeenCalledWith("SIGINT"); + expect(exitSpy).toHaveBeenCalledWith(130); + + resolveTurn?.(makeResult()); + const result = await pending; + 
expect(result.exitCode).toBe(0); + expect(process.listenerCount("SIGINT")).toBe(signalBase); + expect(process.listenerCount("SIGTERM")).toBe(signalTermBase); + } finally { + process.exit = originalExit; + } +}); diff --git a/tests/loop/task.test.ts b/tests/loop/task.test.ts index 96b1eb9..fa50074 100644 --- a/tests/loop/task.test.ts +++ b/tests/loop/task.test.ts @@ -38,10 +38,12 @@ const loadResolveTask = async (deps: TaskDeps = {}) => { parsed: "", })) ); + const realUtils = await import("../../src/loop/utils"); mock.module("../../src/loop/utils", () => ({ isFile: isFileMock, readPrompt: readPromptMock, + hasSignal: realUtils.hasSignal, })); mock.module("../../src/loop/runner", () => ({ runAgent: runAgentMock }));