Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions plugins/codex/scripts/app-server-broker.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,16 @@ async function main() {
}
}

// Tracks whether `shutdown()` is running so the upstream-exit watchdog
// below can tell intentional teardown (`broker/shutdown`, SIGTERM, SIGINT)
// apart from the upstream app-server dying on its own. Both paths resolve
// `appClient.exitPromise` via `appClient.close()`; without this flag the
// watchdog would re-enter `shutdown()` during graceful teardown and race
// `process.exit(0)` with `process.exit(1)`.
let shuttingDown = false;

async function shutdown(server) {
shuttingDown = true;
for (const socket of sockets) {
socket.end();
}
Expand All @@ -115,6 +124,26 @@ async function main() {

appClient.setNotificationHandler(routeNotification);

// Propagate upstream codex app-server exit so connected clients see a
// socket EOF instead of waiting forever for a turn/completed. Without
// this, a codex CLI crash (rate limit, OOM, internal error) leaves the
// broker idle-alive: the socket to each client stays open but no more
// notifications ever arrive, so `captureTurn` hangs. Closing the server
// triggers the client-side `exitPromise` resolution path that
// `captureTurn` now uses as a watchdog.
//
// The `shuttingDown` guard is required — `shutdown()` itself closes
// `appClient`, which resolves the same `exitPromise` this handler is
// listening on. Without the guard, every graceful termination would
// trigger this watchdog, re-enter `shutdown()`, and race the intentional
// `process.exit(0)` with `process.exit(1)`.
appClient.exitPromise.then(() => {
if (shuttingDown) {
return;
}
shutdown(server).catch(() => {}).finally(() => process.exit(1));
Comment on lines +140 to +144
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Ignore intentional broker shutdowns in exit watcher

appClient.exitPromise resolves on both upstream crashes and normal appClient.close() calls. In this script, all intentional termination paths (broker/shutdown, SIGTERM, and SIGINT) call shutdown(server), which closes appClient, so this watcher also triggers and runs shutdown(server) again before forcing process.exit(1). That can turn graceful shutdowns into crash exits and cause shutdown reentrancy/races during normal broker teardown.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 14123d3. Added a shuttingDown flag set by shutdown() and checked in the watchdog, so intentional teardown paths no longer race. Regression test: broker exits cleanly on broker/shutdown without tripping the upstream-exit watchdog.

});

const server = net.createServer((socket) => {
sockets.add(socket);
socket.setEncoding("utf8");
Expand Down
45 changes: 45 additions & 0 deletions plugins/codex/scripts/lib/codex.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,51 @@ async function captureTurn(client, threadId, startRequest, options = {}) {
applyTurnNotification(state, message);
});

// Resolve captureTurn when the app-server disconnects before sending a
// terminal signal. The existing resolution paths both require server
// cooperation: (a) a `turn/completed` notification, or (b) a final_answer
// phase agentMessage that arms `scheduleInferredCompletion`. Neither fires
// when the Codex CLI exits mid-turn (rate limit, OOM, internal error) —
// observed in 2026-04-18 adversarial-review runs where the CLI emitted a
// plan-phase agentMessage, ran tool calls, then exited cleanly via IPC EOF.
// Without this watchdog, `state.completion` hangs, the companion script
// exits via its own IPC close handling, and runTrackedJob's try/catch is
// never reached — leaving a zombie job and no final verdict.
//
// Success preservation: if a `final_answer` agentMessage already arrived
// AND no subagent work is outstanding, the turn is authoritatively
// complete — the disconnect is just Codex closing the door behind a
// successful response. Mirror `scheduleInferredCompletion`'s success
// semantics (same flag, same pending-work check) without waiting for
// its 250ms debounce, which can't help a dead socket. Only when no
// terminal signal was captured do we synthesize a `failed` turn.
client.exitPromise.then(() => {
if (state.completed) {
return;
}
const hasFinalAnswer =
state.finalAnswerSeen &&
state.pendingCollaborations.size === 0 &&
state.activeSubagentTurns.size === 0;
if (hasFinalAnswer) {
completeTurn(state, null, { inferred: true });
return;
}
state.error = state.error ?? client.exitError ?? {
message: "codex app-server disconnected before the turn completed."
};
emitProgress(
state.onProgress,
"App-server disconnected before turn/completed; marking turn as failed.",
"failed"
);
completeTurn(
state,
{ id: state.turnId ?? "disconnected-turn", status: "failed" },
{ inferred: true }
);
});

try {
const response = await startRequest();
options.onResponse?.(response, state);
Expand Down
46 changes: 46 additions & 0 deletions tests/fake-codex-fixture.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,52 @@ rl.on("line", (line) => {
interruptibleTurns.set(turnId, { threadId: thread.id, timer });
} else if (BEHAVIOR === "slow-task") {
emitTurnCompletedLater(thread.id, turnId, items, 400);
} else if (BEHAVIOR === "disconnect-before-completion") {
// Reproduces the real-world Codex CLI exit path: plan-phase
// agentMessage (phase !== "final_answer") is emitted, then the
// server disconnects without ever sending turn/completed or a
// final_answer agentMessage. Neither existing captureTurn
// resolution path (turn/completed notification OR finalAnswerSeen
// inferred timer) fires → captureTurn hangs indefinitely.
send({ method: "turn/started", params: { threadId: thread.id, turn: buildTurn(turnId) } });
send({
method: "item/completed",
params: {
threadId: thread.id,
turnId,
item: {
type: "agentMessage",
id: "msg_" + turnId,
text: "I'm going to inspect the target and return a verdict.",
phase: "plan"
}
}
});
// Flush stdout then disconnect cleanly, matching observed upstream
// exit mode (exit 0 via IPC EOF; no protocol-level error).
process.stdout.write("", () => process.exit(0));
} else if (BEHAVIOR === "final-answer-then-exit") {
// The captureTurn watchdog must distinguish "disconnected with
// authoritative completion signal" (success) from "disconnected
// with no terminal signal" (failure). Emit a completed
// final_answer agentMessage and then exit before sending
// turn/completed — this is the success-path race the watchdog
// must preserve instead of demoting to failed.
send({ method: "turn/started", params: { threadId: thread.id, turn: buildTurn(turnId) } });
send({
method: "item/completed",
params: {
threadId: thread.id,
turnId,
item: {
type: "agentMessage",
id: "msg_" + turnId,
text: payload,
phase: "final_answer"
}
}
});
process.stdout.write("", () => process.exit(0));
} else {
emitTurnCompleted(thread.id, turnId, items);
}
Expand Down
136 changes: 134 additions & 2 deletions tests/runtime.test.mjs
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import test from "node:test";
import assert from "node:assert/strict";
import { spawn } from "node:child_process";
import { spawn, spawnSync } from "node:child_process";
import { fileURLToPath } from "node:url";

import { buildEnv, installFakeCodex } from "./fake-codex-fixture.mjs";
import { initGitRepo, makeTempDir, run } from "./helpers.mjs";
import { loadBrokerSession, saveBrokerSession } from "../plugins/codex/scripts/lib/broker-lifecycle.mjs";
import {
loadBrokerSession,
saveBrokerSession,
sendBrokerShutdown,
waitForBrokerEndpoint
} from "../plugins/codex/scripts/lib/broker-lifecycle.mjs";
import { createBrokerEndpoint } from "../plugins/codex/scripts/lib/broker-endpoint.mjs";
import { resolveStateDir } from "../plugins/codex/scripts/lib/state.mjs";

const ROOT = path.resolve(path.dirname(fileURLToPath(import.meta.url)), "..");
Expand Down Expand Up @@ -300,6 +307,131 @@ test("adversarial review asks Codex to inspect larger diffs itself", () => {
assert.doesNotMatch(state.lastTurnStart.prompt, /PROMPT_SELF_COLLECT_[ABC]/);
});

test("adversarial review resolves when app-server disconnects before turn/completed", () => {
// Regression: Codex CLI can exit cleanly (token limit / rate limit / internal
// error) after emitting a non-final_answer agentMessage but before sending
// turn/completed. Neither existing captureTurn resolution path fires — the
// Promise hangs and the companion script exits via IPC EOF without writing a
// final verdict, leaving a zombie job record.
//
// Observed in the wild on 2026-04-18 (MeeePtt favorites-fix commit). A passing
// test asserts captureTurn can handle "app-server closed unexpectedly".
const repo = makeTempDir();
const binDir = makeTempDir();
installFakeCodex(binDir, "disconnect-before-completion");
initGitRepo(repo);
fs.writeFileSync(path.join(repo, "README.md"), "hello\n");
run("git", ["add", "README.md"], { cwd: repo });
run("git", ["commit", "-m", "init"], { cwd: repo });
fs.writeFileSync(path.join(repo, "README.md"), "hello again\n");

// Explicit 10s timeout guards against the hang on unfixed code — without
// this, the test would block until node --test's outer limit and report
// "hung" instead of "disconnected".
const result = spawnSync("node", [SCRIPT, "adversarial-review"], {
cwd: repo,
env: buildEnv(binDir),
encoding: "utf8",
timeout: 10_000
});

assert.notEqual(result.signal, "SIGTERM",
`adversarial-review hung after app-server disconnect (stdout=${result.stdout}, stderr=${result.stderr})`);
assert.notEqual(result.status, 0,
"adversarial-review must surface disconnect as non-zero exit");
assert.match(
`${result.stdout}\n${result.stderr}`,
/disconnect|closed|unexpectedly/i,
"rendered output should mention the disconnect"
);
});

test("adversarial review preserves final_answer when app-server exits before turn/completed", () => {
// Companion to the disconnect-before-completion test: when the
// app-server emits a completed final_answer agentMessage and then exits
// cleanly (no turn/completed notification), the captureTurn watchdog
// must NOT demote the successful turn to failed. `finalAnswerSeen` is
// the authoritative "work finished" signal — the disconnect is just
// Codex closing the door, not a failure.
const repo = makeTempDir();
const binDir = makeTempDir();
installFakeCodex(binDir, "final-answer-then-exit");
initGitRepo(repo);
fs.mkdirSync(path.join(repo, "src"));
fs.writeFileSync(path.join(repo, "src", "app.js"), "export const value = items[0];\n");
run("git", ["add", "src/app.js"], { cwd: repo });
run("git", ["commit", "-m", "init"], { cwd: repo });
fs.writeFileSync(path.join(repo, "src", "app.js"), "export const value = items[0].id;\n");

const result = spawnSync("node", [SCRIPT, "adversarial-review"], {
cwd: repo,
env: buildEnv(binDir),
encoding: "utf8",
timeout: 10_000
});

assert.notEqual(result.signal, "SIGTERM",
`adversarial-review hung after final_answer + exit (stderr=${result.stderr})`);
assert.equal(result.status, 0,
`final_answer before disconnect must stay exit 0 (stdout=${result.stdout}, stderr=${result.stderr})`);
assert.match(
result.stdout,
/Missing empty-state guard/,
"structured findings from the final_answer payload must survive the disconnect"
);
});

test("broker exits cleanly on broker/shutdown without tripping the upstream-exit watchdog", async () => {
// Guards against a regression introduced while fixing the
// "disconnect-before-completion" hang: the upstream-exit watchdog added
// to app-server-broker.mjs listens on appClient.exitPromise, which is
// resolved both by Codex CLI crashes AND by shutdown()'s own
// appClient.close(). Without an `isShuttingDown` guard the watchdog
// re-enters shutdown() during broker/shutdown (or SIGTERM/SIGINT) and
// races the intentional process.exit(0) with process.exit(1).
const binDir = makeTempDir();
installFakeCodex(binDir);
const cwd = makeTempDir();

// Spawn the broker directly (not detached) so the test can observe its
// real exit code — the shared-broker lifecycle normally detaches the
// process which hides exit code from supervisors.
const brokerScript = path.join(PLUGIN_ROOT, "scripts", "app-server-broker.mjs");
const sessionDir = fs.mkdtempSync(path.join(os.tmpdir(), "broker-test-"));
const endpoint = createBrokerEndpoint(sessionDir);
const pidFile = path.join(sessionDir, "broker.pid");

const broker = spawn(
"node",
[brokerScript, "serve", "--endpoint", endpoint, "--cwd", cwd, "--pid-file", pidFile],
{ env: buildEnv(binDir), stdio: ["ignore", "pipe", "pipe"] }
);

const exited = new Promise((resolve) => {
broker.on("exit", (code, signal) => resolve({ code, signal }));
});

try {
const ready = await waitForBrokerEndpoint(endpoint, 5_000);
assert.ok(ready, "broker should become reachable");

await sendBrokerShutdown(endpoint);
const { code, signal } = await exited;

assert.equal(signal, null, "broker should exit without a signal");
assert.equal(
code,
0,
"broker/shutdown must return exit 0 — non-zero indicates the upstream-exit watchdog tripped during graceful teardown"
);
} finally {
if (broker.exitCode === null && broker.signalCode === null) {
broker.kill("SIGKILL");
}
fs.rmSync(sessionDir, { recursive: true, force: true });
}
});

test("review includes reasoning output when the app server returns it", () => {
const repo = makeTempDir();
const binDir = makeTempDir();
Expand Down