From 3ce7b1f2b2dc141f50873c32d95760edb2082560 Mon Sep 17 00:00:00 2001 From: Axel Delafosse Date: Tue, 24 Feb 2026 17:00:24 -0800 Subject: [PATCH 1/2] fix: harden agent process lifecycle handling - run app-server and legacy agent processes detached on non-Windows platforms - kill process groups when interrupting or shutting down, with child.kill fallback - remove cached thread id in app-server mode so each turn starts a fresh thread - forward frontend websocket frames directly to Claude SDK and restart session after each turn - add test coverage for detached process-group interrupt behavior --- src/loop/claude-sdk-server.ts | 30 +++++++------------ src/loop/codex-app-server.ts | 37 ++++++++++++++++------- src/loop/runner.ts | 28 +++++++++++++++++- tests/loop/codex-app-server.test.ts | 46 +++++++++++++++++++++++++++-- 4 files changed, 107 insertions(+), 34 deletions(-) diff --git a/src/loop/claude-sdk-server.ts b/src/loop/claude-sdk-server.ts index 8d241ed..64e8e55 100644 --- a/src/loop/claude-sdk-server.ts +++ b/src/loop/claude-sdk-server.ts @@ -232,23 +232,9 @@ class ClaudeSdkClient { _ws: ServerWebSocket, text: string ): void { - try { - const msg = JSON.parse(text); - if ( - msg.type === "message" && - typeof msg.content === "string" && - this.ws - ) { - this.sendJson({ - type: "user", - message: { role: "user", content: msg.content }, - parent_tool_use_id: null, - session_id: this.sessionId, - }); - } - } catch { - // ignore parse errors from frontends - } + // Dumb pipe: forward raw to Claude. Frontend is responsible for + // sending messages in Claude's native format. + this.ws?.send(text); } private createServer(): void { @@ -465,7 +451,7 @@ class ClaudeSdkClient { throw new Error("claude sdk server not connected"); } - return new Promise((resolve, reject) => { + const result = await new Promise((resolve, reject) => { const timeout = setTimeout(() => { if (this.turn) { this.turn = undefined; @@ -480,9 +466,9 @@ class ClaudeSdkClient { onParsed, onRaw, parsed: "", - resolve: (result) => { + resolve: (r) => { clearTimeout(timeout); - resolve(result); + resolve(r); }, reject: (error) => { clearTimeout(timeout); @@ -497,6 +483,10 @@ class ClaudeSdkClient { session_id: this.sessionId, }); }); + + // Restart the process so the next turn gets a fresh session ID + await this.cleanup(); + return result; } private async cleanup(): Promise { diff --git a/src/loop/codex-app-server.ts b/src/loop/codex-app-server.ts index 2b6d2e1..9a3d1d6 100644 --- a/src/loop/codex-app-server.ts +++ b/src/loop/codex-app-server.ts @@ -3,6 +3,7 @@ import { findFreePort } from "./ports"; import type { Options, RunResult } from "./types"; type ExitSignal = "SIGINT" | "SIGTERM"; +type KillSignal = ExitSignal | "SIGKILL"; type TransportMode = "app-server" | "exec"; type Callback = (text: string) => void; interface RunCodexTurnCallbacks { @@ -43,6 +44,7 @@ const WS_CONNECT_DELAY_MS = 150; const USER_INPUT_TEXT_ELEMENTS = "text_elements"; const WAIT_TIMEOUT_MS = 600_000; const NOOP_CALLBACK: Callback = () => undefined; +const DETACH_CHILD_PROCESS = process.platform !== "win32"; export const CODEX_TRANSPORT_APP_SERVER: TransportMode = "app-server"; export const CODEX_TRANSPORT_EXEC: TransportMode = "exec"; @@ -80,6 +82,25 @@ const defaultConnectWs: ConnectWsFn = async (url) => { }; let connectWsFn: ConnectWsFn = defaultConnectWs; +const killChildProcess = ( + child: ReturnType | undefined, + signal: KillSignal +): void => { + if (!child) { + return; + } + const pid = child.pid; + if (process.platform !== "win32" && typeof pid === "number" && pid > 0) { + try { + process.kill(-pid, signal); + return; + } catch { + // Fall back to direct child signaling if group kill is unavailable. + } + } + child.kill(signal); +}; + const isString = (value: unknown): value is string => typeof value === "string" && value.length > 0; const asString = (value: unknown): string | undefined => @@ -234,7 +255,6 @@ class AppServerClient { private started = false; private ready = false; private requestId = 1; - private threadId: string | undefined; private readonly pending = new Map(); private readonly turns = new Map(); private lock: Promise = Promise.resolve(); @@ -259,6 +279,7 @@ class AppServerClient { const child = spawnFn( [APP_SERVER_CMD, "app-server", "--listen", listenUrl], { + detached: DETACH_CHILD_PROCESS, env: process.env, stderr: "pipe", stdin: "pipe", @@ -305,12 +326,11 @@ class AppServerClient { } } if (this.child) { - this.child.kill("SIGTERM"); + killChildProcess(this.child, "SIGTERM"); this.child = undefined; } this.ready = false; this.started = false; - this.threadId = undefined; throw new CodexAppServerFallbackError( toError(error).message || "failed to start codex app-server" ); @@ -358,7 +378,7 @@ class AppServerClient { } interrupt(signal: ExitSignal): void { - this.child?.kill(signal); + killChildProcess(this.child, signal); } async close(): Promise { @@ -378,7 +398,7 @@ class AppServerClient { this.ready = false; return; } - this.child.kill("SIGTERM"); + killChildProcess(this.child, "SIGTERM"); await this.child.exited; this.child = undefined; this.ready = false; @@ -386,9 +406,6 @@ class AppServerClient { } private async ensureThread(model: string): Promise { - if (this.threadId) { - return this.threadId; - } const response = await this.sendRequest(METHOD_THREAD_START, { model, approvalPolicy: "never", @@ -401,7 +418,6 @@ class AppServerClient { "codex app-server returned thread/start without thread id" ); } - this.threadId = thread.id; return thread.id; } @@ -862,7 +878,6 @@ class AppServerClient { } this.started = false; this.ready = false; - this.threadId = undefined; this.failAll( new CodexAppServerFallbackError("codex app-server exited unexpectedly") ); @@ -872,7 +887,7 @@ class AppServerClient { let singleton: AppServerClient | undefined; process.on("exit", () => { - singleton?.process?.kill("SIGKILL"); + killChildProcess(singleton?.process, "SIGKILL"); }); const getClient = (): AppServerClient => { diff --git a/src/loop/runner.ts b/src/loop/runner.ts index 056f2a3..64a38d1 100644 --- a/src/loop/runner.ts +++ b/src/loop/runner.ts @@ -20,6 +20,7 @@ import { DEFAULT_CLAUDE_MODEL } from "./constants"; import type { Agent, Options, RunResult } from "./types"; type ExitSignal = "SIGINT" | "SIGTERM"; +type KillSignal = ExitSignal | "SIGKILL"; interface SpawnConfig { args: string[]; cmd: string; @@ -50,12 +51,36 @@ const runnerState: RunnerState = { useAppServer: () => useAppServer(), }; +const killChildProcess = ( + child: ReturnType, + signal: KillSignal +): void => { + const pid = child.pid; + if (process.platform !== "win32" && typeof pid === "number" && pid > 0) { + try { + process.kill(-pid, signal); + return; + } catch { + // Fall back to direct child signaling if group kill is unavailable. + } + } + child.kill(signal); +}; + const killChildren = (signal: ExitSignal): void => { for (const child of activeChildren) { - child.kill(signal); + killChildProcess(child, signal); } }; +const killChildrenHard = (): void => { + for (const child of activeChildren) { + killChildProcess(child, "SIGKILL"); + } +}; + +process.on("exit", killChildrenHard); + const onSigint = (): void => { killChildren("SIGINT"); interruptAppServer("SIGINT"); @@ -300,6 +325,7 @@ const runLegacyAgent = async ( ): Promise => { const { args, cmd } = buildCommand(agent, prompt, opts.model); const proc = spawn([cmd, ...args], { + detached: process.platform !== "win32", env: process.env, stderr: "pipe", stdout: "pipe", diff --git a/tests/loop/codex-app-server.test.ts b/tests/loop/codex-app-server.test.ts index e155502..700a6cb 100644 --- a/tests/loop/codex-app-server.test.ts +++ b/tests/loop/codex-app-server.test.ts @@ -20,6 +20,8 @@ interface TestStream { interface TestProcess { close: () => void; + killSignals: string[]; + pid: number; writes: string[]; } @@ -64,6 +66,8 @@ const installSpawn = (appServerModule: AppServerModule): void => { appServerModule.codexAppServerInternals.setSpawnFn( (_command: unknown, _options: unknown): unknown => { const writes: string[] = []; + const killSignals: string[] = []; + const pid = 10_000 + processes.length + 1; const stdout = createStream(); const stderr = createStream(); let exitedResolve = () => undefined; @@ -88,9 +92,11 @@ const installSpawn = (appServerModule: AppServerModule): void => { const child = { exited, - kill: () => { + kill: (signal?: string) => { + killSignals.push(signal ?? "SIGTERM"); close(); }, + pid, stdin: { write: (chunk: string): void => { const lines = chunk.split("\n"); @@ -107,7 +113,7 @@ const installSpawn = (appServerModule: AppServerModule): void => { stderr: stderr.stream, stdout: stdout.stream, }; - processes.push({ close, writes }); + processes.push({ close, killSignals, pid, writes }); return child; } ); @@ -166,6 +172,8 @@ const latestWrites = (): string[] => { return processes.at(-1)?.writes ?? []; }; +const latestProcess = (): TestProcess | undefined => processes.at(-1); + const resetState = async (): Promise => { const appServer = await getModule(); await appServer.closeAppServer(); @@ -593,6 +601,40 @@ test("runCodexTurn falls back to exec mode when turn/start is unsupported", asyn ).rejects.toBeInstanceOf(appServer.CodexAppServerFallbackError); }); +test("interruptAppServer kills detached process group when pid is available", async () => { + const appServer = await getModule(); + currentHandler = (request, write) => { + if (request.method === "initialize") { + write({ id: request.id, result: {} }); + } + }; + + const originalKill = process.kill; + const killCalls: Array<{ pid: number; signal: NodeJS.Signals | number }> = []; + const killSpy = (( + pid: number, + signal: NodeJS.Signals | number = "SIGTERM" + ): boolean => { + killCalls.push({ pid, signal }); + return true; + }) as typeof process.kill; + (process as { kill: typeof process.kill }).kill = killSpy; + + try { + await appServer.startAppServer(); + const proc = latestProcess(); + expect(proc).toBeDefined(); + appServer.interruptAppServer("SIGTERM"); + expect(killCalls).toContainEqual({ + pid: -(proc?.pid ?? 0), + signal: "SIGTERM", + }); + expect(proc?.killSignals.length).toBe(0); + } finally { + process.kill = originalKill; + } +}); + test("runCodexTurn recovers after an unexpected app-server exit and can restart", async () => { const appServer = await getModule(); currentHandler = (request, write) => { From e98f7065eba39390778deba6687fbd6d7ac245df Mon Sep 17 00:00:00 2001 From: Axel Delafosse Date: Tue, 24 Feb 2026 20:36:50 -0800 Subject: [PATCH 2/2] refactor: unify child process lifecycle handling --- src/loop/claude-sdk-server.ts | 31 ++++++++++++--- src/loop/codex-app-server.ts | 22 +---------- src/loop/process.ts | 26 +++++++++++++ src/loop/runner.ts | 20 +--------- tests/loop/process.test.ts | 71 +++++++++++++++++++++++++++++++++++ 5 files changed, 125 insertions(+), 45 deletions(-) create mode 100644 src/loop/process.ts create mode 100644 tests/loop/process.test.ts diff --git a/src/loop/claude-sdk-server.ts b/src/loop/claude-sdk-server.ts index 64e8e55..5cccb62 100644 --- a/src/loop/claude-sdk-server.ts +++ b/src/loop/claude-sdk-server.ts @@ -1,6 +1,7 @@ import { type Server, type ServerWebSocket, serve, spawn } from "bun"; import { DEFAULT_CLAUDE_MODEL } from "./constants"; import { findFreePort } from "./ports"; +import { DETACH_CHILD_PROCESS, killChildProcess } from "./process"; import type { Options, RunResult } from "./types"; type ExitSignal = "SIGINT" | "SIGTERM"; @@ -92,6 +93,21 @@ const pipeToStderr = (stream: ReadableStream): void => { pump(); }; +const isValidNdjson = (text: string): boolean => { + for (const line of text.split("\n")) { + const trimmed = line.trim(); + if (!trimmed) { + continue; + } + try { + JSON.parse(trimmed); + } catch { + return false; + } + } + return true; +}; + let spawnFn: SpawnFn = spawn; export const claudeSdkInternals = { @@ -165,6 +181,7 @@ class ClaudeSdkClient { url, ], { + detached: DETACH_CHILD_PROCESS, env: process.env, stderr: "pipe", stdout: "pipe", @@ -210,7 +227,7 @@ class ClaudeSdkClient { } interrupt(signal: ExitSignal): void { - this.child?.kill(signal); + killChildProcess(this.child, signal); } async close(): Promise { @@ -232,8 +249,9 @@ class ClaudeSdkClient { _ws: ServerWebSocket, text: string ): void { - // Dumb pipe: forward raw to Claude. Frontend is responsible for - // sending messages in Claude's native format. + if (!isValidNdjson(text)) { + return; + } this.ws?.send(text); } @@ -484,7 +502,8 @@ class ClaudeSdkClient { }); }); - // Restart the process so the next turn gets a fresh session ID + // Claude SDK session state is process-bound, so restart per turn to force a + // fresh session ID and avoid carrying state across independent loop turns. await this.cleanup(); return result; } @@ -511,7 +530,7 @@ class ClaudeSdkClient { this.server = undefined; } if (this.child) { - this.child.kill("SIGTERM"); + killChildProcess(this.child, "SIGTERM"); await this.child.exited; this.child = undefined; } @@ -537,7 +556,7 @@ class ClaudeSdkClient { let singleton: ClaudeSdkClient | undefined; process.on("exit", () => { - singleton?.process?.kill("SIGKILL"); + killChildProcess(singleton?.process, "SIGKILL"); }); const getClient = (): ClaudeSdkClient => { diff --git a/src/loop/codex-app-server.ts b/src/loop/codex-app-server.ts index 9a3d1d6..8f4937e 100644 --- a/src/loop/codex-app-server.ts +++ b/src/loop/codex-app-server.ts @@ -1,9 +1,9 @@ import { spawn } from "bun"; import { findFreePort } from "./ports"; +import { DETACH_CHILD_PROCESS, killChildProcess } from "./process"; import type { Options, RunResult } from "./types"; type ExitSignal = "SIGINT" | "SIGTERM"; -type KillSignal = ExitSignal | "SIGKILL"; type TransportMode = "app-server" | "exec"; type Callback = (text: string) => void; interface RunCodexTurnCallbacks { @@ -44,7 +44,6 @@ const WS_CONNECT_DELAY_MS = 150; const USER_INPUT_TEXT_ELEMENTS = "text_elements"; const WAIT_TIMEOUT_MS = 600_000; const NOOP_CALLBACK: Callback = () => undefined; -const DETACH_CHILD_PROCESS = process.platform !== "win32"; export const CODEX_TRANSPORT_APP_SERVER: TransportMode = "app-server"; export const CODEX_TRANSPORT_EXEC: TransportMode = "exec"; @@ -82,25 +81,6 @@ const defaultConnectWs: ConnectWsFn = async (url) => { }; let connectWsFn: ConnectWsFn = defaultConnectWs; -const killChildProcess = ( - child: ReturnType | undefined, - signal: KillSignal -): void => { - if (!child) { - return; - } - const pid = child.pid; - if (process.platform !== "win32" && typeof pid === "number" && pid > 0) { - try { - process.kill(-pid, signal); - return; - } catch { - // Fall back to direct child signaling if group kill is unavailable. - } - } - child.kill(signal); -}; - const isString = (value: unknown): value is string => typeof value === "string" && value.length > 0; const asString = (value: unknown): string | undefined => diff --git a/src/loop/process.ts b/src/loop/process.ts new file mode 100644 index 0000000..0376dfe --- /dev/null +++ b/src/loop/process.ts @@ -0,0 +1,26 @@ +import type { spawn } from "bun"; + +type ExitSignal = "SIGINT" | "SIGTERM"; +export type KillSignal = ExitSignal | "SIGKILL"; +export type ChildProcess = ReturnType; + +export const DETACH_CHILD_PROCESS = process.platform !== "win32"; + +export const killChildProcess = ( + child: ChildProcess | undefined, + signal: KillSignal +): void => { + if (!child) { + return; + } + const pid = child.pid; + if (DETACH_CHILD_PROCESS && typeof pid === "number" && pid > 0) { + try { + process.kill(-pid, signal); + return; + } catch { + // Fall back to direct child signaling if group kill is unavailable. + } + } + child.kill(signal); +}; diff --git a/src/loop/runner.ts b/src/loop/runner.ts index 64a38d1..fcc15cb 100644 --- a/src/loop/runner.ts +++ b/src/loop/runner.ts @@ -17,10 +17,10 @@ import { } from "./codex-app-server"; import { createCodexRenderer } from "./codex-render"; import { DEFAULT_CLAUDE_MODEL } from "./constants"; +import { DETACH_CHILD_PROCESS, killChildProcess } from "./process"; import type { Agent, Options, RunResult } from "./types"; type ExitSignal = "SIGINT" | "SIGTERM"; -type KillSignal = ExitSignal | "SIGKILL"; interface SpawnConfig { args: string[]; cmd: string; @@ -51,22 +51,6 @@ const runnerState: RunnerState = { useAppServer: () => useAppServer(), }; -const killChildProcess = ( - child: ReturnType, - signal: KillSignal -): void => { - const pid = child.pid; - if (process.platform !== "win32" && typeof pid === "number" && pid > 0) { - try { - process.kill(-pid, signal); - return; - } catch { - // Fall back to direct child signaling if group kill is unavailable. - } - } - child.kill(signal); -}; - const killChildren = (signal: ExitSignal): void => { for (const child of activeChildren) { killChildProcess(child, signal); @@ -325,7 +309,7 @@ const runLegacyAgent = async ( ): Promise => { const { args, cmd } = buildCommand(agent, prompt, opts.model); const proc = spawn([cmd, ...args], { - detached: process.platform !== "win32", + detached: DETACH_CHILD_PROCESS, env: process.env, stderr: "pipe", stdout: "pipe", diff --git a/tests/loop/process.test.ts b/tests/loop/process.test.ts new file mode 100644 index 0000000..39f1a35 --- /dev/null +++ b/tests/loop/process.test.ts @@ -0,0 +1,71 @@ +import { expect, mock, test } from "bun:test"; +import type { spawn } from "bun"; +import { + DETACH_CHILD_PROCESS, + type KillSignal, + killChildProcess, +} from "../../src/loop/process"; + +interface MockChildProcess { + kill: ReturnType boolean>>; + pid?: number; +} + +const asChildProcess = (child: MockChildProcess): ReturnType => + child as unknown as ReturnType; + +test("killChildProcess no-ops when child is undefined", () => { + const originalKill = process.kill; + const processKillSpy = mock(() => true) as typeof process.kill; + (process as { kill: typeof process.kill }).kill = processKillSpy; + + try { + killChildProcess(undefined, "SIGTERM"); + expect(processKillSpy).not.toHaveBeenCalled(); + } finally { + process.kill = originalKill; + } +}); + +test("killChildProcess uses process group signaling when detached mode is supported", () => { + const childKillSpy = mock<(signal: KillSignal) => boolean>(() => true); + const child = asChildProcess({ kill: childKillSpy, pid: 1234 }); + const originalKill = process.kill; + const processKillSpy = mock(() => true) as typeof process.kill; + (process as { kill: typeof process.kill }).kill = processKillSpy; + + try { + killChildProcess(child, "SIGTERM"); + if (DETACH_CHILD_PROCESS) { + expect(processKillSpy).toHaveBeenCalledWith(-1234, "SIGTERM"); + expect(childKillSpy).not.toHaveBeenCalled(); + return; + } + expect(processKillSpy).not.toHaveBeenCalled(); + expect(childKillSpy).toHaveBeenCalledWith("SIGTERM"); + } finally { + process.kill = originalKill; + } +}); + +test("killChildProcess falls back to direct child kill when group kill throws", () => { + if (!DETACH_CHILD_PROCESS) { + return; + } + + const childKillSpy = mock<(signal: KillSignal) => boolean>(() => true); + const child = asChildProcess({ kill: childKillSpy, pid: 4321 }); + const originalKill = process.kill; + const processKillSpy = mock(() => { + throw new Error("group kill not available"); + }) as typeof process.kill; + (process as { kill: typeof process.kill }).kill = processKillSpy; + + try { + killChildProcess(child, "SIGKILL"); + expect(processKillSpy).toHaveBeenCalledWith(-4321, "SIGKILL"); + expect(childKillSpy).toHaveBeenCalledWith("SIGKILL"); + } finally { + process.kill = originalKill; + } +});