diff --git a/src/loop/claude-sdk-server.ts b/src/loop/claude-sdk-server.ts index 8d241ed..5cccb62 100644 --- a/src/loop/claude-sdk-server.ts +++ b/src/loop/claude-sdk-server.ts @@ -1,6 +1,7 @@ import { type Server, type ServerWebSocket, serve, spawn } from "bun"; import { DEFAULT_CLAUDE_MODEL } from "./constants"; import { findFreePort } from "./ports"; +import { DETACH_CHILD_PROCESS, killChildProcess } from "./process"; import type { Options, RunResult } from "./types"; type ExitSignal = "SIGINT" | "SIGTERM"; @@ -92,6 +93,21 @@ const pipeToStderr = (stream: ReadableStream): void => { pump(); }; +const isValidNdjson = (text: string): boolean => { + for (const line of text.split("\n")) { + const trimmed = line.trim(); + if (!trimmed) { + continue; + } + try { + JSON.parse(trimmed); + } catch { + return false; + } + } + return true; +}; + let spawnFn: SpawnFn = spawn; export const claudeSdkInternals = { @@ -165,6 +181,7 @@ class ClaudeSdkClient { url, ], { + detached: DETACH_CHILD_PROCESS, env: process.env, stderr: "pipe", stdout: "pipe", @@ -210,7 +227,7 @@ class ClaudeSdkClient { } interrupt(signal: ExitSignal): void { - this.child?.kill(signal); + killChildProcess(this.child, signal); } async close(): Promise { @@ -232,23 +249,10 @@ class ClaudeSdkClient { _ws: ServerWebSocket, text: string ): void { - try { - const msg = JSON.parse(text); - if ( - msg.type === "message" && - typeof msg.content === "string" && - this.ws - ) { - this.sendJson({ - type: "user", - message: { role: "user", content: msg.content }, - parent_tool_use_id: null, - session_id: this.sessionId, - }); - } - } catch { - // ignore parse errors from frontends + if (!isValidNdjson(text)) { + return; } + this.ws?.send(text); } private createServer(): void { @@ -465,7 +469,7 @@ class ClaudeSdkClient { throw new Error("claude sdk server not connected"); } - return new Promise((resolve, reject) => { + const result = await new Promise((resolve, reject) => { const timeout = setTimeout(() => { if (this.turn) { this.turn = undefined; @@ -480,9 +484,9 @@ class ClaudeSdkClient { onParsed, onRaw, parsed: "", - resolve: (result) => { + resolve: (r) => { clearTimeout(timeout); - resolve(result); + resolve(r); }, reject: (error) => { clearTimeout(timeout); @@ -497,6 +501,11 @@ class ClaudeSdkClient { session_id: this.sessionId, }); }); + + // Claude SDK session state is process-bound, so restart per turn to force a + // fresh session ID and avoid carrying state across independent loop turns. + await this.cleanup(); + return result; } private async cleanup(): Promise { @@ -521,7 +530,7 @@ class ClaudeSdkClient { this.server = undefined; } if (this.child) { - this.child.kill("SIGTERM"); + killChildProcess(this.child, "SIGTERM"); await this.child.exited; this.child = undefined; } @@ -547,7 +556,7 @@ class ClaudeSdkClient { let singleton: ClaudeSdkClient | undefined; process.on("exit", () => { - singleton?.process?.kill("SIGKILL"); + killChildProcess(singleton?.process, "SIGKILL"); }); const getClient = (): ClaudeSdkClient => { diff --git a/src/loop/codex-app-server.ts b/src/loop/codex-app-server.ts index 2b6d2e1..8f4937e 100644 --- a/src/loop/codex-app-server.ts +++ b/src/loop/codex-app-server.ts @@ -1,5 +1,6 @@ import { spawn } from "bun"; import { findFreePort } from "./ports"; +import { DETACH_CHILD_PROCESS, killChildProcess } from "./process"; import type { Options, RunResult } from "./types"; type ExitSignal = "SIGINT" | "SIGTERM"; @@ -234,7 +235,6 @@ class AppServerClient { private started = false; private ready = false; private requestId = 1; - private threadId: string | undefined; private readonly pending = new Map(); private readonly turns = new Map(); private lock: Promise = Promise.resolve(); @@ -259,6 +259,7 @@ class AppServerClient { const child = spawnFn( [APP_SERVER_CMD, "app-server", "--listen", listenUrl], { + detached: DETACH_CHILD_PROCESS, env: process.env, stderr: "pipe", stdin: "pipe", @@ -305,12 +306,11 @@ class AppServerClient { } } if (this.child) { - this.child.kill("SIGTERM"); + killChildProcess(this.child, "SIGTERM"); this.child = undefined; } this.ready = false; this.started = false; - this.threadId = undefined; throw new CodexAppServerFallbackError( toError(error).message || "failed to start codex app-server" ); @@ -358,7 +358,7 @@ class AppServerClient { } interrupt(signal: ExitSignal): void { - this.child?.kill(signal); + killChildProcess(this.child, signal); } async close(): Promise { @@ -378,7 +378,7 @@ class AppServerClient { this.ready = false; return; } - this.child.kill("SIGTERM"); + killChildProcess(this.child, "SIGTERM"); await this.child.exited; this.child = undefined; this.ready = false; @@ -386,9 +386,6 @@ class AppServerClient { } private async ensureThread(model: string): Promise { - if (this.threadId) { - return this.threadId; - } const response = await this.sendRequest(METHOD_THREAD_START, { model, approvalPolicy: "never", @@ -401,7 +398,6 @@ class AppServerClient { "codex app-server returned thread/start without thread id" ); } - this.threadId = thread.id; return thread.id; } @@ -862,7 +858,6 @@ class AppServerClient { } this.started = false; this.ready = false; - this.threadId = undefined; this.failAll( new CodexAppServerFallbackError("codex app-server exited unexpectedly") ); @@ -872,7 +867,7 @@ class AppServerClient { let singleton: AppServerClient | undefined; process.on("exit", () => { - singleton?.process?.kill("SIGKILL"); + killChildProcess(singleton?.process, "SIGKILL"); }); const getClient = (): AppServerClient => { diff --git a/src/loop/process.ts b/src/loop/process.ts new file mode 100644 index 0000000..0376dfe --- /dev/null +++ b/src/loop/process.ts @@ -0,0 +1,26 @@ +import type { spawn } from "bun"; + +type ExitSignal = "SIGINT" | "SIGTERM"; +export type KillSignal = ExitSignal | "SIGKILL"; +export type ChildProcess = ReturnType; + +export const DETACH_CHILD_PROCESS = process.platform !== "win32"; + +export const killChildProcess = ( + child: ChildProcess | undefined, + signal: KillSignal +): void => { + if (!child) { + return; + } + const pid = child.pid; + if (DETACH_CHILD_PROCESS && typeof pid === "number" && pid > 0) { + try { + process.kill(-pid, signal); + return; + } catch { + // Fall back to direct child signaling if group kill is unavailable. + } + } + child.kill(signal); +}; diff --git a/src/loop/runner.ts b/src/loop/runner.ts index 056f2a3..fcc15cb 100644 --- a/src/loop/runner.ts +++ b/src/loop/runner.ts @@ -17,6 +17,7 @@ import { } from "./codex-app-server"; import { createCodexRenderer } from "./codex-render"; import { DEFAULT_CLAUDE_MODEL } from "./constants"; +import { DETACH_CHILD_PROCESS, killChildProcess } from "./process"; import type { Agent, Options, RunResult } from "./types"; type ExitSignal = "SIGINT" | "SIGTERM"; @@ -52,10 +53,18 @@ const runnerState: RunnerState = { const killChildren = (signal: ExitSignal): void => { for (const child of activeChildren) { - child.kill(signal); + killChildProcess(child, signal); } }; +const killChildrenHard = (): void => { + for (const child of activeChildren) { + killChildProcess(child, "SIGKILL"); + } +}; + +process.on("exit", killChildrenHard); + const onSigint = (): void => { killChildren("SIGINT"); interruptAppServer("SIGINT"); @@ -300,6 +309,7 @@ const runLegacyAgent = async ( ): Promise => { const { args, cmd } = buildCommand(agent, prompt, opts.model); const proc = spawn([cmd, ...args], { + detached: DETACH_CHILD_PROCESS, env: process.env, stderr: "pipe", stdout: "pipe", diff --git a/tests/loop/codex-app-server.test.ts b/tests/loop/codex-app-server.test.ts index e155502..700a6cb 100644 --- a/tests/loop/codex-app-server.test.ts +++ b/tests/loop/codex-app-server.test.ts @@ -20,6 +20,8 @@ interface TestStream { interface TestProcess { close: () => void; + killSignals: string[]; + pid: number; writes: string[]; } @@ -64,6 +66,8 @@ const installSpawn = (appServerModule: AppServerModule): void => { appServerModule.codexAppServerInternals.setSpawnFn( (_command: unknown, _options: unknown): unknown => { const writes: string[] = []; + const killSignals: string[] = []; + const pid = 10_000 + processes.length + 1; const stdout = createStream(); const stderr = createStream(); let exitedResolve = () => undefined; @@ -88,9 +92,11 @@ const installSpawn = (appServerModule: AppServerModule): void => { const child = { exited, - kill: () => { + kill: (signal?: string) => { + killSignals.push(signal ?? "SIGTERM"); close(); }, + pid, stdin: { write: (chunk: string): void => { const lines = chunk.split("\n"); @@ -107,7 +113,7 @@ const installSpawn = (appServerModule: AppServerModule): void => { stderr: stderr.stream, stdout: stdout.stream, }; - processes.push({ close, writes }); + processes.push({ close, killSignals, pid, writes }); return child; } ); @@ -166,6 +172,8 @@ const latestWrites = (): string[] => { return processes.at(-1)?.writes ?? []; }; +const latestProcess = (): TestProcess | undefined => processes.at(-1); + const resetState = async (): Promise => { const appServer = await getModule(); await appServer.closeAppServer(); @@ -593,6 +601,40 @@ test("runCodexTurn falls back to exec mode when turn/start is unsupported", asyn ).rejects.toBeInstanceOf(appServer.CodexAppServerFallbackError); }); +test("interruptAppServer kills detached process group when pid is available", async () => { + const appServer = await getModule(); + currentHandler = (request, write) => { + if (request.method === "initialize") { + write({ id: request.id, result: {} }); + } + }; + + const originalKill = process.kill; + const killCalls: Array<{ pid: number; signal: NodeJS.Signals | number }> = []; + const killSpy = (( + pid: number, + signal: NodeJS.Signals | number = "SIGTERM" + ): boolean => { + killCalls.push({ pid, signal }); + return true; + }) as typeof process.kill; + (process as { kill: typeof process.kill }).kill = killSpy; + + try { + await appServer.startAppServer(); + const proc = latestProcess(); + expect(proc).toBeDefined(); + appServer.interruptAppServer("SIGTERM"); + expect(killCalls).toContainEqual({ + pid: -(proc?.pid ?? 0), + signal: "SIGTERM", + }); + expect(proc?.killSignals.length).toBe(0); + } finally { + process.kill = originalKill; + } +}); + test("runCodexTurn recovers after an unexpected app-server exit and can restart", async () => { const appServer = await getModule(); currentHandler = (request, write) => { diff --git a/tests/loop/process.test.ts b/tests/loop/process.test.ts new file mode 100644 index 0000000..39f1a35 --- /dev/null +++ b/tests/loop/process.test.ts @@ -0,0 +1,71 @@ +import { expect, mock, test } from "bun:test"; +import type { spawn } from "bun"; +import { + DETACH_CHILD_PROCESS, + type KillSignal, + killChildProcess, +} from "../../src/loop/process"; + +interface MockChildProcess { + kill: ReturnType boolean>>; + pid?: number; +} + +const asChildProcess = (child: MockChildProcess): ReturnType => + child as unknown as ReturnType; + +test("killChildProcess no-ops when child is undefined", () => { + const originalKill = process.kill; + const processKillSpy = mock(() => true) as typeof process.kill; + (process as { kill: typeof process.kill }).kill = processKillSpy; + + try { + killChildProcess(undefined, "SIGTERM"); + expect(processKillSpy).not.toHaveBeenCalled(); + } finally { + process.kill = originalKill; + } +}); + +test("killChildProcess uses process group signaling when detached mode is supported", () => { + const childKillSpy = mock<(signal: KillSignal) => boolean>(() => true); + const child = asChildProcess({ kill: childKillSpy, pid: 1234 }); + const originalKill = process.kill; + const processKillSpy = mock(() => true) as typeof process.kill; + (process as { kill: typeof process.kill }).kill = processKillSpy; + + try { + killChildProcess(child, "SIGTERM"); + if (DETACH_CHILD_PROCESS) { + expect(processKillSpy).toHaveBeenCalledWith(-1234, "SIGTERM"); + expect(childKillSpy).not.toHaveBeenCalled(); + return; + } + expect(processKillSpy).not.toHaveBeenCalled(); + expect(childKillSpy).toHaveBeenCalledWith("SIGTERM"); + } finally { + process.kill = originalKill; + } +}); + +test("killChildProcess falls back to direct child kill when group kill throws", () => { + if (!DETACH_CHILD_PROCESS) { + return; + } + + const childKillSpy = mock<(signal: KillSignal) => boolean>(() => true); + const child = asChildProcess({ kill: childKillSpy, pid: 4321 }); + const originalKill = process.kill; + const processKillSpy = mock(() => { + throw new Error("group kill not available"); + }) as typeof process.kill; + (process as { kill: typeof process.kill }).kill = processKillSpy; + + try { + killChildProcess(child, "SIGKILL"); + expect(processKillSpy).toHaveBeenCalledWith(-4321, "SIGKILL"); + expect(childKillSpy).toHaveBeenCalledWith("SIGKILL"); + } finally { + process.kill = originalKill; + } +});