diff --git a/SKILL.md b/SKILL.md index 7a035fd..72987f2 100644 --- a/SKILL.md +++ b/SKILL.md @@ -32,7 +32,11 @@ codex-collab run --resume "now check the error handling" --content-only codex-collab run "investigate the auth module" -d /path/to/project --content-only ``` -**IMPORTANT: Always use `run_in_background=true` and `dangerouslyDisableSandbox=true`** for all `codex-collab` Bash commands. Tasks take minutes, and the tool writes to `~/.codex-collab/` which is outside the sandbox allowlist. You will be notified automatically when the command finishes. After launching, tell the user it's running and end your turn. Do NOT use TaskOutput, block, poll, wait, or spawn an agent to monitor the result — the background task notification handles this automatically. +**IMPORTANT: Always use `dangerouslyDisableSandbox=true`** for all `codex-collab` Bash commands — the tool writes to `~/.codex-collab/` which is outside the sandbox allowlist. + +For **`run` and `review`** commands, also use `run_in_background=true` — these take minutes. You will be notified automatically when the command finishes. After launching, tell the user it's running and end your turn. Do NOT use TaskOutput, block, poll, wait, or spawn an agent to monitor the result — the background task notification handles this automatically. + +For **all other commands** (`kill`, `jobs`, `progress`, `output`, `approve`, `decline`, `clean`, `delete`, `models`, `health`), run in the **foreground** — they complete in seconds. If the user asks about progress mid-task, use `progress` to check the recent activity: @@ -67,7 +71,7 @@ codex-collab review --resume -d /path/to/project --content-only Review modes: `pr` (default), `uncommitted`, `commit` -**IMPORTANT: Always use `run_in_background=true` and `dangerouslyDisableSandbox=true`** — reviews typically take 5-20 minutes. You will be notified automatically when done. After launching, tell the user it's running and end your turn. 
Do NOT use TaskOutput, block, poll, wait, or spawn an agent to monitor the result — the background task notification handles this automatically. +**IMPORTANT: Use `run_in_background=true` and `dangerouslyDisableSandbox=true`** — reviews typically take 5-20 minutes. You will be notified automatically when done. After launching, tell the user it's running and end your turn. Do NOT use TaskOutput, block, poll, wait, or spawn an agent to monitor the result — the background task notification handles this automatically. ## Context Efficiency @@ -165,7 +169,7 @@ codex-collab progress # Recent activity (tail of log) ```bash codex-collab jobs # List threads codex-collab jobs --json # List threads (JSON) -codex-collab kill # Interrupt and archive thread +codex-collab kill # Stop a running thread codex-collab delete # Archive thread, delete local files codex-collab clean # Delete old logs and stale mappings ``` diff --git a/src/cli.test.ts b/src/cli.test.ts index d9a3738..8de19d0 100644 --- a/src/cli.test.ts +++ b/src/cli.test.ts @@ -40,9 +40,10 @@ describe("CLI valid commands", () => { }); it("health command runs without crashing", () => { - // May fail if codex not installed, but should not crash with unhandled exception + // May fail if codex not installed, but should not crash with unhandled exception. + // Exit code 143 = SIGTERM during app-server cleanup (our signal handler). 
const { exitCode } = run("health"); - expect([0, 1]).toContain(exitCode); + expect([0, 1, 143]).toContain(exitCode); }); }); diff --git a/src/cli.ts b/src/cli.ts index bf2f246..53081db 100755 --- a/src/cli.ts +++ b/src/cli.ts @@ -45,18 +45,34 @@ import type { } from "./types"; // --------------------------------------------------------------------------- -// SIGINT handler — clean up spawned app-server on Ctrl+C +// Signal handlers — clean up spawned app-server and update thread status // --------------------------------------------------------------------------- let activeClient: AppServerClient | undefined; +let activeThreadId: string | undefined; +let activeShortId: string | undefined; let shuttingDown = false; -process.on("SIGINT", async () => { +async function handleShutdownSignal(exitCode: number): Promise<void> { if (shuttingDown) { - process.exit(130); + process.exit(exitCode); } shuttingDown = true; console.error("[codex] Shutting down..."); + + // Update thread status and clean up PID file synchronously before async + // cleanup — ensures the mapping is written even if client.close() hangs. + if (activeThreadId) { + try { + updateThreadStatus(config.threadsFile, activeThreadId, "interrupted"); + } catch (e) { + console.error(`[codex] Warning: could not update thread status during shutdown: ${e instanceof Error ? e.message : String(e)}`); + } + } + if (activeShortId) { + removePidFile(activeShortId); + } + try { if (activeClient) { await activeClient.close(); @@ -64,8 +80,11 @@ process.on("SIGINT", async () => { } catch (e) { console.error(`[codex] Warning: cleanup failed: ${e instanceof Error ?
e.message : String(e)}`); } - process.exit(130); -}); + process.exit(exitCode); +} + +process.on("SIGINT", () => handleShutdownSignal(130)); +process.on("SIGTERM", () => handleShutdownSignal(143)); // --------------------------------------------------------------------------- // Argument parsing @@ -397,6 +416,58 @@ function pluralize(n: number, word: string): string { return `${n} ${word}${n === 1 ? "" : "s"}`; } +/** Write a PID file for the current process so cmdJobs can detect stale "running" status. */ +function writePidFile(shortId: string): void { + try { + writeFileSync(join(config.pidsDir, shortId), String(process.pid), { mode: 0o600 }); + } catch (e) { + console.error(`[codex] Warning: could not write PID file: ${e instanceof Error ? e.message : String(e)}`); + } +} + +/** Remove the PID file for a thread. */ +function removePidFile(shortId: string): void { + try { + unlinkSync(join(config.pidsDir, shortId)); + } catch (e) { + if ((e as NodeJS.ErrnoException).code !== "ENOENT") { + console.error(`[codex] Warning: could not remove PID file: ${e instanceof Error ? e.message : String(e)}`); + } + } +} + +/** Check if the process that owns a thread is still alive. + * Returns true (assume alive) when the PID file is missing — the thread may + * have been started before PID tracking existed, or PID file write may have + * failed. Only returns false when we have a PID and can confirm the process + * is gone (ESRCH). */ +function isProcessAlive(shortId: string): boolean { + const pidPath = join(config.pidsDir, shortId); + let pid: number; + try { + pid = Number(readFileSync(pidPath, "utf-8").trim()); + } catch (e) { + if ((e as NodeJS.ErrnoException).code === "ENOENT") return true; // no PID file → assume alive + console.error(`[codex] Warning: could not read PID file for ${shortId}: ${e instanceof Error ? 
e.message : String(e)}`); + return true; + } + if (!Number.isFinite(pid) || pid <= 0) { + console.error(`[codex] Warning: PID file for ${shortId} contains invalid value`); + return false; + } + try { + process.kill(pid, 0); // signal 0 = existence check + return true; + } catch (e) { + const code = (e as NodeJS.ErrnoException).code; + if (code === "ESRCH") return false; // process confirmed dead + if (code === "EPERM") return true; // process exists but we can't signal it + // Unexpected error — assume alive to avoid incorrectly marking live threads as dead + console.error(`[codex] Warning: could not check process for ${shortId}: ${e instanceof Error ? e.message : String(e)}`); + return true; + } +} + // --------------------------------------------------------------------------- // Commands // --------------------------------------------------------------------------- @@ -489,6 +560,9 @@ async function cmdRun(positional: string[], opts: Options) { } updateThreadStatus(config.threadsFile, threadId, "running"); + activeThreadId = threadId; + activeShortId = shortId; + writePidFile(shortId); const dispatcher = createDispatcher(shortId, opts); @@ -510,6 +584,10 @@ async function cmdRun(positional: string[], opts: Options) { } catch (e) { updateThreadStatus(config.threadsFile, threadId, "failed"); throw e; + } finally { + activeThreadId = undefined; + activeShortId = undefined; + removePidFile(shortId); } }); @@ -535,6 +613,9 @@ async function cmdReview(positional: string[], opts: Options) { } updateThreadStatus(config.threadsFile, threadId, "running"); + activeThreadId = threadId; + activeShortId = shortId; + writePidFile(shortId); const dispatcher = createDispatcher(shortId, opts); @@ -553,6 +634,10 @@ async function cmdReview(positional: string[], opts: Options) { } catch (e) { updateThreadStatus(config.threadsFile, threadId, "failed"); throw e; + } finally { + activeThreadId = undefined; + activeShortId = undefined; + removePidFile(shortId); } }); @@ -592,6 +677,18 
@@ async function cmdJobs(opts: Options) { // Only show threads created by this CLI (those with a local short ID mapping) let localThreads = allThreads.filter((t) => reverseMap.has(t.id)); + + // Detect stale "running" status: if the owning process is dead, mark as interrupted. + for (const t of localThreads) { + const sid = reverseMap.get(t.id)!; + const entry = mapping[sid]; + if (entry?.lastStatus === "running" && !isProcessAlive(sid)) { + updateThreadStatus(config.threadsFile, t.id, "interrupted"); + entry.lastStatus = "interrupted"; + removePidFile(sid); + } + } + if (opts.limit !== Infinity) localThreads = localThreads.slice(0, opts.limit); if (opts.json) { @@ -634,19 +731,35 @@ async function cmdKill(positional: string[]) { validateIdOrDie(id); const threadId = resolveThreadId(config.threadsFile, id); - - // Check local status — skip server operations if already killed - const mapping = loadThreadMapping(config.threadsFile); const shortId = findShortId(config.threadsFile, threadId); - const localStatus = shortId ? mapping[shortId]?.lastStatus : undefined; - if (localStatus === "killed") { - progress(`Thread ${id} is already killed`); - return; + // Skip kill for threads that have already reached a terminal status + if (shortId) { + const mapping = loadThreadMapping(config.threadsFile); + const localStatus = mapping[shortId]?.lastStatus; + if (localStatus && localStatus !== "running") { + progress(`Thread ${id} is already ${localStatus}`); + return; + } } - const archived = await withClient(async (client) => { - // Try to read thread status first and interrupt active turn if any + // Write kill signal file so the running process can detect the kill + let killSignalWritten = false; + const signalPath = join(config.killSignalsDir, threadId); + try { + writeFileSync(signalPath, "", { mode: 0o600 }); + killSignalWritten = true; + } catch (e) { + console.error( + `[codex] Warning: could not write kill signal: ${e instanceof Error ? e.message : String(e)}. 
` + + `The running process may not detect the kill.`, + ); + } + + // Try to interrupt the active turn on the server (immediate effect). + // The kill signal file handles the case where the run process is polling. + let serverInterrupted = false; + await withClient(async (client) => { try { const { thread } = await client.request<{ thread: { @@ -665,6 +778,7 @@ async function cmdKill(positional: string[]) { threadId, turnId: activeTurn.id, }); + serverInterrupted = true; progress(`Interrupted turn ${activeTurn.id}`); } } @@ -673,15 +787,14 @@ async function cmdKill(positional: string[]) { console.error(`[codex] Warning: could not read/interrupt thread: ${e.message}`); } } - - return tryArchive(client, threadId); }); - updateThreadStatus(config.threadsFile, threadId, "killed"); - if (archived === "failed") { - progress(`Killed thread ${id} (server archive failed)`); + if (killSignalWritten || serverInterrupted) { + updateThreadStatus(config.threadsFile, threadId, "interrupted"); + if (shortId) removePidFile(shortId); + progress(`Stopped thread ${id}`); } else { - progress(`Killed thread ${id}`); + progress(`Could not signal thread ${id} — try again.`); } } @@ -802,6 +915,8 @@ async function cmdClean() { const logsDeleted = deleteOldFiles(config.logsDir, sevenDaysMs); const approvalsDeleted = deleteOldFiles(config.approvalsDir, oneDayMs); + const killSignalsDeleted = deleteOldFiles(config.killSignalsDir, oneDayMs); + const pidsDeleted = deleteOldFiles(config.pidsDir, oneDayMs); // Clean stale thread mappings — use log file mtime as proxy for last // activity so recently-used threads aren't pruned just because they @@ -835,6 +950,10 @@ async function cmdClean() { if (logsDeleted > 0) parts.push(`${logsDeleted} log files deleted`); if (approvalsDeleted > 0) parts.push(`${approvalsDeleted} approval files deleted`); + if (killSignalsDeleted > 0) + parts.push(`${killSignalsDeleted} kill signal files deleted`); + if (pidsDeleted > 0) + parts.push(`${pidsDeleted} stale PID 
files deleted`); if (mappingsRemoved > 0) parts.push(`${mappingsRemoved} stale mappings removed`); @@ -853,9 +972,54 @@ async function cmdDelete(positional: string[]) { const threadId = resolveThreadId(config.threadsFile, id); const shortId = findShortId(config.threadsFile, threadId); + // If the thread is currently running, stop it first before archiving + const localStatus = shortId ? loadThreadMapping(config.threadsFile)[shortId]?.lastStatus : undefined; + if (localStatus === "running") { + const signalPath = join(config.killSignalsDir, threadId); + try { + writeFileSync(signalPath, "", { mode: 0o600 }); + } catch (e) { + console.error( + `[codex] Warning: could not write kill signal: ${e instanceof Error ? e.message : String(e)}. ` + + `The running process may not detect the delete.`, + ); + } + } + let archiveResult: "archived" | "already_done" | "failed" = "failed"; try { - archiveResult = await withClient((client) => tryArchive(client, threadId)); + archiveResult = await withClient(async (client) => { + // Interrupt active turn before archiving (only if running) + if (localStatus === "running") { + try { + const { thread } = await client.request<{ + thread: { + id: string; + status: { type: string }; + turns: Array<{ id: string; status: string }>; + }; + }>("thread/read", { threadId, includeTurns: true }); + + if (thread.status.type === "active") { + const activeTurn = thread.turns?.find( + (t) => t.status === "inProgress", + ); + if (activeTurn) { + await client.request("turn/interrupt", { + threadId, + turnId: activeTurn.id, + }); + } + } + } catch (e) { + if (e instanceof Error && !e.message.includes("not found") && !e.message.includes("archived")) { + console.error(`[codex] Warning: could not read/interrupt thread during delete: ${e.message}`); + } + } + } + + return tryArchive(client, threadId); + }); } catch (e) { if (e instanceof Error && !e.message.includes("not found")) { console.error(`[codex] Warning: could not archive on server: ${e.message}`); 
@@ -863,6 +1027,7 @@ async function cmdDelete(positional: string[]) { } if (shortId) { + removePidFile(shortId); const logPath = join(config.logsDir, `${shortId}.log`); if (existsSync(logPath)) unlinkSync(logPath); removeThread(config.threadsFile, shortId); @@ -912,7 +1077,7 @@ Commands: review [opts] Run code review (PR-style by default) review "instructions" Custom review with specific focus jobs [--json] [--all] List threads (--limit to cap) - kill Interrupt and archive thread + kill Stop a running thread output Read full log for thread progress Show recent activity for thread models List available models @@ -957,6 +1122,8 @@ Examples: function ensureDataDirs(): void { mkdirSync(config.logsDir, { recursive: true }); mkdirSync(config.approvalsDir, { recursive: true }); + mkdirSync(config.killSignalsDir, { recursive: true }); + mkdirSync(config.pidsDir, { recursive: true }); } async function main() { diff --git a/src/config.ts b/src/config.ts index 3d22ac0..896ecba 100644 --- a/src/config.ts +++ b/src/config.ts @@ -36,6 +36,8 @@ export const config = { get threadsFile() { return join(this.dataDir, "threads.json"); }, get logsDir() { return join(this.dataDir, "logs"); }, get approvalsDir() { return join(this.dataDir, "approvals"); }, + get killSignalsDir() { return join(this.dataDir, "kill-signals"); }, + get pidsDir() { return join(this.dataDir, "pids"); }, // Display jobsListLimit: 20, diff --git a/src/protocol.test.ts b/src/protocol.test.ts index 2d580fc..8be65ae 100644 --- a/src/protocol.test.ts +++ b/src/protocol.test.ts @@ -239,14 +239,8 @@ describe("parseMessage", () => { // Each test manages its own client lifecycle to avoid dangling-process races // when bun runs tests concurrently within a describe block. describe("AppServerClient", () => { - // On Windows, bun's test runner doesn't fully await async finally blocks before moving - // to the next test. close() takes ~400ms on Windows (dominated by taskkill process tree - // cleanup). 
This delay ensures close() completes before the next test spawns a mock server. - afterEach(async () => { - if (process.platform === "win32") { - await new Promise((r) => setTimeout(r, 1000)); - } - }); + // close() now properly awaits process exit on all platforms, so no + // inter-test delay is needed. test("connect performs initialize handshake and returns userAgent", async () => { const c = await connect({ diff --git a/src/protocol.ts b/src/protocol.ts index 4f9d510..7a61aae 100644 --- a/src/protocol.ts +++ b/src/protocol.ts @@ -383,10 +383,6 @@ export async function connect(opts?: ConnectOptions): Promise { // proc.kill(). This order matters: if codex is a .cmd wrapper, killing // the direct child first removes the PID that taskkill needs to traverse // the tree, potentially leaving the real app-server alive. - // Note: we intentionally do NOT await readLoop/proc.exited here because - // Bun's test runner on Windows doesn't fully await async finally blocks, - // and the extra latency causes inter-test process races. The process is - // already dead after taskkill+kill, so the dangling promise is benign. if (proc.pid) { try { const r = spawnSync("taskkill", ["/PID", String(proc.pid), "/T", "/F"], { stdio: "pipe", timeout: 5000 }); @@ -404,6 +400,10 @@ export async function connect(opts?: ConnectOptions): Promise { console.error(`[codex] Warning: proc.kill() failed: ${e instanceof Error ? e.message : String(e)}`); } } + // Wait for the process to fully exit so dangling readLoop / proc.exited + // promises don't keep the event loop alive (which blocks background tasks + // from reporting completion). 
+ if (await waitForExit(3000)) { await readLoop; } return; } diff --git a/src/threads.ts b/src/threads.ts index 6b8528e..a7e93ba 100644 --- a/src/threads.ts +++ b/src/threads.ts @@ -3,6 +3,7 @@ import { readFileSync, writeFileSync, existsSync, mkdirSync, renameSync, openSync, closeSync, unlinkSync, statSync } from "fs"; import { randomBytes } from "crypto"; import { dirname } from "path"; +import { validateId } from "./config"; import type { ThreadMapping } from "./types"; /** @@ -127,6 +128,7 @@ export function registerThread( threadId: string, meta?: { model?: string; cwd?: string }, ): ThreadMapping { + validateId(threadId); // ensure safe for use as filename (kill signals, etc.) return withThreadLock(threadsFile, () => { const mapping = loadThreadMapping(threadsFile); let shortId = generateShortId(); @@ -171,16 +173,16 @@ export function findShortId(threadsFile: string, threadId: string): string | nul export function updateThreadStatus( threadsFile: string, threadId: string, - status: "running" | "completed" | "failed" | "interrupted" | "killed", + status: "running" | "completed" | "failed" | "interrupted", ): void { withThreadLock(threadsFile, () => { const mapping = loadThreadMapping(threadsFile); let found = false; for (const entry of Object.values(mapping)) { if (entry.threadId === threadId) { + found = true; entry.lastStatus = status; entry.updatedAt = new Date().toISOString(); - found = true; break; } } diff --git a/src/turns.test.ts b/src/turns.test.ts index 17ff893..5da7933 100644 --- a/src/turns.test.ts +++ b/src/turns.test.ts @@ -8,15 +8,23 @@ import type { TurnCompletedParams, TurnStartResponse, ReviewStartResponse, } from "./types"; -import { mkdirSync, rmSync, existsSync } from "fs"; +import { mkdirSync, rmSync, existsSync, writeFileSync, utimesSync } from "fs"; import { join } from "path"; import { tmpdir } from "os"; +import { + updateThreadStatus, + loadThreadMapping, + saveThreadMapping, +} from "./threads"; const TEST_LOG_DIR = join(tmpdir(), 
"codex-collab-test-turns"); +const TEST_KILL_DIR = join(tmpdir(), "codex-collab-test-kill-signals"); beforeEach(() => { if (existsSync(TEST_LOG_DIR)) rmSync(TEST_LOG_DIR, { recursive: true }); mkdirSync(TEST_LOG_DIR, { recursive: true }); + if (existsSync(TEST_KILL_DIR)) rmSync(TEST_KILL_DIR, { recursive: true }); + mkdirSync(TEST_KILL_DIR, { recursive: true }); }); // --------------------------------------------------------------------------- @@ -132,6 +140,7 @@ describe("runTurn", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, }); expect(result.status).toBe("completed"); @@ -158,6 +167,7 @@ describe("runTurn", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, }); expect(result.status).toBe("completed"); @@ -177,6 +187,7 @@ describe("runTurn", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, }); expect(result.status).toBe("failed"); @@ -202,6 +213,7 @@ describe("runTurn", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, }); expect(result.status).toBe("completed"); @@ -218,6 +230,7 @@ describe("runTurn", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 200, + killSignalsDir: TEST_KILL_DIR, }); expect(true).toBe(false); // should not reach here } catch (e) { @@ -262,6 +275,7 @@ describe("runTurn", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, }); expect(result.status).toBe("completed"); @@ -291,6 +305,7 @@ describe("runTurn", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, cwd: "/my/project", model: "gpt-5.3-codex", effort: "high", @@ -327,6 +342,7 @@ describe("runTurn", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, }); 
expect(result.status).toBe("completed"); @@ -364,6 +380,7 @@ describe("runReview", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, }, ); @@ -391,6 +408,7 @@ describe("runReview", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, }, ); @@ -434,6 +452,7 @@ describe("review output via exitedReviewMode", () => { dispatcher, approvalHandler: autoApproveHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, }, ); @@ -442,6 +461,233 @@ describe("review output via exitedReviewMode", () => { }); }); +// --------------------------------------------------------------------------- +// Kill signal tests +// --------------------------------------------------------------------------- + +describe("kill signal", () => { + + test("kill signal during slow turn/start returns interrupted result", async () => { + const { client } = buildMockClient((method) => { + if (method === "turn/start") { + // Simulate a slow/stuck turn/start — resolves after 2s + return new Promise((resolve) => + setTimeout(() => resolve(inProgressTurn("turn-1")), 2000), + ); + } + throw new Error(`Unexpected method: ${method}`); + }); + + // Kill signal arrives after 100ms — should win race 1 against the 2s request + setTimeout(() => { + writeFileSync(join(TEST_KILL_DIR, "thr-1"), "", { mode: 0o600 }); + }, 100); + + const dispatcher = new EventDispatcher("test-kill-request", TEST_LOG_DIR, () => {}); + + const result = await runTurn(client, "thr-1", [{ type: "text", text: "hello" }], { + dispatcher, + approvalHandler: autoApproveHandler, + timeoutMs: 10_000, + killSignalsDir: TEST_KILL_DIR, + }); + + expect(result.status).toBe("interrupted"); + expect(result.error).toBe("Thread killed by user"); + expect(existsSync(join(TEST_KILL_DIR, "thr-1"))).toBe(false); + }); + + test("kill signal during turn returns interrupted result", async () => { + const { client, emit } = buildMockClient((method) => { 
+ if (method === "turn/start") { + // Write the kill signal file after a short delay (simulates `codex-collab kill`) + setTimeout(() => { + writeFileSync(join(TEST_KILL_DIR, "thr-1"), "", { mode: 0o600 }); + }, 100); + // Never fire turn/completed — the kill signal should end the turn + return inProgressTurn("turn-1"); + } + throw new Error(`Unexpected method: ${method}`); + }); + + const dispatcher = new EventDispatcher("test-kill", TEST_LOG_DIR, () => {}); + + const result = await runTurn(client, "thr-1", [{ type: "text", text: "hello" }], { + dispatcher, + approvalHandler: autoApproveHandler, + timeoutMs: 10_000, + killSignalsDir: TEST_KILL_DIR, + }); + + expect(result.status).toBe("interrupted"); + expect(result.error).toBe("Thread killed by user"); + // Signal file should be cleaned up in finally block + expect(existsSync(join(TEST_KILL_DIR, "thr-1"))).toBe(false); + }); + + test("stale signal cleared at turn start — turn completes normally", async () => { + // Pre-write a stale signal file (simulates leftover from a previous kill). + // Backdate its mtime to before process start so it's treated as stale. 
+ const stalePath = join(TEST_KILL_DIR, "thr-1"); + writeFileSync(stalePath, "", { mode: 0o600 }); + const beforeProcessStart = new Date(Date.now() - process.uptime() * 1000 - 10_000); + utimesSync(stalePath, beforeProcessStart, beforeProcessStart); + + const { client, emit } = buildMockClient((method) => { + if (method === "turn/start") { + setTimeout(() => emit("turn/completed", completedTurn("turn-1")), 50); + return inProgressTurn("turn-1"); + } + throw new Error(`Unexpected method: ${method}`); + }); + + const dispatcher = new EventDispatcher("test-stale-kill", TEST_LOG_DIR, () => {}); + + const result = await runTurn(client, "thr-1", [{ type: "text", text: "hello" }], { + dispatcher, + approvalHandler: autoApproveHandler, + timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, + }); + + expect(result.status).toBe("completed"); + }); + + test("fresh signal NOT cleared at turn start — turn is interrupted", async () => { + // Write a signal file with current mtime (simulates a concurrent kill). + // No backdating — mtime is after process start, so it should be preserved. 
+ writeFileSync(join(TEST_KILL_DIR, "thr-1"), "", { mode: 0o600 }); + + const { client, emit } = buildMockClient((method) => { + if (method === "turn/start") { + setTimeout(() => emit("turn/completed", completedTurn("turn-1")), 50); + return inProgressTurn("turn-1"); + } + throw new Error(`Unexpected method: ${method}`); + }); + + const dispatcher = new EventDispatcher("test-fresh-kill", TEST_LOG_DIR, () => {}); + + const result = await runTurn(client, "thr-1", [{ type: "text", text: "hello" }], { + dispatcher, + approvalHandler: autoApproveHandler, + timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, + }); + + expect(result.status).toBe("interrupted"); + expect(result.error).toBe("Thread killed by user"); + }); + + test("normal completion wins race — no kill signal", async () => { + const { client, emit } = buildMockClient((method) => { + if (method === "turn/start") { + setTimeout(() => emit("turn/completed", completedTurn("turn-1")), 50); + return inProgressTurn("turn-1"); + } + throw new Error(`Unexpected method: ${method}`); + }); + + const dispatcher = new EventDispatcher("test-no-kill", TEST_LOG_DIR, () => {}); + + const result = await runTurn(client, "thr-1", [{ type: "text", text: "hello" }], { + dispatcher, + approvalHandler: autoApproveHandler, + timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, + }); + + expect(result.status).toBe("completed"); + }); + + test("signal file cleaned up on normal completion", async () => { + const { client, emit } = buildMockClient((method) => { + if (method === "turn/start") { + setTimeout(() => emit("turn/completed", completedTurn("turn-1")), 50); + return inProgressTurn("turn-1"); + } + throw new Error(`Unexpected method: ${method}`); + }); + + const dispatcher = new EventDispatcher("test-cleanup", TEST_LOG_DIR, () => {}); + + await runTurn(client, "thr-1", [{ type: "text", text: "hello" }], { + dispatcher, + approvalHandler: autoApproveHandler, + timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, + }); + + // unlinkSync 
in finally should not error when no signal exists + expect(existsSync(join(TEST_KILL_DIR, "thr-1"))).toBe(false); + }); +}); + +// --------------------------------------------------------------------------- +// Non-kill error propagation +// --------------------------------------------------------------------------- + +describe("error propagation", () => { + test("non-kill errors propagate through the catch block", async () => { + const { client } = buildMockClient((method) => { + if (method === "turn/start") { + throw new Error("Server exploded"); + } + throw new Error(`Unexpected method: ${method}`); + }); + + const dispatcher = new EventDispatcher("test-propagate", TEST_LOG_DIR, () => {}); + + await expect( + runTurn(client, "thr-1", [{ type: "text", text: "hello" }], { + dispatcher, + approvalHandler: autoApproveHandler, + timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, + }), + ).rejects.toThrow("Server exploded"); + }); +}); + +// --------------------------------------------------------------------------- +// updateThreadStatus +// --------------------------------------------------------------------------- + +const TEST_THREADS_FILE = join(tmpdir(), "codex-collab-test-threads", "threads.json"); + +describe("updateThreadStatus", () => { + beforeEach(() => { + const dir = join(tmpdir(), "codex-collab-test-threads"); + if (existsSync(dir)) rmSync(dir, { recursive: true }); + mkdirSync(dir, { recursive: true }); + }); + + test("updates status and timestamp", () => { + saveThreadMapping(TEST_THREADS_FILE, { + abc12345: { + threadId: "thr-1", + createdAt: "2026-01-01T00:00:00Z", + lastStatus: "running", + }, + }); + + updateThreadStatus(TEST_THREADS_FILE, "thr-1", "completed"); + const loaded = loadThreadMapping(TEST_THREADS_FILE); + expect(loaded.abc12345.lastStatus).toBe("completed"); + expect(loaded.abc12345.updatedAt).toBeDefined(); + }); + + test("warns on unknown thread", () => { + saveThreadMapping(TEST_THREADS_FILE, {}); + const warnings: string[] = []; + 
const origError = console.error; + console.error = (msg: string) => warnings.push(msg); + updateThreadStatus(TEST_THREADS_FILE, "thr-unknown", "running"); + console.error = origError; + expect(warnings.some((w) => w.includes("unknown thread"))).toBe(true); + }); +}); + describe("approval wiring", () => { test("approval requests are routed through the handler", async () => { const approvalCalls: string[] = []; @@ -488,6 +734,7 @@ describe("approval wiring", () => { dispatcher, approvalHandler: mockApprovalHandler, timeoutMs: 5000, + killSignalsDir: TEST_KILL_DIR, }); expect(approvalCalls).toContain("cmd:sudo rm -rf"); diff --git a/src/turns.ts b/src/turns.ts index 61ff83e..ff0ae11 100644 --- a/src/turns.ts +++ b/src/turns.ts @@ -1,5 +1,7 @@ // src/turns.ts — Turn lifecycle (runTurn, runReview) +import { existsSync, statSync, unlinkSync } from "fs"; +import { join } from "path"; import type { AppServerClient } from "./protocol"; import type { UserInput, TurnStartParams, TurnStartResponse, TurnCompletedParams, @@ -11,6 +13,7 @@ import type { } from "./types"; import type { EventDispatcher } from "./events"; import type { ApprovalHandler } from "./approvals"; +import { config } from "./config"; export interface TurnOptions { dispatcher: EventDispatcher; @@ -20,6 +23,8 @@ export interface TurnOptions { model?: string; effort?: ReasoningEffort; approvalPolicy?: ApprovalPolicy; + /** Directory for kill signal files. Defaults to config.killSignalsDir. */ + killSignalsDir?: string; } export interface ReviewOptions extends TurnOptions { @@ -67,6 +72,14 @@ export async function runReview( return executeTurn(client, "review/start", params, opts); } +/** Error thrown when a kill signal file is detected during turn execution. 
*/ +class KillSignalError extends Error { + constructor(public readonly threadId: string) { + super(`Thread ${threadId} killed by user`); + this.name = "KillSignalError"; + } +} + /** * Shared turn lifecycle: register handlers, send the start request, * wait for completion, collect results, and clean up. @@ -80,6 +93,10 @@ async function executeTurn( const startTime = Date.now(); opts.dispatcher.reset(); + const signalsDir = opts.killSignalsDir ?? config.killSignalsDir; + const threadId = params.threadId; + const signalPath = join(signalsDir, threadId); + // AbortController for cancelling in-flight approval polls on turn completion/timeout const abortController = new AbortController(); const unsubs = registerEventHandlers(client, opts, abortController.signal); @@ -94,9 +111,44 @@ async function executeTurn( const completion = createTurnCompletionAwaiter(client, opts.timeoutMs); unsubs.push(completion.unsubscribe); + // AbortController specifically for kill signal polling — aborted when + // the turn completes normally or on timeout so the poll interval stops. + const killAbort = new AbortController(); + + // Remove signal files left over from a previous (crashed) run, but preserve + // fresh signals written by a concurrent `kill` targeting this thread. + // Heuristic: files created before this process started are stale. + const processStartMs = Date.now() - process.uptime() * 1000; + try { + const st = statSync(signalPath); + if (st.mtimeMs < processStartMs) unlinkSync(signalPath); + } catch (e) { + if ((e as NodeJS.ErrnoException).code !== "ENOENT") { + console.error(`[codex] Warning: could not check/remove stale kill signal: ${e instanceof Error ? e.message : String(e)}`); + } + } + + // Start kill signal polling before the request so kills are detected even + // if turn/start is slow or stuck. 
+ const killSignal = createKillSignalAwaiter( + threadId, signalsDir, 500, killAbort.signal, + ); + killSignal.catch((e) => { + if (!(e instanceof KillSignalError)) { + console.error(`[codex] Unexpected error in kill signal awaiter: ${e instanceof Error ? e.message : String(e)}`); + } + }); + try { - const { turn } = await client.request(method, params); - const completedTurn = await completion.waitFor(turn.id); + const { turn } = await Promise.race([ + client.request(method, params), + killSignal, + ]); + + const completedTurn = await Promise.race([ + completion.waitFor(turn.id), + killSignal, + ]); opts.dispatcher.flushOutput(); opts.dispatcher.flush(); @@ -115,9 +167,30 @@ async function executeTurn( error: completedTurn.turn.error?.message, durationMs: Date.now() - startTime, }; + } catch (e) { + if (e instanceof KillSignalError) { + opts.dispatcher.flushOutput(); + opts.dispatcher.flush(); + return { + status: "interrupted", + output: opts.dispatcher.getAccumulatedOutput(), + filesChanged: opts.dispatcher.getFilesChanged(), + commandsRun: opts.dispatcher.getCommandsRun(), + error: "Thread killed by user", + durationMs: Date.now() - startTime, + }; + } + throw e; } finally { + killAbort.abort(); abortController.abort(); for (const unsub of unsubs) unsub(); + // Clean up signal file + try { unlinkSync(signalPath); } catch (e) { + if ((e as NodeJS.ErrnoException).code !== "ENOENT") { + console.error(`[codex] Warning: could not clean up kill signal: ${e instanceof Error ? e.message : String(e)}`); + } + } } } @@ -193,6 +266,44 @@ function registerEventHandlers(client: AppServerClient, opts: TurnOptions, signa return unsubs; } +/** + * Create a promise that rejects with KillSignalError when a kill signal file + * appears for the given thread. Polls the filesystem at the given interval. + * Stops polling when the provided AbortSignal fires (i.e. when the turn finishes for any reason). 
+ */
+function createKillSignalAwaiter(
+  threadId: string,
+  signalsDir: string,
+  pollIntervalMs: number,
+  signal: AbortSignal,
+): Promise<never> {
+  return new Promise<never>((_resolve, reject) => {
+    // Check immediately
+    if (existsSync(join(signalsDir, threadId))) {
+      reject(new KillSignalError(threadId));
+      return;
+    }
+
+    const timer = setInterval(() => {
+      try {
+        if (signal.aborted) {
+          clearInterval(timer);
+          return;
+        }
+        if (existsSync(join(signalsDir, threadId))) {
+          clearInterval(timer);
+          reject(new KillSignalError(threadId));
+        }
+      } catch (e) {
+        // Log but keep polling — the error may be transient (e.g. momentary EACCES).
+        console.error(`[codex] Warning: kill signal poll error (will retry): ${e instanceof Error ? e.message : String(e)}`);
+      }
+    }, pollIntervalMs);
+
+    signal.addEventListener("abort", () => clearInterval(timer), { once: true });
+  });
+}
+
 /**
  * Create a turn/completed awaiter that buffers events from the moment it's
  * created. Call waitFor(turnId) after the request to resolve with the matching
diff --git a/src/types.ts b/src/types.ts
index cc656c6..d89d17d 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -444,7 +444,7 @@ export interface ThreadMappingEntry {
   createdAt: string;
   model?: string;
   cwd?: string;
-  lastStatus?: "running" | "completed" | "failed" | "interrupted" | "killed";
+  lastStatus?: "running" | "completed" | "failed" | "interrupted";
   updatedAt?: string;
 }