Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 31 additions & 22 deletions src/loop/claude-sdk-server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { type Server, type ServerWebSocket, serve, spawn } from "bun";
import { DEFAULT_CLAUDE_MODEL } from "./constants";
import { findFreePort } from "./ports";
import { DETACH_CHILD_PROCESS, killChildProcess } from "./process";
import type { Options, RunResult } from "./types";

type ExitSignal = "SIGINT" | "SIGTERM";
Expand Down Expand Up @@ -92,6 +93,21 @@ const pipeToStderr = (stream: ReadableStream<Uint8Array>): void => {
pump();
};

/**
 * Checks whether `text` is well-formed newline-delimited JSON.
 *
 * The input is split on "\n"; each piece is trimmed and blank pieces are
 * ignored. Every remaining piece must be accepted by `JSON.parse`.
 *
 * @param text - Raw payload to validate.
 * @returns `true` when every non-blank line parses as JSON (including when
 * there are no non-blank lines at all), `false` as soon as one line fails.
 */
const isValidNdjson = (text: string): boolean =>
  text
    .split("\n")
    .map((row) => row.trim())
    .filter((row) => row.length > 0)
    .every((row) => {
      try {
        JSON.parse(row);
        return true;
      } catch {
        // A single unparsable line invalidates the whole payload.
        return false;
      }
    });

let spawnFn: SpawnFn = spawn;

export const claudeSdkInternals = {
Expand Down Expand Up @@ -165,6 +181,7 @@ class ClaudeSdkClient {
url,
],
{
detached: DETACH_CHILD_PROCESS,
env: process.env,
stderr: "pipe",
stdout: "pipe",
Expand Down Expand Up @@ -210,7 +227,7 @@ class ClaudeSdkClient {
}

interrupt(signal: ExitSignal): void {
this.child?.kill(signal);
killChildProcess(this.child, signal);
}

async close(): Promise<void> {
Expand All @@ -232,23 +249,10 @@ class ClaudeSdkClient {
_ws: ServerWebSocket<WSData>,
text: string
): void {
try {
const msg = JSON.parse(text);
if (
msg.type === "message" &&
typeof msg.content === "string" &&
this.ws
) {
this.sendJson({
type: "user",
message: { role: "user", content: msg.content },
parent_tool_use_id: null,
session_id: this.sessionId,
});
}
} catch {
// ignore parse errors from frontends
if (!isValidNdjson(text)) {
return;
}
this.ws?.send(text);
}

private createServer(): void {
Expand Down Expand Up @@ -465,7 +469,7 @@ class ClaudeSdkClient {
throw new Error("claude sdk server not connected");
}

return new Promise<RunResult>((resolve, reject) => {
const result = await new Promise<RunResult>((resolve, reject) => {
const timeout = setTimeout(() => {
if (this.turn) {
this.turn = undefined;
Expand All @@ -480,9 +484,9 @@ class ClaudeSdkClient {
onParsed,
onRaw,
parsed: "",
resolve: (result) => {
resolve: (r) => {
clearTimeout(timeout);
resolve(result);
resolve(r);
},
reject: (error) => {
clearTimeout(timeout);
Expand All @@ -497,6 +501,11 @@ class ClaudeSdkClient {
session_id: this.sessionId,
});
});

// Claude SDK session state is process-bound, so restart per turn to force a
// fresh session ID and avoid carrying state across independent loop turns.
await this.cleanup();
return result;
}

private async cleanup(): Promise<void> {
Expand All @@ -521,7 +530,7 @@ class ClaudeSdkClient {
this.server = undefined;
}
if (this.child) {
this.child.kill("SIGTERM");
killChildProcess(this.child, "SIGTERM");
await this.child.exited;
this.child = undefined;
}
Expand All @@ -547,7 +556,7 @@ class ClaudeSdkClient {
let singleton: ClaudeSdkClient | undefined;

process.on("exit", () => {
singleton?.process?.kill("SIGKILL");
killChildProcess(singleton?.process, "SIGKILL");
});

const getClient = (): ClaudeSdkClient => {
Expand Down
17 changes: 6 additions & 11 deletions src/loop/codex-app-server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { spawn } from "bun";
import { findFreePort } from "./ports";
import { DETACH_CHILD_PROCESS, killChildProcess } from "./process";
import type { Options, RunResult } from "./types";

type ExitSignal = "SIGINT" | "SIGTERM";
Expand Down Expand Up @@ -234,7 +235,6 @@ class AppServerClient {
private started = false;
private ready = false;
private requestId = 1;
private threadId: string | undefined;
private readonly pending = new Map<string, PendingRequest>();
private readonly turns = new Map<string, TurnState>();
private lock: Promise<void> = Promise.resolve();
Expand All @@ -259,6 +259,7 @@ class AppServerClient {
const child = spawnFn(
[APP_SERVER_CMD, "app-server", "--listen", listenUrl],
{
detached: DETACH_CHILD_PROCESS,
env: process.env,
stderr: "pipe",
stdin: "pipe",
Expand Down Expand Up @@ -305,12 +306,11 @@ class AppServerClient {
}
}
if (this.child) {
this.child.kill("SIGTERM");
killChildProcess(this.child, "SIGTERM");
this.child = undefined;
}
this.ready = false;
this.started = false;
this.threadId = undefined;
throw new CodexAppServerFallbackError(
toError(error).message || "failed to start codex app-server"
);
Expand Down Expand Up @@ -358,7 +358,7 @@ class AppServerClient {
}

interrupt(signal: ExitSignal): void {
this.child?.kill(signal);
killChildProcess(this.child, signal);
}

async close(): Promise<void> {
Expand All @@ -378,17 +378,14 @@ class AppServerClient {
this.ready = false;
return;
}
this.child.kill("SIGTERM");
killChildProcess(this.child, "SIGTERM");
await this.child.exited;
this.child = undefined;
this.ready = false;
this.started = false;
}

private async ensureThread(model: string): Promise<string> {
if (this.threadId) {
return this.threadId;
}
const response = await this.sendRequest(METHOD_THREAD_START, {
model,
approvalPolicy: "never",
Expand All @@ -401,7 +398,6 @@ class AppServerClient {
"codex app-server returned thread/start without thread id"
);
}
this.threadId = thread.id;
return thread.id;
}

Expand Down Expand Up @@ -862,7 +858,6 @@ class AppServerClient {
}
this.started = false;
this.ready = false;
this.threadId = undefined;
this.failAll(
new CodexAppServerFallbackError("codex app-server exited unexpectedly")
);
Expand All @@ -872,7 +867,7 @@ class AppServerClient {
let singleton: AppServerClient | undefined;

process.on("exit", () => {
singleton?.process?.kill("SIGKILL");
killChildProcess(singleton?.process, "SIGKILL");
});

const getClient = (): AppServerClient => {
Expand Down
26 changes: 26 additions & 0 deletions src/loop/process.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import type { spawn } from "bun";

// Signals used for cooperative shutdown requests.
type ExitSignal = "SIGINT" | "SIGTERM";
// Every signal killChildProcess accepts: the exit signals plus SIGKILL.
export type KillSignal = ExitSignal | "SIGKILL";
// Handle type returned by Bun's `spawn`.
export type ChildProcess = ReturnType<typeof spawn>;

// True on every platform except win32. Passed as the `detached` spawn option
// by callers and used by killChildProcess to decide whether to attempt a
// process-group kill before signaling the child directly.
export const DETACH_CHILD_PROCESS = process.platform !== "win32";

/**
 * Sends `signal` to a spawned child, preferring its process group.
 *
 * When detaching is enabled and the child exposes a positive numeric pid, the
 * signal is delivered with `process.kill(-pid, signal)`. If that call throws,
 * or group signaling is not applicable, the signal goes to the child itself
 * through `child.kill(signal)`.
 *
 * @param child - Child process handle; a missing handle is a no-op.
 * @param signal - Signal to deliver.
 */
export const killChildProcess = (
  child: ChildProcess | undefined,
  signal: KillSignal
): void => {
  if (child) {
    const { pid } = child;
    let groupSignaled = false;
    if (DETACH_CHILD_PROCESS && typeof pid === "number" && pid > 0) {
      try {
        // Negative pid targets the process group rather than a single process.
        process.kill(-pid, signal);
        groupSignaled = true;
      } catch {
        // Fall back to direct child signaling if group kill is unavailable.
      }
    }
    if (!groupSignaled) {
      child.kill(signal);
    }
  }
};
12 changes: 11 additions & 1 deletion src/loop/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
} from "./codex-app-server";
import { createCodexRenderer } from "./codex-render";
import { DEFAULT_CLAUDE_MODEL } from "./constants";
import { DETACH_CHILD_PROCESS, killChildProcess } from "./process";
import type { Agent, Options, RunResult } from "./types";

type ExitSignal = "SIGINT" | "SIGTERM";
Expand Down Expand Up @@ -52,10 +53,18 @@ const runnerState: RunnerState = {

const killChildren = (signal: ExitSignal): void => {
for (const child of activeChildren) {
child.kill(signal);
killChildProcess(child, signal);
}
};

/**
 * Delivers SIGKILL to every child currently tracked in `activeChildren`,
 * going through killChildProcess for each one.
 */
const killChildrenHard = (): void => {
  for (const tracked of activeChildren) {
    killChildProcess(tracked, "SIGKILL");
  }
};

process.on("exit", killChildrenHard);

const onSigint = (): void => {
killChildren("SIGINT");
interruptAppServer("SIGINT");
Expand Down Expand Up @@ -300,6 +309,7 @@ const runLegacyAgent = async (
): Promise<RunResult> => {
const { args, cmd } = buildCommand(agent, prompt, opts.model);
const proc = spawn([cmd, ...args], {
detached: DETACH_CHILD_PROCESS,
env: process.env,
stderr: "pipe",
stdout: "pipe",
Expand Down
46 changes: 44 additions & 2 deletions tests/loop/codex-app-server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ interface TestStream {

interface TestProcess {
close: () => void;
killSignals: string[];
pid: number;
writes: string[];
}

Expand Down Expand Up @@ -64,6 +66,8 @@ const installSpawn = (appServerModule: AppServerModule): void => {
appServerModule.codexAppServerInternals.setSpawnFn(
(_command: unknown, _options: unknown): unknown => {
const writes: string[] = [];
const killSignals: string[] = [];
const pid = 10_000 + processes.length + 1;
const stdout = createStream();
const stderr = createStream();
let exitedResolve = () => undefined;
Expand All @@ -88,9 +92,11 @@ const installSpawn = (appServerModule: AppServerModule): void => {

const child = {
exited,
kill: () => {
kill: (signal?: string) => {
killSignals.push(signal ?? "SIGTERM");
close();
},
pid,
stdin: {
write: (chunk: string): void => {
const lines = chunk.split("\n");
Expand All @@ -107,7 +113,7 @@ const installSpawn = (appServerModule: AppServerModule): void => {
stderr: stderr.stream,
stdout: stdout.stream,
};
processes.push({ close, writes });
processes.push({ close, killSignals, pid, writes });
return child;
}
);
Expand Down Expand Up @@ -166,6 +172,8 @@ const latestWrites = (): string[] => {
return processes.at(-1)?.writes ?? [];
};

const latestProcess = (): TestProcess | undefined => processes.at(-1);

const resetState = async (): Promise<void> => {
const appServer = await getModule();
await appServer.closeAppServer();
Expand Down Expand Up @@ -593,6 +601,40 @@ test("runCodexTurn falls back to exec mode when turn/start is unsupported", asyn
).rejects.toBeInstanceOf(appServer.CodexAppServerFallbackError);
});

// Verifies that interrupting the app server signals the spawned child's
// process group (negative pid through process.kill) and does not call the
// child's own kill().
// NOTE(review): this expectation relies on DETACH_CHILD_PROCESS being true,
// which is not the case on win32 — confirm the suite is not run there.
test("interruptAppServer kills detached process group when pid is available", async () => {
  const appServer = await getModule();
  // Reply to the initialize request only; presumably that is all
  // startAppServer needs to finish — verify against the harness.
  currentHandler = (request, write) => {
    if (request.method === "initialize") {
      write({ id: request.id, result: {} });
    }
  };

  type RecordedKill = { pid: number; signal: NodeJS.Signals | number };
  const recorded: RecordedKill[] = [];
  const realKill = process.kill;
  // Stand-in for process.kill that records each call instead of signaling.
  const fakeKill = ((
    pid: number,
    signal: NodeJS.Signals | number = "SIGTERM"
  ): boolean => {
    recorded.push({ pid, signal });
    return true;
  }) as typeof process.kill;
  (process as { kill: typeof process.kill }).kill = fakeKill;

  try {
    await appServer.startAppServer();
    const spawned = latestProcess();
    expect(spawned).toBeDefined();
    appServer.interruptAppServer("SIGTERM");
    const expectedGroupPid = -(spawned?.pid ?? 0);
    expect(recorded).toContainEqual({
      pid: expectedGroupPid,
      signal: "SIGTERM",
    });
    // The group kill succeeded, so the direct child.kill fallback must not run.
    expect(spawned?.killSignals.length).toBe(0);
  } finally {
    // Always restore the real process.kill, even when an assertion fails.
    process.kill = realKill;
  }
});

test("runCodexTurn recovers after an unexpected app-server exit and can restart", async () => {
const appServer = await getModule();
currentHandler = (request, write) => {
Expand Down
Loading