From 254631a9948db46ad2ff106849f362e563ddebd7 Mon Sep 17 00:00:00 2001 From: Yingjie Qi Date: Mon, 2 Mar 2026 16:31:13 +0800 Subject: [PATCH 1/5] fix: replace kill archiving with file-based IPC and make threads resumable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rework the kill command to use file-based signal IPC instead of directly archiving threads. The running process now polls for a signal file via Promise.race, enabling cross-process kill detection even during slow turn/start requests. Key changes: - Kill signal polling starts before turn/start with two Promise.race stages covering both the request and completion phases - PID file tracking (~/.codex-collab/pids/) enables stale "running" detection in `jobs` when the owning process dies without cleanup - `kill` no longer archives threads — it just interrupts, making threads resumable. `delete` handles permanent cleanup and now also stops running threads before archiving. - Status uses "interrupted" instead of the removed "killed" terminal state, with guard against killing non-running threads - SIGTERM handler added alongside existing SIGINT for graceful shutdown - Thread IDs validated at registration for Windows filename safety - Comprehensive error handling: EPERM-aware process checks, try-catch in setInterval polling, warning logs for all catch blocks - 20 tests covering kill signal races, error propagation, and thread status updates --- SKILL.md | 10 ++- src/cli.test.ts | 5 +- src/cli.ts | 197 ++++++++++++++++++++++++++++++++++++----- src/config.ts | 2 + src/threads.ts | 6 +- src/turns.test.ts | 219 +++++++++++++++++++++++++++++++++++++++++++++- src/turns.ts | 110 ++++++++++++++++++++++- src/types.ts | 2 +- 8 files changed, 518 insertions(+), 33 deletions(-) diff --git a/SKILL.md b/SKILL.md index 7a035fd..72987f2 100644 --- a/SKILL.md +++ b/SKILL.md @@ -32,7 +32,11 @@ codex-collab run --resume "now check the error handling" --content-only codex-collab run 
"investigate the auth module" -d /path/to/project --content-only ``` -**IMPORTANT: Always use `run_in_background=true` and `dangerouslyDisableSandbox=true`** for all `codex-collab` Bash commands. Tasks take minutes, and the tool writes to `~/.codex-collab/` which is outside the sandbox allowlist. You will be notified automatically when the command finishes. After launching, tell the user it's running and end your turn. Do NOT use TaskOutput, block, poll, wait, or spawn an agent to monitor the result — the background task notification handles this automatically. +**IMPORTANT: Always use `dangerouslyDisableSandbox=true`** for all `codex-collab` Bash commands — the tool writes to `~/.codex-collab/` which is outside the sandbox allowlist. + +For **`run` and `review`** commands, also use `run_in_background=true` — these take minutes. You will be notified automatically when the command finishes. After launching, tell the user it's running and end your turn. Do NOT use TaskOutput, block, poll, wait, or spawn an agent to monitor the result — the background task notification handles this automatically. + +For **all other commands** (`kill`, `jobs`, `progress`, `output`, `approve`, `decline`, `clean`, `delete`, `models`, `health`), run in the **foreground** — they complete in seconds. If the user asks about progress mid-task, use `progress` to check the recent activity: @@ -67,7 +71,7 @@ codex-collab review --resume -d /path/to/project --content-only Review modes: `pr` (default), `uncommitted`, `commit` -**IMPORTANT: Always use `run_in_background=true` and `dangerouslyDisableSandbox=true`** — reviews typically take 5-20 minutes. You will be notified automatically when done. After launching, tell the user it's running and end your turn. Do NOT use TaskOutput, block, poll, wait, or spawn an agent to monitor the result — the background task notification handles this automatically. 
+**IMPORTANT: Use `run_in_background=true` and `dangerouslyDisableSandbox=true`** — reviews typically take 5-20 minutes. You will be notified automatically when done. After launching, tell the user it's running and end your turn. Do NOT use TaskOutput, block, poll, wait, or spawn an agent to monitor the result — the background task notification handles this automatically. ## Context Efficiency @@ -165,7 +169,7 @@ codex-collab progress # Recent activity (tail of log) ```bash codex-collab jobs # List threads codex-collab jobs --json # List threads (JSON) -codex-collab kill # Interrupt and archive thread +codex-collab kill # Stop a running thread codex-collab delete # Archive thread, delete local files codex-collab clean # Delete old logs and stale mappings ``` diff --git a/src/cli.test.ts b/src/cli.test.ts index d9a3738..8de19d0 100644 --- a/src/cli.test.ts +++ b/src/cli.test.ts @@ -40,9 +40,10 @@ describe("CLI valid commands", () => { }); it("health command runs without crashing", () => { - // May fail if codex not installed, but should not crash with unhandled exception + // May fail if codex not installed, but should not crash with unhandled exception. + // Exit code 143 = SIGTERM during app-server cleanup (our signal handler). 
const { exitCode } = run("health"); - expect([0, 1]).toContain(exitCode); + expect([0, 1, 143]).toContain(exitCode); }); }); diff --git a/src/cli.ts b/src/cli.ts index bf2f246..34db73e 100755 --- a/src/cli.ts +++ b/src/cli.ts @@ -45,18 +45,34 @@ import type { } from "./types"; // --------------------------------------------------------------------------- -// SIGINT handler — clean up spawned app-server on Ctrl+C +// Signal handlers — clean up spawned app-server and update thread status // --------------------------------------------------------------------------- let activeClient: AppServerClient | undefined; +let activeThreadId: string | undefined; +let activeShortId: string | undefined; let shuttingDown = false; -process.on("SIGINT", async () => { +async function handleShutdownSignal(exitCode: number): Promise { if (shuttingDown) { - process.exit(130); + process.exit(exitCode); } shuttingDown = true; console.error("[codex] Shutting down..."); + + // Update thread status and clean up PID file synchronously before async + // cleanup — ensures the mapping is written even if client.close() hangs. + if (activeThreadId) { + try { + updateThreadStatus(config.threadsFile, activeThreadId, "interrupted"); + } catch (e) { + console.error(`[codex] Warning: could not update thread status during shutdown: ${e instanceof Error ? e.message : String(e)}`); + } + } + if (activeShortId) { + removePidFile(activeShortId); + } + try { if (activeClient) { await activeClient.close(); @@ -64,8 +80,11 @@ process.on("SIGINT", async () => { } catch (e) { console.error(`[codex] Warning: cleanup failed: ${e instanceof Error ? 
e.message : String(e)}`); } - process.exit(130); -}); + process.exit(exitCode); +} + +process.on("SIGINT", () => handleShutdownSignal(130)); +process.on("SIGTERM", () => handleShutdownSignal(143)); // --------------------------------------------------------------------------- // Argument parsing @@ -397,6 +416,44 @@ function pluralize(n: number, word: string): string { return `${n} ${word}${n === 1 ? "" : "s"}`; } +/** Write a PID file for the current process so cmdJobs can detect stale "running" status. */ +function writePidFile(shortId: string): void { + try { + writeFileSync(join(config.pidsDir, shortId), String(process.pid), { mode: 0o600 }); + } catch (e) { + console.error(`[codex] Warning: could not write PID file: ${e instanceof Error ? e.message : String(e)}`); + } +} + +/** Remove the PID file for a thread. */ +function removePidFile(shortId: string): void { + try { + unlinkSync(join(config.pidsDir, shortId)); + } catch (e) { + if ((e as NodeJS.ErrnoException).code !== "ENOENT") { + console.error(`[codex] Warning: could not remove PID file: ${e instanceof Error ? e.message : String(e)}`); + } + } +} + +/** Check if the process that owns a thread is still alive. */ +function isProcessAlive(shortId: string): boolean { + const pidPath = join(config.pidsDir, shortId); + try { + const pid = Number(readFileSync(pidPath, "utf-8").trim()); + if (!Number.isFinite(pid) || pid <= 0) return false; + process.kill(pid, 0); // signal 0 = existence check + return true; + } catch (e) { + const code = (e as NodeJS.ErrnoException).code; + if (code === "ESRCH" || code === "ENOENT") return false; + if (code === "EPERM") return true; // process exists but we can't signal it + // Unexpected error — assume alive to avoid incorrectly marking live threads as dead + console.error(`[codex] Warning: could not check process for ${shortId}: ${e instanceof Error ? 
e.message : String(e)}`); + return true; + } +} + // --------------------------------------------------------------------------- // Commands // --------------------------------------------------------------------------- @@ -489,6 +546,9 @@ async function cmdRun(positional: string[], opts: Options) { } updateThreadStatus(config.threadsFile, threadId, "running"); + activeThreadId = threadId; + activeShortId = shortId; + writePidFile(shortId); const dispatcher = createDispatcher(shortId, opts); @@ -510,6 +570,10 @@ async function cmdRun(positional: string[], opts: Options) { } catch (e) { updateThreadStatus(config.threadsFile, threadId, "failed"); throw e; + } finally { + activeThreadId = undefined; + activeShortId = undefined; + removePidFile(shortId); } }); @@ -535,6 +599,9 @@ async function cmdReview(positional: string[], opts: Options) { } updateThreadStatus(config.threadsFile, threadId, "running"); + activeThreadId = threadId; + activeShortId = shortId; + writePidFile(shortId); const dispatcher = createDispatcher(shortId, opts); @@ -553,6 +620,10 @@ async function cmdReview(positional: string[], opts: Options) { } catch (e) { updateThreadStatus(config.threadsFile, threadId, "failed"); throw e; + } finally { + activeThreadId = undefined; + activeShortId = undefined; + removePidFile(shortId); } }); @@ -592,6 +663,18 @@ async function cmdJobs(opts: Options) { // Only show threads created by this CLI (those with a local short ID mapping) let localThreads = allThreads.filter((t) => reverseMap.has(t.id)); + + // Detect stale "running" status: if the owning process is dead, mark as interrupted. 
+ for (const t of localThreads) { + const sid = reverseMap.get(t.id)!; + const entry = mapping[sid]; + if (entry?.lastStatus === "running" && !isProcessAlive(sid)) { + updateThreadStatus(config.threadsFile, t.id, "interrupted"); + entry.lastStatus = "interrupted"; + removePidFile(sid); + } + } + if (opts.limit !== Infinity) localThreads = localThreads.slice(0, opts.limit); if (opts.json) { @@ -634,19 +717,35 @@ async function cmdKill(positional: string[]) { validateIdOrDie(id); const threadId = resolveThreadId(config.threadsFile, id); - - // Check local status — skip server operations if already killed - const mapping = loadThreadMapping(config.threadsFile); const shortId = findShortId(config.threadsFile, threadId); - const localStatus = shortId ? mapping[shortId]?.lastStatus : undefined; - if (localStatus === "killed") { - progress(`Thread ${id} is already killed`); - return; + // Skip kill for threads that have already reached a terminal status + if (shortId) { + const mapping = loadThreadMapping(config.threadsFile); + const localStatus = mapping[shortId]?.lastStatus; + if (localStatus && localStatus !== "running") { + progress(`Thread ${id} is already ${localStatus}`); + return; + } } - const archived = await withClient(async (client) => { - // Try to read thread status first and interrupt active turn if any + // Write kill signal file so the running process can detect the kill + let killSignalWritten = false; + const signalPath = join(config.killSignalsDir, threadId); + try { + writeFileSync(signalPath, "", { mode: 0o600 }); + killSignalWritten = true; + } catch (e) { + console.error( + `[codex] Warning: could not write kill signal: ${e instanceof Error ? e.message : String(e)}. ` + + `The running process may not detect the kill.`, + ); + } + + // Try to interrupt the active turn on the server (immediate effect). + // The kill signal file handles the case where the run process is polling. 
+ let serverInterrupted = false; + await withClient(async (client) => { try { const { thread } = await client.request<{ thread: { @@ -665,6 +764,7 @@ async function cmdKill(positional: string[]) { threadId, turnId: activeTurn.id, }); + serverInterrupted = true; progress(`Interrupted turn ${activeTurn.id}`); } } @@ -673,15 +773,14 @@ async function cmdKill(positional: string[]) { console.error(`[codex] Warning: could not read/interrupt thread: ${e.message}`); } } - - return tryArchive(client, threadId); }); - updateThreadStatus(config.threadsFile, threadId, "killed"); - if (archived === "failed") { - progress(`Killed thread ${id} (server archive failed)`); + updateThreadStatus(config.threadsFile, threadId, "interrupted"); + if (shortId) removePidFile(shortId); + if (killSignalWritten || serverInterrupted) { + progress(`Stopped thread ${id}`); } else { - progress(`Killed thread ${id}`); + progress(`Marked thread ${id} as interrupted, but could not signal the running process`); } } @@ -802,6 +901,8 @@ async function cmdClean() { const logsDeleted = deleteOldFiles(config.logsDir, sevenDaysMs); const approvalsDeleted = deleteOldFiles(config.approvalsDir, oneDayMs); + const killSignalsDeleted = deleteOldFiles(config.killSignalsDir, oneDayMs); + const pidsDeleted = deleteOldFiles(config.pidsDir, oneDayMs); // Clean stale thread mappings — use log file mtime as proxy for last // activity so recently-used threads aren't pruned just because they @@ -835,6 +936,10 @@ async function cmdClean() { if (logsDeleted > 0) parts.push(`${logsDeleted} log files deleted`); if (approvalsDeleted > 0) parts.push(`${approvalsDeleted} approval files deleted`); + if (killSignalsDeleted > 0) + parts.push(`${killSignalsDeleted} kill signal files deleted`); + if (pidsDeleted > 0) + parts.push(`${pidsDeleted} stale PID files deleted`); if (mappingsRemoved > 0) parts.push(`${mappingsRemoved} stale mappings removed`); @@ -853,9 +958,54 @@ async function cmdDelete(positional: string[]) { const 
threadId = resolveThreadId(config.threadsFile, id); const shortId = findShortId(config.threadsFile, threadId); + // If the thread is currently running, stop it first before archiving + const localStatus = shortId ? loadThreadMapping(config.threadsFile)[shortId]?.lastStatus : undefined; + if (localStatus === "running") { + const signalPath = join(config.killSignalsDir, threadId); + try { + writeFileSync(signalPath, "", { mode: 0o600 }); + } catch (e) { + console.error( + `[codex] Warning: could not write kill signal: ${e instanceof Error ? e.message : String(e)}. ` + + `The running process may not detect the delete.`, + ); + } + } + let archiveResult: "archived" | "already_done" | "failed" = "failed"; try { - archiveResult = await withClient((client) => tryArchive(client, threadId)); + archiveResult = await withClient(async (client) => { + // Interrupt active turn before archiving (only if running) + if (localStatus === "running") { + try { + const { thread } = await client.request<{ + thread: { + id: string; + status: { type: string }; + turns: Array<{ id: string; status: string }>; + }; + }>("thread/read", { threadId, includeTurns: true }); + + if (thread.status.type === "active") { + const activeTurn = thread.turns?.find( + (t) => t.status === "inProgress", + ); + if (activeTurn) { + await client.request("turn/interrupt", { + threadId, + turnId: activeTurn.id, + }); + } + } + } catch (e) { + if (e instanceof Error && !e.message.includes("not found") && !e.message.includes("archived")) { + console.error(`[codex] Warning: could not read/interrupt thread during delete: ${e.message}`); + } + } + } + + return tryArchive(client, threadId); + }); } catch (e) { if (e instanceof Error && !e.message.includes("not found")) { console.error(`[codex] Warning: could not archive on server: ${e.message}`); @@ -863,6 +1013,7 @@ async function cmdDelete(positional: string[]) { } if (shortId) { + removePidFile(shortId); const logPath = join(config.logsDir, `${shortId}.log`); if 
(existsSync(logPath)) unlinkSync(logPath); removeThread(config.threadsFile, shortId); @@ -912,7 +1063,7 @@ Commands: review [opts] Run code review (PR-style by default) review "instructions" Custom review with specific focus jobs [--json] [--all] List threads (--limit to cap) - kill Interrupt and archive thread + kill Stop a running thread output Read full log for thread progress Show recent activity for thread models List available models @@ -957,6 +1108,8 @@ Examples: function ensureDataDirs(): void { mkdirSync(config.logsDir, { recursive: true }); mkdirSync(config.approvalsDir, { recursive: true }); + mkdirSync(config.killSignalsDir, { recursive: true }); + mkdirSync(config.pidsDir, { recursive: true }); } async function main() { diff --git a/src/config.ts b/src/config.ts index 3d22ac0..896ecba 100644 --- a/src/config.ts +++ b/src/config.ts @@ -36,6 +36,8 @@ export const config = { get threadsFile() { return join(this.dataDir, "threads.json"); }, get logsDir() { return join(this.dataDir, "logs"); }, get approvalsDir() { return join(this.dataDir, "approvals"); }, + get killSignalsDir() { return join(this.dataDir, "kill-signals"); }, + get pidsDir() { return join(this.dataDir, "pids"); }, // Display jobsListLimit: 20, diff --git a/src/threads.ts b/src/threads.ts index 6b8528e..a7e93ba 100644 --- a/src/threads.ts +++ b/src/threads.ts @@ -3,6 +3,7 @@ import { readFileSync, writeFileSync, existsSync, mkdirSync, renameSync, openSync, closeSync, unlinkSync, statSync } from "fs"; import { randomBytes } from "crypto"; import { dirname } from "path"; +import { validateId } from "./config"; import type { ThreadMapping } from "./types"; /** @@ -127,6 +128,7 @@ export function registerThread( threadId: string, meta?: { model?: string; cwd?: string }, ): ThreadMapping { + validateId(threadId); // ensure safe for use as filename (kill signals, etc.) 
return withThreadLock(threadsFile, () => { const mapping = loadThreadMapping(threadsFile); let shortId = generateShortId(); @@ -171,16 +173,16 @@ export function findShortId(threadsFile: string, threadId: string): string | nul export function updateThreadStatus( threadsFile: string, threadId: string, - status: "running" | "completed" | "failed" | "interrupted" | "killed", + status: "running" | "completed" | "failed" | "interrupted", ): void { withThreadLock(threadsFile, () => { const mapping = loadThreadMapping(threadsFile); let found = false; for (const entry of Object.values(mapping)) { if (entry.threadId === threadId) { + found = true; entry.lastStatus = status; entry.updatedAt = new Date().toISOString(); - found = true; break; } } diff --git a/src/turns.test.ts b/src/turns.test.ts index 17ff893..2b70c74 100644 --- a/src/turns.test.ts +++ b/src/turns.test.ts @@ -8,15 +8,23 @@ import type { TurnCompletedParams, TurnStartResponse, ReviewStartResponse, } from "./types"; -import { mkdirSync, rmSync, existsSync } from "fs"; +import { mkdirSync, rmSync, existsSync, writeFileSync } from "fs"; import { join } from "path"; import { tmpdir } from "os"; +import { + updateThreadStatus, + loadThreadMapping, + saveThreadMapping, +} from "./threads"; const TEST_LOG_DIR = join(tmpdir(), "codex-collab-test-turns"); +const TEST_KILL_DIR = join(tmpdir(), "codex-collab-test-kill-signals"); beforeEach(() => { if (existsSync(TEST_LOG_DIR)) rmSync(TEST_LOG_DIR, { recursive: true }); mkdirSync(TEST_LOG_DIR, { recursive: true }); + if (existsSync(TEST_KILL_DIR)) rmSync(TEST_KILL_DIR, { recursive: true }); + mkdirSync(TEST_KILL_DIR, { recursive: true }); }); // --------------------------------------------------------------------------- @@ -132,6 +140,7 @@ describe("runTurn", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, }); expect(result.status).toBe("completed"); @@ -158,6 +167,7 @@ describe("runTurn", () => { dispatcher, 
approvalHandler: autoApproveHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, }); expect(result.status).toBe("completed"); @@ -177,6 +187,7 @@ describe("runTurn", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, }); expect(result.status).toBe("failed"); @@ -202,6 +213,7 @@ describe("runTurn", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, }); expect(result.status).toBe("completed"); @@ -218,6 +230,7 @@ describe("runTurn", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 200, + killSignalsDir: TEST_KILL_DIR, }); expect(true).toBe(false); // should not reach here } catch (e) { @@ -262,6 +275,7 @@ describe("runTurn", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, }); expect(result.status).toBe("completed"); @@ -291,6 +305,7 @@ describe("runTurn", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, cwd: "/my/project", model: "gpt-5.3-codex", effort: "high", @@ -327,6 +342,7 @@ describe("runTurn", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, }); expect(result.status).toBe("completed"); @@ -364,6 +380,7 @@ describe("runReview", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, }, ); @@ -391,6 +408,7 @@ describe("runReview", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, }, ); @@ -434,6 +452,7 @@ describe("review output via exitedReviewMode", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, }, ); @@ -442,6 +461,203 @@ describe("review output via exitedReviewMode", () => { }); }); +// --------------------------------------------------------------------------- +// Kill signal tests +// 
--------------------------------------------------------------------------- + +describe("kill signal", () => { + + test("kill signal during slow turn/start returns interrupted result", async () => { + const { client } = buildMockClient((method) => { + if (method === "turn/start") { + // Simulate a slow/stuck turn/start — resolves after 2s + return new Promise((resolve) => + setTimeout(() => resolve(inProgressTurn("turn-1")), 2000), + ); + } + throw new Error(`Unexpected method: ${method}`); + }); + + // Kill signal arrives after 100ms — should win race 1 against the 2s request + setTimeout(() => { + writeFileSync(join(TEST_KILL_DIR, "thr-1"), "", { mode: 0o600 }); + }, 100); + + const dispatcher = new EventDispatcher("test-kill-request", TEST_LOG_DIR, () => {}); + + const result = await runTurn(client, "thr-1", [{ type: "text", text: "hello" }], { + dispatcher, + approvalHandler: autoApproveHandler, + timeoutMs: 10_000, + killSignalsDir: TEST_KILL_DIR, + }); + + expect(result.status).toBe("interrupted"); + expect(result.error).toBe("Thread killed by user"); + expect(existsSync(join(TEST_KILL_DIR, "thr-1"))).toBe(false); + }); + + test("kill signal during turn returns interrupted result", async () => { + const { client, emit } = buildMockClient((method) => { + if (method === "turn/start") { + // Write the kill signal file after a short delay (simulates `codex-collab kill`) + setTimeout(() => { + writeFileSync(join(TEST_KILL_DIR, "thr-1"), "", { mode: 0o600 }); + }, 100); + // Never fire turn/completed — the kill signal should end the turn + return inProgressTurn("turn-1"); + } + throw new Error(`Unexpected method: ${method}`); + }); + + const dispatcher = new EventDispatcher("test-kill", TEST_LOG_DIR, () => {}); + + const result = await runTurn(client, "thr-1", [{ type: "text", text: "hello" }], { + dispatcher, + approvalHandler: autoApproveHandler, + timeoutMs: 10_000, + killSignalsDir: TEST_KILL_DIR, + }); + + expect(result.status).toBe("interrupted"); + 
expect(result.error).toBe("Thread killed by user"); + // Signal file should be cleaned up in finally block + expect(existsSync(join(TEST_KILL_DIR, "thr-1"))).toBe(false); + }); + + test("stale signal cleared at turn start — turn completes normally", async () => { + // Pre-write a stale signal file (simulates leftover from a previous kill) + writeFileSync(join(TEST_KILL_DIR, "thr-1"), "", { mode: 0o600 }); + + const { client, emit } = buildMockClient((method) => { + if (method === "turn/start") { + setTimeout(() => emit("turn/completed", completedTurn("turn-1")), 50); + return inProgressTurn("turn-1"); + } + throw new Error(`Unexpected method: ${method}`); + }); + + const dispatcher = new EventDispatcher("test-stale-kill", TEST_LOG_DIR, () => {}); + + const result = await runTurn(client, "thr-1", [{ type: "text", text: "hello" }], { + dispatcher, + approvalHandler: autoApproveHandler, + timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, + }); + + expect(result.status).toBe("completed"); + }); + + test("normal completion wins race — no kill signal", async () => { + const { client, emit } = buildMockClient((method) => { + if (method === "turn/start") { + setTimeout(() => emit("turn/completed", completedTurn("turn-1")), 50); + return inProgressTurn("turn-1"); + } + throw new Error(`Unexpected method: ${method}`); + }); + + const dispatcher = new EventDispatcher("test-no-kill", TEST_LOG_DIR, () => {}); + + const result = await runTurn(client, "thr-1", [{ type: "text", text: "hello" }], { + dispatcher, + approvalHandler: autoApproveHandler, + timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, + }); + + expect(result.status).toBe("completed"); + }); + + test("signal file cleaned up on normal completion", async () => { + const { client, emit } = buildMockClient((method) => { + if (method === "turn/start") { + setTimeout(() => emit("turn/completed", completedTurn("turn-1")), 50); + return inProgressTurn("turn-1"); + } + throw new Error(`Unexpected method: ${method}`); + }); + 
+ const dispatcher = new EventDispatcher("test-cleanup", TEST_LOG_DIR, () => {}); + + await runTurn(client, "thr-1", [{ type: "text", text: "hello" }], { + dispatcher, + approvalHandler: autoApproveHandler, + timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, + }); + + // unlinkSync in finally should not error when no signal exists + expect(existsSync(join(TEST_KILL_DIR, "thr-1"))).toBe(false); + }); +}); + +// --------------------------------------------------------------------------- +// Non-kill error propagation +// --------------------------------------------------------------------------- + +describe("error propagation", () => { + test("non-kill errors propagate through the catch block", async () => { + const { client } = buildMockClient((method) => { + if (method === "turn/start") { + throw new Error("Server exploded"); + } + throw new Error(`Unexpected method: ${method}`); + }); + + const dispatcher = new EventDispatcher("test-propagate", TEST_LOG_DIR, () => {}); + + await expect( + runTurn(client, "thr-1", [{ type: "text", text: "hello" }], { + dispatcher, + approvalHandler: autoApproveHandler, + timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, + }), + ).rejects.toThrow("Server exploded"); + }); +}); + +// --------------------------------------------------------------------------- +// updateThreadStatus +// --------------------------------------------------------------------------- + +const TEST_THREADS_FILE = join(tmpdir(), "codex-collab-test-threads", "threads.json"); + +describe("updateThreadStatus", () => { + beforeEach(() => { + const dir = join(tmpdir(), "codex-collab-test-threads"); + if (existsSync(dir)) rmSync(dir, { recursive: true }); + mkdirSync(dir, { recursive: true }); + }); + + test("updates status and timestamp", () => { + saveThreadMapping(TEST_THREADS_FILE, { + abc12345: { + threadId: "thr-1", + createdAt: "2026-01-01T00:00:00Z", + lastStatus: "running", + }, + }); + + updateThreadStatus(TEST_THREADS_FILE, "thr-1", "completed"); + const 
loaded = loadThreadMapping(TEST_THREADS_FILE); + expect(loaded.abc12345.lastStatus).toBe("completed"); + expect(loaded.abc12345.updatedAt).toBeDefined(); + }); + + test("warns on unknown thread", () => { + saveThreadMapping(TEST_THREADS_FILE, {}); + const warnings: string[] = []; + const origError = console.error; + console.error = (msg: string) => warnings.push(msg); + updateThreadStatus(TEST_THREADS_FILE, "thr-unknown", "running"); + console.error = origError; + expect(warnings.some((w) => w.includes("unknown thread"))).toBe(true); + }); +}); + describe("approval wiring", () => { test("approval requests are routed through the handler", async () => { const approvalCalls: string[] = []; @@ -488,6 +704,7 @@ describe("approval wiring", () => { dispatcher, approvalHandler: mockApprovalHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, }); expect(approvalCalls).toContain("cmd:sudo rm -rf"); diff --git a/src/turns.ts b/src/turns.ts index 61ff83e..a6dd3c0 100644 --- a/src/turns.ts +++ b/src/turns.ts @@ -1,5 +1,7 @@ // src/turns.ts — Turn lifecycle (runTurn, runReview) +import { existsSync, unlinkSync } from "fs"; +import { join } from "path"; import type { AppServerClient } from "./protocol"; import type { UserInput, TurnStartParams, TurnStartResponse, TurnCompletedParams, @@ -11,6 +13,7 @@ import type { } from "./types"; import type { EventDispatcher } from "./events"; import type { ApprovalHandler } from "./approvals"; +import { config } from "./config"; export interface TurnOptions { dispatcher: EventDispatcher; @@ -20,6 +23,8 @@ export interface TurnOptions { model?: string; effort?: ReasoningEffort; approvalPolicy?: ApprovalPolicy; + /** Directory for kill signal files. Defaults to config.killSignalsDir. 
*/ + killSignalsDir?: string; } export interface ReviewOptions extends TurnOptions { @@ -67,6 +72,14 @@ export async function runReview( return executeTurn(client, "review/start", params, opts); } +/** Error thrown when a kill signal file is detected during turn execution. */ +class KillSignalError extends Error { + constructor(public readonly threadId: string) { + super(`Thread ${threadId} killed by user`); + this.name = "KillSignalError"; + } +} + /** * Shared turn lifecycle: register handlers, send the start request, * wait for completion, collect results, and clean up. @@ -80,6 +93,10 @@ async function executeTurn( const startTime = Date.now(); opts.dispatcher.reset(); + const signalsDir = opts.killSignalsDir ?? config.killSignalsDir; + const threadId = params.threadId; + const signalPath = join(signalsDir, threadId); + // AbortController for cancelling in-flight approval polls on turn completion/timeout const abortController = new AbortController(); const unsubs = registerEventHandlers(client, opts, abortController.signal); @@ -94,9 +111,39 @@ async function executeTurn( const completion = createTurnCompletionAwaiter(client, opts.timeoutMs); unsubs.push(completion.unsubscribe); + // AbortController specifically for kill signal polling — aborted when + // the turn completes normally or on timeout so the poll interval stops. + const killAbort = new AbortController(); + + // Clear stale signal file immediately before starting the poll to minimize + // the TOCTOU window between cleanup and detection. + try { unlinkSync(signalPath); } catch (e) { + if ((e as NodeJS.ErrnoException).code !== "ENOENT") { + console.error(`[codex] Warning: could not remove stale kill signal: ${e instanceof Error ? e.message : String(e)}`); + } + } + + // Start kill signal polling before the request so kills are detected even + // if turn/start is slow or stuck. 
+ const killSignal = createKillSignalAwaiter( + threadId, signalsDir, 500, killAbort.signal, + ); + killSignal.catch((e) => { + if (!(e instanceof KillSignalError)) { + console.error(`[codex] Unexpected error in kill signal awaiter: ${e instanceof Error ? e.message : String(e)}`); + } + }); + try { - const { turn } = await client.request(method, params); - const completedTurn = await completion.waitFor(turn.id); + const { turn } = await Promise.race([ + client.request(method, params), + killSignal, + ]); + + const completedTurn = await Promise.race([ + completion.waitFor(turn.id), + killSignal, + ]); opts.dispatcher.flushOutput(); opts.dispatcher.flush(); @@ -115,9 +162,30 @@ async function executeTurn( error: completedTurn.turn.error?.message, durationMs: Date.now() - startTime, }; + } catch (e) { + if (e instanceof KillSignalError) { + opts.dispatcher.flushOutput(); + opts.dispatcher.flush(); + return { + status: "interrupted", + output: opts.dispatcher.getAccumulatedOutput(), + filesChanged: opts.dispatcher.getFilesChanged(), + commandsRun: opts.dispatcher.getCommandsRun(), + error: "Thread killed by user", + durationMs: Date.now() - startTime, + }; + } + throw e; } finally { + killAbort.abort(); abortController.abort(); for (const unsub of unsubs) unsub(); + // Clean up signal file + try { unlinkSync(signalPath); } catch (e) { + if ((e as NodeJS.ErrnoException).code !== "ENOENT") { + console.error(`[codex] Warning: could not clean up kill signal: ${e instanceof Error ? e.message : String(e)}`); + } + } } } @@ -193,6 +261,44 @@ function registerEventHandlers(client: AppServerClient, opts: TurnOptions, signa return unsubs; } +/** + * Create a promise that rejects with KillSignalError when a kill signal file + * appears for the given thread. Polls the filesystem at the given interval. + * Stops polling when the provided AbortSignal fires (i.e. when the turn finishes for any reason). 
*/ +function createKillSignalAwaiter( + threadId: string, + signalsDir: string, + pollIntervalMs: number, + signal: AbortSignal, +): Promise<never> { + return new Promise<never>((_resolve, reject) => { + // Check immediately + if (existsSync(join(signalsDir, threadId))) { + reject(new KillSignalError(threadId)); + return; + } + + const timer = setInterval(() => { + try { + if (signal.aborted) { + clearInterval(timer); + return; + } + if (existsSync(join(signalsDir, threadId))) { + clearInterval(timer); + reject(new KillSignalError(threadId)); + } + } catch (e) { + clearInterval(timer); + console.error(`[codex] Warning: kill signal polling failed: ${e instanceof Error ? e.message : String(e)}`); + } + }, pollIntervalMs); + + signal.addEventListener("abort", () => clearInterval(timer), { once: true }); + }); +} + /** * Create a turn/completed awaiter that buffers events from the moment it's * created. Call waitFor(turnId) after the request to resolve with the matching diff --git a/src/types.ts b/src/types.ts index cc656c6..d89d17d 100644 --- a/src/types.ts +++ b/src/types.ts @@ -444,7 +444,7 @@ export interface ThreadMappingEntry { createdAt: string; model?: string; cwd?: string; - lastStatus?: "running" | "completed" | "failed" | "interrupted" | "killed"; + lastStatus?: "running" | "completed" | "failed" | "interrupted"; updatedAt?: string; } From 5a20ee3c8efaefe837638ff47aa330d286ccc52c Mon Sep 17 00:00:00 2001 From: Yingjie Qi Date: Mon, 2 Mar 2026 17:14:22 +0800 Subject: [PATCH 2/5] fix: await process exit on Windows to prevent background task hangs close() on Windows was returning immediately after taskkill without awaiting proc.exited or readLoop, leaving dangling promises that kept the event loop alive. This caused background tasks to stay "running" long after their output had completed. Now awaits process exit with a 3s timeout on Windows (matching the Unix path).
Removes the 1s afterEach delay workaround from protocol tests since close() properly cleans up on all platforms. --- src/protocol.test.ts | 10 ++-------- src/protocol.ts | 8 ++++---- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/src/protocol.test.ts b/src/protocol.test.ts index 2d580fc..8be65ae 100644 --- a/src/protocol.test.ts +++ b/src/protocol.test.ts @@ -239,14 +239,8 @@ describe("parseMessage", () => { // Each test manages its own client lifecycle to avoid dangling-process races // when bun runs tests concurrently within a describe block. describe("AppServerClient", () => { - // On Windows, bun's test runner doesn't fully await async finally blocks before moving - // to the next test. close() takes ~400ms on Windows (dominated by taskkill process tree - // cleanup). This delay ensures close() completes before the next test spawns a mock server. - afterEach(async () => { - if (process.platform === "win32") { - await new Promise((r) => setTimeout(r, 1000)); - } - }); + // close() now properly awaits process exit on all platforms, so no + // inter-test delay is needed. test("connect performs initialize handshake and returns userAgent", async () => { const c = await connect({ diff --git a/src/protocol.ts b/src/protocol.ts index 4f9d510..7a61aae 100644 --- a/src/protocol.ts +++ b/src/protocol.ts @@ -383,10 +383,6 @@ export async function connect(opts?: ConnectOptions): Promise { // proc.kill(). This order matters: if codex is a .cmd wrapper, killing // the direct child first removes the PID that taskkill needs to traverse // the tree, potentially leaving the real app-server alive. - // Note: we intentionally do NOT await readLoop/proc.exited here because - // Bun's test runner on Windows doesn't fully await async finally blocks, - // and the extra latency causes inter-test process races. The process is - // already dead after taskkill+kill, so the dangling promise is benign. 
if (proc.pid) { try { const r = spawnSync("taskkill", ["/PID", String(proc.pid), "/T", "/F"], { stdio: "pipe", timeout: 5000 }); @@ -404,6 +400,10 @@ export async function connect(opts?: ConnectOptions): Promise { console.error(`[codex] Warning: proc.kill() failed: ${e instanceof Error ? e.message : String(e)}`); } } + // Wait for the process to fully exit so dangling readLoop / proc.exited + // promises don't keep the event loop alive (which blocks background tasks + // from reporting completion). + if (await waitForExit(3000)) { await readLoop; } return; } From 6810bc1e1defef5e0a739ea1c998217af444c0c6 Mon Sep 17 00:00:00 2001 From: Yingjie Qi Date: Mon, 2 Mar 2026 17:33:15 +0800 Subject: [PATCH 3/5] =?UTF-8?q?fix:=20address=20PR=20review=20=E2=80=94=20?= =?UTF-8?q?PID=20file=20absence=20and=20stale=20signal=20race?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. isProcessAlive: return true (assume alive) when PID file is missing instead of false. Prevents cmdJobs from marking live threads as interrupted when PID file write failed or predates the PID tracking feature. 2. Stale signal cleanup: use timestamp comparison instead of unconditional unlink. Only removes signal files older than the current process, preserving fresh kill requests that arrive between process start and poll start. --- src/cli.ts | 19 +++++++++++++++---- src/turns.test.ts | 10 +++++++--- src/turns.ts | 15 ++++++++++----- 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/src/cli.ts b/src/cli.ts index 34db73e..e9b4c5d 100755 --- a/src/cli.ts +++ b/src/cli.ts @@ -436,17 +436,28 @@ function removePidFile(shortId: string): void { } } -/** Check if the process that owns a thread is still alive. */ +/** Check if the process that owns a thread is still alive. + * Returns true (assume alive) when the PID file is missing — the thread may + * have been started before PID tracking existed, or PID file write may have + * failed. 
Only returns false when we have a PID and can confirm the process + * is gone (ESRCH). */ function isProcessAlive(shortId: string): boolean { const pidPath = join(config.pidsDir, shortId); + let pid: number; + try { + pid = Number(readFileSync(pidPath, "utf-8").trim()); + } catch (e) { + if ((e as NodeJS.ErrnoException).code === "ENOENT") return true; // no PID file → assume alive + console.error(`[codex] Warning: could not read PID file for ${shortId}: ${e instanceof Error ? e.message : String(e)}`); + return true; + } + if (!Number.isFinite(pid) || pid <= 0) return false; try { - const pid = Number(readFileSync(pidPath, "utf-8").trim()); - if (!Number.isFinite(pid) || pid <= 0) return false; process.kill(pid, 0); // signal 0 = existence check return true; } catch (e) { const code = (e as NodeJS.ErrnoException).code; - if (code === "ESRCH" || code === "ENOENT") return false; + if (code === "ESRCH") return false; // process confirmed dead if (code === "EPERM") return true; // process exists but we can't signal it // Unexpected error — assume alive to avoid incorrectly marking live threads as dead console.error(`[codex] Warning: could not check process for ${shortId}: ${e instanceof Error ? 
e.message : String(e)}`); diff --git a/src/turns.test.ts b/src/turns.test.ts index 2b70c74..13d9e84 100644 --- a/src/turns.test.ts +++ b/src/turns.test.ts @@ -8,7 +8,7 @@ import type { TurnCompletedParams, TurnStartResponse, ReviewStartResponse, } from "./types"; -import { mkdirSync, rmSync, existsSync, writeFileSync } from "fs"; +import { mkdirSync, rmSync, existsSync, writeFileSync, utimesSync } from "fs"; import { join } from "path"; import { tmpdir } from "os"; import { @@ -526,8 +526,12 @@ describe("kill signal", () => { }); test("stale signal cleared at turn start — turn completes normally", async () => { - // Pre-write a stale signal file (simulates leftover from a previous kill) - writeFileSync(join(TEST_KILL_DIR, "thr-1"), "", { mode: 0o600 }); + // Pre-write a stale signal file (simulates leftover from a previous kill). + // Backdate its mtime so it looks like it predates the current process. + const stalePath = join(TEST_KILL_DIR, "thr-1"); + writeFileSync(stalePath, "", { mode: 0o600 }); + const old = new Date(Date.now() - 60_000); + utimesSync(stalePath, old, old); const { client, emit } = buildMockClient((method) => { if (method === "turn/start") { diff --git a/src/turns.ts b/src/turns.ts index a6dd3c0..544a91f 100644 --- a/src/turns.ts +++ b/src/turns.ts @@ -1,6 +1,6 @@ // src/turns.ts — Turn lifecycle (runTurn, runReview) -import { existsSync, unlinkSync } from "fs"; +import { existsSync, statSync, unlinkSync } from "fs"; import { join } from "path"; import type { AppServerClient } from "./protocol"; import type { @@ -115,11 +115,16 @@ async function executeTurn( // the turn completes normally or on timeout so the poll interval stops. const killAbort = new AbortController(); - // Clear stale signal file immediately before starting the poll to minimize - // the TOCTOU window between cleanup and detection. 
- try { unlinkSync(signalPath); } catch (e) { + // Remove signal files left over from a previous (crashed) run, but preserve + // fresh signals written by a concurrent `kill` targeting this thread. + // Heuristic: files created before this process started are stale. + const processStartMs = Date.now() - process.uptime() * 1000; + try { + const st = statSync(signalPath); + if (st.mtimeMs < processStartMs) unlinkSync(signalPath); + } catch (e) { if ((e as NodeJS.ErrnoException).code !== "ENOENT") { - console.error(`[codex] Warning: could not remove stale kill signal: ${e instanceof Error ? e.message : String(e)}`); + console.error(`[codex] Warning: could not check/remove stale kill signal: ${e instanceof Error ? e.message : String(e)}`); } } From d69fad0cd9b4b5d316424f0eae78b4c36f540354 Mon Sep 17 00:00:00 2001 From: Yingjie Qi Date: Mon, 2 Mar 2026 17:39:57 +0800 Subject: [PATCH 4/5] fix: improve kill signal polling resilience and test robustness 1. Kill signal poll: keep retrying on transient errors instead of clearing the interval and leaving a never-settling promise that silently disables kill monitoring for the rest of the turn. 2. isProcessAlive: log warning when PID file contains invalid content so corrupted files leave a diagnostic trail. 3. Add boundary test: fresh signal file (current mtime) is preserved at turn start and triggers interrupted status. 4. Fix stale signal test: backdate relative to process.uptime() instead of fixed 60s offset to avoid flakiness when process has been running longer than 60s (CI, watch mode). --- src/cli.ts | 5 ++++- src/turns.test.ts | 32 +++++++++++++++++++++++++++++--- src/turns.ts | 4 ++-- 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/src/cli.ts b/src/cli.ts index e9b4c5d..97d2a0f 100755 --- a/src/cli.ts +++ b/src/cli.ts @@ -451,7 +451,10 @@ function isProcessAlive(shortId: string): boolean { console.error(`[codex] Warning: could not read PID file for ${shortId}: ${e instanceof Error ? 
e.message : String(e)}`); return true; } - if (!Number.isFinite(pid) || pid <= 0) return false; + if (!Number.isFinite(pid) || pid <= 0) { + console.error(`[codex] Warning: PID file for ${shortId} contains invalid value`); + return false; + } try { process.kill(pid, 0); // signal 0 = existence check return true; diff --git a/src/turns.test.ts b/src/turns.test.ts index 13d9e84..5da7933 100644 --- a/src/turns.test.ts +++ b/src/turns.test.ts @@ -527,11 +527,11 @@ describe("kill signal", () => { test("stale signal cleared at turn start — turn completes normally", async () => { // Pre-write a stale signal file (simulates leftover from a previous kill). - // Backdate its mtime so it looks like it predates the current process. + // Backdate its mtime to before process start so it's treated as stale. const stalePath = join(TEST_KILL_DIR, "thr-1"); writeFileSync(stalePath, "", { mode: 0o600 }); - const old = new Date(Date.now() - 60_000); - utimesSync(stalePath, old, old); + const beforeProcessStart = new Date(Date.now() - process.uptime() * 1000 - 10_000); + utimesSync(stalePath, beforeProcessStart, beforeProcessStart); const { client, emit } = buildMockClient((method) => { if (method === "turn/start") { @@ -553,6 +553,32 @@ describe("kill signal", () => { expect(result.status).toBe("completed"); }); + test("fresh signal NOT cleared at turn start — turn is interrupted", async () => { + // Write a signal file with current mtime (simulates a concurrent kill). + // No backdating — mtime is after process start, so it should be preserved. 
+ writeFileSync(join(TEST_KILL_DIR, "thr-1"), "", { mode: 0o600 }); + + const { client, emit } = buildMockClient((method) => { + if (method === "turn/start") { + setTimeout(() => emit("turn/completed", completedTurn("turn-1")), 50); + return inProgressTurn("turn-1"); + } + throw new Error(`Unexpected method: ${method}`); + }); + + const dispatcher = new EventDispatcher("test-fresh-kill", TEST_LOG_DIR, () => {}); + + const result = await runTurn(client, "thr-1", [{ type: "text", text: "hello" }], { + dispatcher, + approvalHandler: autoApproveHandler, + timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, + }); + + expect(result.status).toBe("interrupted"); + expect(result.error).toBe("Thread killed by user"); + }); + test("normal completion wins race — no kill signal", async () => { const { client, emit } = buildMockClient((method) => { if (method === "turn/start") { diff --git a/src/turns.ts b/src/turns.ts index 544a91f..ff0ae11 100644 --- a/src/turns.ts +++ b/src/turns.ts @@ -295,8 +295,8 @@ function createKillSignalAwaiter( reject(new KillSignalError(threadId)); } } catch (e) { - clearInterval(timer); - console.error(`[codex] Warning: kill signal polling failed: ${e instanceof Error ? e.message : String(e)}`); + // Log but keep polling — the error may be transient (e.g. momentary EACCES). + console.error(`[codex] Warning: kill signal poll error (will retry): ${e instanceof Error ? e.message : String(e)}`); } }, pollIntervalMs); From 9cccb65d03dda531093dba6f1236c342c7c8142d Mon Sep 17 00:00:00 2001 From: Yingjie Qi Date: Mon, 2 Mar 2026 18:02:26 +0800 Subject: [PATCH 5/5] fix: only mark thread as interrupted when kill signal was delivered When both kill mechanisms fail (signal file write + server interrupt), keep the thread status as "running" so the user can retry. Previously the status was unconditionally set to "interrupted", which blocked subsequent kill attempts via the "already interrupted" guard. 
--- src/cli.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/cli.ts b/src/cli.ts index 97d2a0f..53081db 100755 --- a/src/cli.ts +++ b/src/cli.ts @@ -789,12 +789,12 @@ async function cmdKill(positional: string[]) { } }); - updateThreadStatus(config.threadsFile, threadId, "interrupted"); - if (shortId) removePidFile(shortId); if (killSignalWritten || serverInterrupted) { + updateThreadStatus(config.threadsFile, threadId, "interrupted"); + if (shortId) removePidFile(shortId); progress(`Stopped thread ${id}`); } else { - progress(`Marked thread ${id} as interrupted, but could not signal the running process`); + progress(`Could not signal thread ${id} — try again.`); } }