diff --git a/README.md b/README.md index 1fc109a..f830c0f 100644 --- a/README.md +++ b/README.md @@ -167,6 +167,17 @@ loop upgrade When running from source (`bun src/loop.ts`), auto-update is disabled — use `git pull` instead. +## Manual proxy reconnect E2E + +For a real tmux + real Codex app-server reconnect check, run: + +```bash +bun tests/loop/codex-tmux-proxy.manual.ts --model gpt-5.4-mini +``` + +This is a manual harness only. It is not part of `bun test`. +It restarts against a fresh Codex thread after the app-server drop. + ## Options - `claude-loop`: shorthand for `loop --claude-only` diff --git a/src/loop/codex-tmux-proxy.ts b/src/loop/codex-tmux-proxy.ts index bb6c8e0..25800f9 100644 --- a/src/loop/codex-tmux-proxy.ts +++ b/src/loop/codex-tmux-proxy.ts @@ -8,6 +8,7 @@ import { import { formatCodexBridgeMessage } from "./bridge-message-format"; import { clearStaleTmuxBridgeState } from "./bridge-runtime"; import type { BridgeMessage } from "./bridge-store"; +import { LOOP_VERSION } from "./constants"; import { findFreePort } from "./ports"; import { isActiveRunState, @@ -23,7 +24,14 @@ const DRAIN_DELAY_MS = 250; const HEALTH_POLL_DELAY_MS = 150; const HEALTH_POLL_RETRIES = 40; const PROXY_STARTUP_GRACE_MS = 10_000; +const PROXY_UPSTREAM_INIT_TIMEOUT_MS = 5000; +const PROXY_UPSTREAM_RECONNECT_BASE_DELAY_MS = 250; +const PROXY_UPSTREAM_RECONNECT_MAX_ATTEMPTS = 40; +const PROXY_UPSTREAM_RECONNECT_MAX_DELAY_MS = 2000; +const BRIDGE_REQUEST_ID_PREFIX = "proxy-bridge-"; const INITIALIZE_METHOD = "initialize"; +const INITIALIZED_METHOD = "initialized"; +const THREAD_READ_METHOD = "thread/read"; const THREAD_RESUME_METHOD = "thread/resume"; const THREAD_START_METHOD = "thread/start"; const TURN_COMPLETED_METHOD = "turn/completed"; @@ -31,6 +39,7 @@ const TURN_STARTED_METHOD = "turn/started"; const TURN_START_METHOD = "turn/start"; const TURN_STEER_METHOD = "turn/steer"; const USER_INPUT_TEXT_ELEMENTS = "text_elements"; +const DEBUG_PROXY = process.env.LOOP_DEBUG_PROXY === "1"; export const CODEX_TMUX_PROXY_SUBCOMMAND = "__codex-tmux-proxy"; @@ -58,11 +67,20 @@ interface BridgeRequest { method: string; } +interface PendingUpstreamRequest { + reject: (error: Error) => void; + resolve: (frame: JsonFrame) => void; + timeout: ReturnType; +} + type StopReason = "dead-tmux" | "inactive-run"; const isRecord = (value: unknown): value is Record => typeof value === "object" && value !== null; +const asRecord = (value: unknown): Record => + (isRecord(value) ? value : {}) as Record; + const asString = (value: unknown): string | undefined => typeof value === "string" && value.length > 0 ? value : undefined; @@ -95,9 +113,9 @@ const buildInput = (prompt: string): Record[] => [ const bridgeMessageId = ( value: number | string | undefined -): number | undefined => { - const numeric = asNumber(value); - return numeric !== undefined && numeric < 0 ? numeric : undefined; +): string | undefined => { + const text = asString(value); + return text?.startsWith(BRIDGE_REQUEST_ID_PREFIX) ? text : undefined; }; const buildProxyUrl = (port: number): string => `ws://127.0.0.1:${port}/`; @@ -108,6 +126,12 @@ const wait = async (ms: number): Promise => { }); }; +const debugProxy = (message: string): void => { + if (DEBUG_PROXY) { + console.error(`[loop-proxy] ${message}`); + } +}; + const extractTurnId = (value: unknown): string | undefined => { if (!isRecord(value)) { return undefined; @@ -124,6 +148,43 @@ const extractThreadId = (value: unknown): string | undefined => { return asString(thread?.id) ?? asString(value.threadId); }; +const extractActiveTurnId = (value: unknown): string | undefined => { + const thread = isRecord(asRecord(value).thread) ? asRecord(value).thread : {}; + if (!Array.isArray(thread.turns)) { + return undefined; + } + for (let index = thread.turns.length - 1; index >= 0; index -= 1) { + const turn = isRecord(thread.turns[index]) + ? thread.turns[index] + : undefined; + if (turn && asString(turn.status) === "inProgress") { + return asString(turn.id); + } + } + return undefined; +}; + +const parseErrorText = (value: unknown): string | undefined => { + const record = isRecord(value) ? value : {}; + const error = isRecord(record.error) ? record.error : {}; + return ( + asString(error.message) || + asString(record.message) || + asString(record.reason) + ); +}; + +const isBusyTurnError = (value: unknown): boolean => { + const message = parseErrorText(value)?.toLowerCase() ?? ""; + return ( + message.includes("active turn") || + message.includes("already active") || + message.includes("busy") || + message.includes("in progress") || + message.includes("turn still active") + ); +}; + const latestActiveTurnId = (turnIds: Set): string | undefined => { let latest: string | undefined; for (const turnId of turnIds) { @@ -162,7 +223,7 @@ const persistCodexThreadId = (runDir: string, threadId: string): void => { }; const buildBridgeInjectionFrame = ( - requestId: number, + requestId: string, threadId: string, message: BridgeMessage, activeTurnId?: string @@ -225,14 +286,11 @@ const shouldStopForTmuxSession = ( return true; }; -const patchInitializeError = (frame: JsonFrame): JsonFrame => { - const error = isRecord(frame.error) ? frame.error : undefined; - const message = asString(error?.message)?.toLowerCase() ?? ""; - if (!message.includes("already initialized")) { - return frame; - } +const proxyInitializeResponse = ( + id: number | string | undefined +): JsonFrame => { return { - id: frame.id, + id, result: { platformFamily: "unix", platformOs: process.platform === "darwin" ? "macos" : process.platform, @@ -241,19 +299,49 @@ const patchInitializeError = (frame: JsonFrame): JsonFrame => { }; }; +const proxyErrorFrame = ( + id: number | string, + message: string +): Record => ({ + error: { message }, + id, +}); + +const proxyHealth = ( + upstreamConnected: boolean, + reconnecting: boolean +): { body: string; status?: number } => { + if (upstreamConnected) { + return { body: "ok" }; + } + return reconnecting + ? { body: "reconnecting", status: 503 } + : { body: "not ready", status: 503 }; +}; + +const reconnectDelayMs = (attempt: number): number => + Math.min( + PROXY_UPSTREAM_RECONNECT_BASE_DELAY_MS * 2 ** Math.max(0, attempt - 1), + PROXY_UPSTREAM_RECONNECT_MAX_DELAY_MS + ); + class CodexTmuxProxy { private readonly activeTurnIds = new Set(); - private readonly bridgeRequests = new Map(); + private readonly bridgeRequests = new Map(); private readonly port: number; - private readonly remoteUrl: string; + private remoteUrl: string; private readonly routes = new Map(); private readonly runDir: string; + private readonly upstreamRequests = new Map(); private currentConnId = 0; private drainTimer: ReturnType | undefined; private initialized = false; - private nextBridgeRequestId = -1; + private nextBridgeRequestId = 1; private nextProxyId = 100_000; private proxyServer: ReturnType | undefined; + private reconnectAttemptCount = 0; + private reconnectTimer: ReturnType | undefined; + private reconnecting = false; private resolveStopped = () => undefined; private sawTmuxSession = false; private stopped = false; @@ -280,24 +368,15 @@ class CodexTmuxProxy { } async start(): Promise { - this.upstream = await connectWs(this.remoteUrl); - this.upstream.onmessage = (data) => { - for (const raw of data.split("\n")) { - if (raw.trim()) { - this.handleUpstreamFrame(raw); - } - } - }; - this.upstream.onclose = () => { - this.stop(); - }; + await this.connectUpstream(); this.proxyServer = serve({ fetch: (request, server) => { const path = new URL(request.url).pathname; if (path === "/healthz" || path === "/readyz") { + const health = proxyHealth(Boolean(this.upstream), this.reconnecting); return new Response( - this.upstream ? "ok" : "not ready", - this.upstream ? undefined : { status: 503 } + health.body, + health.status ? { status: health.status } : undefined ); } if (server.upgrade(request, { data: { connId: 0 } })) { @@ -327,6 +406,7 @@ class CodexTmuxProxy { }, open: (ws) => { this.currentConnId += 1; + this.initialized = false; ws.data.connId = this.currentConnId; this.tuiSocket = ws; }, @@ -347,6 +427,10 @@ class CodexTmuxProxy { return; } this.stopped = true; + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = undefined; + } if (this.drainTimer) { clearInterval(this.drainTimer); this.drainTimer = undefined; @@ -367,6 +451,257 @@ class CodexTmuxProxy { this.upstream?.send(`${JSON.stringify(frame)}\n`); } + private resolveRemoteUrl(): string { + const manifest = readRunManifest(join(this.runDir, "manifest.json")); + const nextUrl = manifest?.codexRemoteUrl || this.remoteUrl; + if (nextUrl) { + this.remoteUrl = nextUrl; + } + return this.remoteUrl; + } + + private resolveThreadId(): string { + const manifest = readRunManifest(join(this.runDir, "manifest.json")); + const nextThreadId = manifest?.codexThreadId || this.threadId; + if (nextThreadId) { + this.threadId = nextThreadId; + } + return this.threadId; + } + + private attachUpstream(ws: WsClient): void { + this.upstream = ws; + this.reconnecting = false; + this.reconnectAttemptCount = 0; + ws.onmessage = (data) => { + for (const raw of data.split("\n")) { + if (raw.trim()) { + this.handleUpstreamFrame(raw); + } + } + }; + ws.onclose = () => { + if (this.upstream !== ws) { + return; + } + this.handleUpstreamDisconnect(); + }; + } + + private async initializeUpstream(ws: WsClient): Promise { + const requestId = `proxy-initialize-${Date.now()}-${this.nextProxyId++}`; + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error("codex tmux proxy upstream initialize timed out")); + }, PROXY_UPSTREAM_INIT_TIMEOUT_MS); + const finish = (error?: Error): void => { + clearTimeout(timeout); + ws.onclose = undefined; + ws.onmessage = undefined; + if (error) { + reject(error); + return; + } + resolve(); + }; + ws.onclose = () => { + finish(new Error("codex tmux proxy upstream closed during initialize")); + }; + ws.onmessage = (data) => { + for (const raw of data.split("\n")) { + if (!raw.trim()) { + continue; + } + const frame = asJsonFrame(raw); + if (!frame || String(frame.id) !== requestId) { + continue; + } + if (frame.error) { + finish(new Error("codex tmux proxy upstream initialize failed")); + return; + } + ws.send( + `${JSON.stringify({ + jsonrpc: "2.0", + method: INITIALIZED_METHOD, + })}\n` + ); + finish(); + return; + } + }; + ws.send( + `${JSON.stringify({ + id: requestId, + method: INITIALIZE_METHOD, + params: { + capabilities: { experimentalApi: true }, + clientInfo: { + name: "loop-tmux-proxy", + title: "loop-tmux-proxy", + version: LOOP_VERSION, + }, + }, + })}\n` + ); + }); + } + + private async connectUpstream(): Promise { + const ws = await connectWs(this.resolveRemoteUrl()); + try { + await this.initializeUpstream(ws); + } catch (error) { + try { + ws.close(); + } catch { + // ignore close errors + } + throw error; + } + this.attachUpstream(ws); + await this.refreshActiveTurnState().catch(() => undefined); + } + + private sendUpstreamRequest( + method: string, + params: Record + ): Promise { + if (!this.upstream) { + throw new Error("codex app-server upstream is not connected"); + } + const requestId = `proxy-${method}-${Date.now()}-${this.nextProxyId++}`; + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + this.upstreamRequests.delete(requestId); + reject(new Error(`codex tmux proxy upstream ${method} timed out`)); + }, PROXY_UPSTREAM_INIT_TIMEOUT_MS); + this.upstreamRequests.set(requestId, { reject, resolve, timeout }); + this.upstream?.send( + `${JSON.stringify({ + id: requestId, + method, + params, + })}\n` + ); + }); + } + + private async refreshActiveTurnState(): Promise { + if (!(this.threadId && this.upstream)) { + this.activeTurnIds.clear(); + this.turnInProgress = false; + return; + } + const frame = await this.sendUpstreamRequest(THREAD_READ_METHOD, { + includeTurns: true, + threadId: this.threadId, + }); + this.activeTurnIds.clear(); + const activeTurnId = extractActiveTurnId(frame.result); + debugProxy( + `thread/read thread=${this.threadId} activeTurn=${activeTurnId ?? "none"}` + ); + if (activeTurnId) { + this.activeTurnIds.add(activeTurnId); + this.turnInProgress = true; + return; + } + this.turnInProgress = false; + } + + private async refreshActiveTurnStateAfterBusyError(): Promise { + try { + await this.refreshActiveTurnState(); + } catch { + this.turnInProgress = false; + } + } + + private failPendingRoutes(message: string): void { + for (const route of this.routes.values()) { + if (route.connId !== this.currentConnId) { + continue; + } + this.forwardToTui( + JSON.stringify(proxyErrorFrame(route.clientId, message)) + ); + } + this.routes.clear(); + } + + private clearUpstreamState(): void { + this.failPendingRoutes("codex app-server upstream disconnected"); + this.bridgeRequests.clear(); + for (const request of this.upstreamRequests.values()) { + clearTimeout(request.timeout); + request.reject(new Error("codex app-server upstream disconnected")); + } + this.upstreamRequests.clear(); + this.activeTurnIds.clear(); + this.turnInProgress = false; + } + + private scheduleReconnect(): void { + if ( + this.stopped || + this.upstream || + this.reconnectTimer || + this.reconnectAttemptCount >= PROXY_UPSTREAM_RECONNECT_MAX_ATTEMPTS + ) { + if ( + !(this.stopped || this.upstream) && + this.reconnectAttemptCount >= PROXY_UPSTREAM_RECONNECT_MAX_ATTEMPTS + ) { + this.stop(); + } + return; + } + this.reconnecting = true; + this.reconnectAttemptCount += 1; + this.reconnectTimer = setTimeout(() => { + this.reconnectTimer = undefined; + this.tryReconnect().catch(() => undefined); + }, reconnectDelayMs(this.reconnectAttemptCount)); + this.reconnectTimer.unref?.(); + } + + private async tryReconnect(): Promise { + if (this.stopped || this.upstream) { + return; + } + const stopReason = this.stopReason(); + if (stopReason) { + if (stopReason === "dead-tmux") { + clearStaleTmuxBridgeState(this.runDir); + } + this.stop(); + return; + } + try { + await this.connectUpstream(); + } catch { + this.scheduleReconnect(); + } + } + + private handleUpstreamDisconnect(): void { + if (this.stopped) { + return; + } + this.upstream = undefined; + this.clearUpstreamState(); + const stopReason = this.stopReason(); + if (stopReason) { + if (stopReason === "dead-tmux") { + clearStaleTmuxBridgeState(this.runDir); + } + this.stop(); + return; + } + this.scheduleReconnect(); + } + private rememberThreadId(threadId: string | undefined): void { if (!threadId || threadId === this.threadId) { return; @@ -381,6 +716,19 @@ class CodexTmuxProxy { this.upstream?.send(raw); return; } + if (frame.method === INITIALIZE_METHOD) { + this.initialized = true; + this.forwardToTui(JSON.stringify(proxyInitializeResponse(frame.id))); + return; + } + if (!this.upstream) { + this.forwardToTui( + JSON.stringify( + proxyErrorFrame(frame.id, "codex app-server is reconnecting") + ) + ); + return; + } const proxyId = this.nextProxyId++; this.routes.set(proxyId, { @@ -425,8 +773,31 @@ class CodexTmuxProxy { return; } + const upstreamRequestId = asString(frame.id); + if (upstreamRequestId) { + const request = this.upstreamRequests.get(upstreamRequestId); + if (request) { + this.upstreamRequests.delete(upstreamRequestId); + clearTimeout(request.timeout); + if (frame.error) { + request.reject( + new Error( + parseErrorText(frame.error) ?? + `codex tmux proxy upstream ${upstreamRequestId} failed` + ) + ); + } else { + request.resolve(frame); + } + return; + } + } + const bridgeId = bridgeMessageId(frame.id); if (bridgeId !== undefined) { + debugProxy( + `bridge response id=${bridgeId} error=${parseErrorText(frame.error) ?? "none"}` + ); this.handleBridgeResponse(bridgeId, frame); return; } @@ -449,9 +820,7 @@ class CodexTmuxProxy { this.handleTrackedResponse(route, frame); frame.id = route.clientId; - const response = - route.method === INITIALIZE_METHOD ? patchInitializeError(frame) : frame; - this.forwardToTui(JSON.stringify(response)); + this.forwardToTui(JSON.stringify(frame)); } private handleTrackedResponse(route: ProxyRoute, frame: JsonFrame): void { @@ -460,11 +829,6 @@ class CodexTmuxProxy { return; } - if (route.method === INITIALIZE_METHOD && !frame.error) { - this.initialized = true; - return; - } - if ( !frame.error && (route.method === THREAD_START_METHOD || @@ -483,13 +847,26 @@ class CodexTmuxProxy { } } - private handleBridgeResponse(id: number, frame: JsonFrame): void { + private handleBridgeResponse(id: string, frame: JsonFrame): void { const request = this.bridgeRequests.get(id); if (!request) { return; } this.bridgeRequests.delete(id); if (frame.error) { + if (request.method === TURN_STEER_METHOD) { + this.activeTurnIds.clear(); + this.turnInProgress = false; + return; + } + if ( + request.method === TURN_START_METHOD && + isBusyTurnError(frame.error) + ) { + this.turnInProgress = true; + this.refreshActiveTurnStateAfterBusyError(); + return; + } this.turnInProgress = this.activeTurnIds.size > 0; return; } @@ -559,9 +936,8 @@ class CodexTmuxProxy { this.stop(); return; } - if ( - !(this.initialized && this.threadId && this.tuiSocket && this.upstream) - ) { + const threadId = this.resolveThreadId(); + if (!(this.initialized && threadId && this.tuiSocket && this.upstream)) { return; } const activeTurnId = latestActiveTurnId(this.activeTurnIds); @@ -579,10 +955,10 @@ class CodexTmuxProxy { return; } - const requestId = this.nextBridgeRequestId--; + const requestId = `${BRIDGE_REQUEST_ID_PREFIX}${this.nextBridgeRequestId++}`; const frame = buildBridgeInjectionFrame( requestId, - this.threadId, + threadId, message, activeTurnId ); @@ -591,6 +967,9 @@ class CodexTmuxProxy { method: frame.method ?? TURN_START_METHOD, }); this.turnInProgress = true; + debugProxy( + `bridge send id=${requestId} method=${frame.method ?? TURN_START_METHOD} thread=${this.threadId}` + ); this.forwardToUpstream(frame); } } @@ -632,10 +1011,12 @@ export const runCodexTmuxProxy = async ( export const codexTmuxProxyInternals = { buildBridgeInjectionFrame, + reconnectDelayMs, + proxyHealth, latestActiveTurnId, buildProxyUrl, noteStartedTurn, - patchInitializeError, + proxyInitializeResponse, persistCodexThreadId, shouldPauseBridgeDrain, shouldStopForTmuxSession, diff --git a/tests/loop/codex-tmux-proxy.integration.test.ts b/tests/loop/codex-tmux-proxy.integration.test.ts new file mode 100644 index 0000000..7a85eb4 --- /dev/null +++ b/tests/loop/codex-tmux-proxy.integration.test.ts @@ -0,0 +1,385 @@ +import { expect, test } from "bun:test"; +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { type ServerWebSocket, serve } from "bun"; +import { runCli } from "../../src/cli"; +import { appendBridgeMessage } from "../../src/loop/bridge-store"; +import { + CODEX_TMUX_PROXY_SUBCOMMAND, + waitForCodexTmuxProxy, +} from "../../src/loop/codex-tmux-proxy"; +import { findFreePort } from "../../src/loop/ports"; +import { + createRunManifest, + updateRunManifest, + writeRunManifest, +} from "../../src/loop/run-state"; + +const TEST_PORT_RANGE = 200; +const TEST_PORT_RETRY_LIMIT = 5; +const TEST_PORT_START = 24_000; +const TEST_PORT_WINDOW = 20_000; + +interface JsonFrame { + error?: unknown; + id?: number | string; + method?: string; + params?: unknown; + result?: unknown; +} + +const bridgeMessage = { + message: "Please review the latest diff.", + source: "claude" as const, + target: "codex" as const, +}; + +const makeTempDir = (): string => mkdtempSync(join(tmpdir(), "loop-proxy-")); + +const isRecord = (value: unknown): value is Record => + typeof value === "object" && value !== null; + +const asRecord = (value: unknown): Record => + (isRecord(value) ? value : {}) as Record; + +const asString = (value: unknown): string | undefined => + typeof value === "string" && value.length > 0 ? value : undefined; + +const isBridgeRequestId = (value: unknown): boolean => + typeof value === "string" && value.startsWith("proxy-bridge-"); + +const isAddressInUseError = (error: unknown): boolean => { + if (!isRecord(error)) { + return false; + } + const code = asString(error.code); + if (code === "EADDRINUSE") { + return true; + } + const message = asString(error.message)?.toLowerCase() ?? ""; + return message.includes("eaddrinuse"); +}; + +const randomTestPortBase = (): number => + TEST_PORT_START + Math.floor(Math.random() * TEST_PORT_WINDOW); + +const findTestPort = (): Promise => + findFreePort(randomTestPortBase(), TEST_PORT_RANGE); + +const startServerWithRetries = async ( + createServer: (port: number) => ReturnType +): Promise<{ port: number; server: ReturnType }> => { + let lastError: unknown; + for (let attempt = 0; attempt < TEST_PORT_RETRY_LIMIT; attempt += 1) { + const port = await findTestPort(); + try { + return { + port, + server: createServer(port), + }; + } catch (error) { + if (!isAddressInUseError(error)) { + throw error; + } + lastError = error; + } + } + throw lastError instanceof Error + ? lastError + : new Error("failed to start test server"); +}; + +const startCliProxyWithRetries = async ( + runDir: string, + remoteUrl: string, + threadId: string +): Promise<{ proxyTask: Promise; proxyUrl: string }> => { + let lastError: unknown; + for (let attempt = 0; attempt < TEST_PORT_RETRY_LIMIT; attempt += 1) { + const port = await findTestPort(); + const proxyTask = runCli([ + CODEX_TMUX_PROXY_SUBCOMMAND, + runDir, + remoteUrl, + threadId, + String(port), + ]); + try { + const proxyUrl = await Promise.race([ + waitForCodexTmuxProxy(port), + proxyTask.then(() => { + throw new Error("codex tmux proxy stopped before becoming ready"); + }), + ]); + return { proxyTask, proxyUrl }; + } catch (error) { + if (!isAddressInUseError(error)) { + throw error; + } + await proxyTask.catch(() => undefined); + lastError = error; + } + } + throw lastError instanceof Error + ? lastError + : new Error("failed to start codex tmux proxy"); +}; + +const waitFor = async ( + predicate: () => boolean, + timeoutMs = 5000 +): Promise => { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (predicate()) { + return; + } + await new Promise((resolve) => setTimeout(resolve, 25)); + } + throw new Error("timed out waiting for condition"); +}; + +test("runCli reconnects the codex tmux proxy subcommand without dropping the tui socket", async () => { + const root = makeTempDir(); + const manifestPath = join(root, "manifest.json"); + const bridgeMethods: string[] = []; + const bridgeThreadIds: string[] = []; + const tuiTurnIds: string[] = []; + const upstreamSockets: ServerWebSocket<{ initialized: boolean }>[] = []; + const tuiMessages: JsonFrame[] = []; + let proxyTask: Promise | undefined; + let upstreamInitializeCount = 0; + let upstreamConnections = 0; + let tuiClosed = false; + + const upstreamStart = await startServerWithRetries((port) => + serve({ + fetch: (request, server) => { + if (server.upgrade(request, { data: { initialized: false } })) { + return undefined; + } + return new Response("upstream"); + }, + hostname: "127.0.0.1", + port, + websocket: { + close: (ws) => { + const index = upstreamSockets.indexOf(ws); + if (index !== -1) { + upstreamSockets.splice(index, 1); + } + }, + message: (ws, message) => { + const payload = + typeof message === "string" ? message : message.toString(); + for (const raw of payload.split("\n")) { + if (!raw.trim()) { + continue; + } + const frame = JSON.parse(raw) as JsonFrame; + if (frame.method === "initialize") { + upstreamInitializeCount += 1; + if (ws.data.initialized) { + ws.send( + JSON.stringify({ + error: { message: "already initialized" }, + id: frame.id, + }) + ); + } else { + ws.data.initialized = true; + ws.send(JSON.stringify({ id: frame.id, result: {} })); + } + continue; + } + if (frame.method === "initialized") { + continue; + } + if (frame.method === "thread/read") { + ws.send( + JSON.stringify({ + id: frame.id, + result: { + thread: { + turns: [], + }, + }, + }) + ); + continue; + } + if (frame.method === "turn/start") { + const threadId = asString(asRecord(frame.params).threadId); + const turnId = isBridgeRequestId(frame.id) + ? "bridge-turn-after-reconnect" + : `tui-turn-${tuiTurnIds.length + 1}`; + if (isBridgeRequestId(frame.id)) { + bridgeMethods.push(frame.method); + if (threadId) { + bridgeThreadIds.push(threadId); + } + } else { + tuiTurnIds.push(turnId); + } + ws.send( + JSON.stringify({ + id: frame.id, + result: { turn: { id: turnId } }, + }) + ); + } + } + }, + open: (ws) => { + upstreamConnections += 1; + upstreamSockets.push(ws); + }, + }, + }) + ); + const upstreamServer = upstreamStart.server; + const upstreamUrl = `ws://127.0.0.1:${upstreamStart.port}/`; + + writeRunManifest( + manifestPath, + createRunManifest({ + claudeSessionId: "claude-1", + codexRemoteUrl: upstreamUrl, + codexThreadId: "thread-1", + cwd: "/repo", + mode: "paired", + pid: 1234, + repoId: "repo-123", + runId: "proxy-cli-e2e", + state: "working", + status: "running", + }) + ); + + let tui: WebSocket | undefined; + try { + const proxyStart = await startCliProxyWithRetries( + root, + upstreamUrl, + "thread-1" + ); + proxyTask = proxyStart.proxyTask; + tui = new WebSocket(proxyStart.proxyUrl); + tui.onclose = () => { + tuiClosed = true; + }; + tui.onmessage = (event) => { + tuiMessages.push(JSON.parse(String(event.data)) as JsonFrame); + }; + + await new Promise((resolve, reject) => { + if (!tui) { + reject(new Error("missing tui websocket")); + return; + } + tui.onopen = () => resolve(); + tui.onerror = () => reject(new Error("failed to open tui websocket")); + }); + + tui.send(JSON.stringify({ id: 1, method: "initialize", params: {} })); + await waitFor( + () => tuiMessages.some((frame) => frame.id === 1 && frame.result), + 5000 + ); + expect(upstreamInitializeCount).toBe(1); + + tui.send( + JSON.stringify({ + id: 2, + method: "turn/start", + params: { + input: [ + { + text: "hello before reconnect", + text_elements: [], + type: "text", + }, + ], + threadId: "thread-1", + }, + }) + ); + await waitFor( + () => + tuiMessages.some( + (frame) => + frame.id === 2 && + (frame.result as { turn?: { id?: string } } | undefined)?.turn?.id + ), + 5000 + ); + + upstreamSockets[0]?.close(); + await waitFor(() => upstreamConnections >= 2, 5000); + expect(tuiClosed).toBe(false); + expect(upstreamInitializeCount).toBe(2); + updateRunManifest(manifestPath, (manifest) => + manifest + ? { + ...manifest, + codexThreadId: "thread-2", + } + : manifest + ); + + appendBridgeMessage( + root, + bridgeMessage.source, + bridgeMessage.target, + bridgeMessage.message + ); + await waitFor(() => bridgeMethods.length > 0, 5000); + expect(bridgeMethods).toEqual(["turn/start"]); + expect(bridgeThreadIds).toEqual(["thread-2"]); + + tui.send( + JSON.stringify({ + id: 3, + method: "turn/start", + params: { + input: [ + { + text: "hello after reconnect", + text_elements: [], + type: "text", + }, + ], + threadId: "thread-2", + }, + }) + ); + await waitFor( + () => + tuiMessages.some( + (frame) => + frame.id === 3 && + (frame.result as { turn?: { id?: string } } | undefined)?.turn?.id + ), + 5000 + ); + expect(tuiClosed).toBe(false); + } finally { + tui?.close(); + updateRunManifest(manifestPath, (manifest) => + manifest + ? { + ...manifest, + state: "completed", + status: "completed", + } + : manifest + ); + await Promise.race([ + proxyTask ?? Promise.resolve(), + new Promise((resolve) => setTimeout(resolve, 2000)), + ]); + upstreamServer.stop(true); + rmSync(root, { recursive: true, force: true }); + } +}); diff --git a/tests/loop/codex-tmux-proxy.manual.ts b/tests/loop/codex-tmux-proxy.manual.ts new file mode 100644 index 0000000..6b3f227 --- /dev/null +++ b/tests/loop/codex-tmux-proxy.manual.ts @@ -0,0 +1,496 @@ +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { spawn, spawnSync } from "bun"; +import { + appendBridgeMessage, + readPendingBridgeMessages, +} from "../../src/loop/bridge-store"; +import { + closeAppServer, + getCodexAppServerUrl, + getLastCodexThreadId, + startAppServer, +} from "../../src/loop/codex-app-server"; +import { + CODEX_TMUX_PROXY_SUBCOMMAND, + waitForCodexTmuxProxy, +} from "../../src/loop/codex-tmux-proxy"; +import { findFreePort } from "../../src/loop/ports"; +import { + createRunManifest, + updateRunManifest, + writeRunManifest, +} from "../../src/loop/run-state"; + +interface JsonFrame { + error?: unknown; + id?: number | string; + method?: string; + params?: unknown; + result?: unknown; +} + +const DEFAULT_MODEL = "gpt-5.4-mini"; +const APP_SERVER_RETRY_DELAY_MS = 500; +const APP_SERVER_RETRY_LIMIT = 3; +const PROXY_PORT_RANGE = 500; +const PROXY_PORT_RETRY_LIMIT = 5; +const PROXY_PORT_START = 26_000; +const PROXY_PORT_WINDOW = 20_000; +const SESSION_PREFIX = "loop-proxy-e2e"; +const TUI_INITIALIZE_PARAMS = { + capabilities: { experimentalApi: true }, + clientInfo: { + name: "loop-proxy-manual-e2e", + title: "loop-proxy-manual-e2e", + version: "1.0.0", + }, +}; +const TURN_TIMEOUT_MS = 60_000; + +const makeTempDir = (): string => mkdtempSync(join(tmpdir(), "loop-proxy-")); + +const isRecord = (value: unknown): value is Record => + typeof value === "object" && value !== null; + +const asString = (value: unknown): string | undefined => + typeof value === "string" && value.length > 0 ? value : undefined; + +const extractTurnId = (value: unknown): string | undefined => { + if (!isRecord(value)) { + return undefined; + } + const turn = isRecord(value.turn) ? value.turn : undefined; + return asString(value.turnId) ?? asString(turn?.id); +}; + +const randomProxyPortBase = (): number => + PROXY_PORT_START + Math.floor(Math.random() * PROXY_PORT_WINDOW); + +const findProxyPort = (): Promise => + findFreePort(randomProxyPortBase(), PROXY_PORT_RANGE); + +const waitFor = async ( + predicate: () => boolean, + timeoutMs: number, + errorMessage: string +): Promise => { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (predicate()) { + return; + } + await new Promise((resolve) => setTimeout(resolve, 250)); + } + throw new Error(errorMessage); +}; + +const describeFrameError = (value: unknown): string => { + if (typeof value === "string" && value) { + return value; + } + if (isRecord(value) && typeof value.message === "string" && value.message) { + return value.message; + } + return JSON.stringify(value); +}; + +const waitForResponseFrame = async ( + messages: JsonFrame[], + id: number, + label: string, + isClosed: () => boolean +): Promise => { + await waitFor( + () => isClosed() || messages.some((frame) => frame.id === id), + TURN_TIMEOUT_MS, + `[manual e2e] ${label} did not complete` + ); + if (isClosed()) { + throw new Error( + `[manual e2e] ${label} failed because the tui socket closed` + ); + } + const frame = messages.find((entry) => entry.id === id); + if (!frame) { + throw new Error(`[manual e2e] ${label} did not complete`); + } + if (frame.error) { + throw new Error( + `[manual e2e] ${label} failed: ${describeFrameError(frame.error)}` + ); + } + return frame; +}; + +const parseModel = (argv: string[]): string => { + const modelIndex = argv.indexOf("--model"); + if (modelIndex === -1) { + return process.env.LOOP_E2E_CODEX_MODEL ?? DEFAULT_MODEL; + } + const value = argv[modelIndex + 1]; + if (!value) { + throw new Error( + "Usage: bun tests/loop/codex-tmux-proxy.manual.ts --model " + ); + } + return value; +}; + +const requireCommand = (args: string[], label: string): void => { + const result = spawnSync(args, { + stderr: "ignore", + stdout: "ignore", + }); + if (result.exitCode !== 0) { + throw new Error(`[manual e2e] missing prerequisite: ${label}`); + } +}; + +const createTmuxSession = (name: string): void => { + const result = spawnSync([ + "tmux", + "new-session", + "-d", + "-s", + name, + "sleep 600", + ]); + if (result.exitCode !== 0) { + throw new Error(`[manual e2e] failed to create tmux session "${name}"`); + } +}; + +const killTmuxSession = (name: string): void => { + spawnSync(["tmux", "kill-session", "-t", name], { + stderr: "ignore", + stdout: "ignore", + }); +}; + +const startProxyWithRetries = async ( + runDir: string, + remoteUrl: string, + threadId: string +): Promise<{ + proxyProcess: ReturnType; + proxyTask: Promise; + proxyUrl: string; +}> => { + let lastError: unknown; + for (let attempt = 0; attempt < PROXY_PORT_RETRY_LIMIT; attempt += 1) { + const port = await findProxyPort(); + const proxyProcess = spawn( + [ + process.execPath, + join(process.cwd(), "src", "cli.ts"), + CODEX_TMUX_PROXY_SUBCOMMAND, + runDir, + remoteUrl, + threadId, + String(port), + ], + { + cwd: process.cwd(), + env: process.env, + stderr: "inherit", + stdin: "ignore", + stdout: "inherit", + } + ); + const proxyTask = proxyProcess.exited.then((code) => { + if (code === 0) { + return; + } + throw new Error(`[manual e2e] proxy exited with code ${code}`); + }); + try { + const proxyUrl = await Promise.race([ + waitForCodexTmuxProxy(port), + proxyTask.then(() => { + throw new Error( + "[manual e2e] codex tmux proxy stopped before becoming ready" + ); + }), + ]); + return { proxyProcess, proxyTask, proxyUrl }; + } catch (error) { + proxyProcess.kill(); + await Promise.race([ + proxyTask.catch(() => undefined), + new Promise((resolve) => setTimeout(resolve, 1000)), + ]); + lastError = error; + } + } + throw lastError instanceof Error + ? lastError + : new Error("[manual e2e] failed to start the codex tmux proxy"); +}; + +const startManualAppServer = async ( + options: Parameters[0], + errorMessage: string +): Promise<{ remoteUrl: string; threadId: string }> => { + let lastError: unknown = new Error(errorMessage); + for (let attempt = 0; attempt < APP_SERVER_RETRY_LIMIT; attempt += 1) { + try { + await startAppServer(options); + await waitFor( + () => + Boolean(getCodexAppServerUrl()) && Boolean(getLastCodexThreadId()), + TURN_TIMEOUT_MS, + errorMessage + ); + const remoteUrl = getCodexAppServerUrl(); + const threadId = getLastCodexThreadId(); + if (remoteUrl && threadId) { + return { remoteUrl, threadId }; + } + lastError = new Error(errorMessage); + } catch (error) { + lastError = error; + } + await closeAppServer(); + if (attempt + 1 < APP_SERVER_RETRY_LIMIT) { + await new Promise((resolve) => + setTimeout(resolve, APP_SERVER_RETRY_DELAY_MS) + ); + } + } + throw lastError instanceof Error ? lastError : new Error(errorMessage); +}; + +const waitForProxyReady = async (proxyUrl: string): Promise => { + const readyUrl = new URL(proxyUrl); + readyUrl.pathname = "/readyz"; + readyUrl.protocol = readyUrl.protocol === "wss:" ? "https:" : "http:"; + const deadline = Date.now() + TURN_TIMEOUT_MS; + while (Date.now() < deadline) { + try { + const response = await fetch(readyUrl); + if (response.ok) { + return; + } + } catch { + // keep polling + } + await new Promise((resolve) => setTimeout(resolve, 250)); + } + throw new Error("[manual e2e] codex tmux proxy did not reconnect"); +}; + +const sendTurn = async ( + socket: WebSocket, + messages: JsonFrame[], + id: number, + text: string, + threadId: string, + isClosed: () => boolean +): Promise => { + socket.send( + JSON.stringify({ + id, + method: "turn/start", + params: { + input: [ + { + text, + text_elements: [], + type: "text", + }, + ], + threadId, + }, + }) + ); + const frame = await waitForResponseFrame( + messages, + id, + `turn ${id}`, + isClosed + ); + const turnId = extractTurnId(frame.result); + if (!turnId) { + throw new Error(`[manual e2e] turn ${id} did not return a turn id`); + } +}; + +const main = async (): Promise => { + const model = parseModel(process.argv.slice(2)); + requireCommand(["tmux", "-V"], "tmux"); + requireCommand(["codex", "--version"], "codex"); + + const runDir = makeTempDir(); + const manifestPath = join(runDir, "manifest.json"); + const tmuxSession = `${SESSION_PREFIX}-${Date.now()}`; + let proxyTask: Promise | undefined; + let proxyProcess: ReturnType | undefined; + let tui: WebSocket | undefined; + let tuiClosed = false; + const tuiMessages: JsonFrame[] = []; + + console.log(`[manual e2e] using model: ${model}`); + console.log(`[manual e2e] run dir: ${runDir}`); + console.log(`[manual e2e] tmux session: ${tmuxSession}`); + createTmuxSession(tmuxSession); + + try { + const initialAppServer = await startManualAppServer( + { + persistentThread: true, + threadModel: model, + }, + "[manual e2e] failed to start the real Codex app-server" + ); + const initialRemoteUrl = initialAppServer.remoteUrl; + const threadId = initialAppServer.threadId; + + writeRunManifest( + manifestPath, + createRunManifest({ + claudeSessionId: "manual-e2e", + codexRemoteUrl: initialRemoteUrl, + codexThreadId: threadId, + cwd: process.cwd(), + mode: "paired", + pid: process.pid, + repoId: "manual-e2e", + runId: "manual-e2e", + state: "working", + status: "running", + tmuxSession, + }) + ); + + const proxyStart = await startProxyWithRetries( + runDir, + initialRemoteUrl, + threadId + ); + proxyProcess = proxyStart.proxyProcess; + proxyTask = proxyStart.proxyTask; + const proxyUrl = proxyStart.proxyUrl; + tui = new WebSocket(proxyUrl); + tui.onclose = () => { + tuiClosed = true; + }; + tui.onmessage = (event) => { + tuiMessages.push(JSON.parse(String(event.data)) as JsonFrame); + }; + + await new Promise((resolve, reject) => { + if (!tui) { + reject(new Error("[manual e2e] missing tui websocket")); + return; + } + tui.onopen = () => resolve(); + tui.onerror = () => + reject(new Error("[manual e2e] failed to open tui websocket")); + }); + + tui.send( + JSON.stringify({ + id: 1, + method: "initialize", + params: TUI_INITIALIZE_PARAMS, + }) + ); + const initializeFrame = await waitForResponseFrame( + tuiMessages, + 1, + "initialize", + () => tuiClosed + ); + if (!initializeFrame.result) { + throw new Error("[manual e2e] initialize did not return a result"); + } + console.log("[manual e2e] proxy initialized"); + + await sendTurn( + tui, + tuiMessages, + 2, + "Reply with exactly: before-reconnect", + threadId, + () => tuiClosed + ); + console.log("[manual e2e] first turn accepted"); + + await closeAppServer(); + console.log("[manual e2e] closed app-server to force reconnect"); + + const resumedAppServer = await startManualAppServer( + { + persistentThread: true, + threadModel: model, + }, + "[manual e2e] failed to restart the Codex app-server" + ); + const resumedRemoteUrl = resumedAppServer.remoteUrl; + const resumedThreadId = resumedAppServer.threadId; + updateRunManifest(manifestPath, (manifest) => + manifest + ? { + ...manifest, + codexRemoteUrl: resumedRemoteUrl, + codexThreadId: resumedThreadId, + } + : manifest + ); + await waitForProxyReady(proxyUrl); + console.log("[manual e2e] proxy reconnected"); + + appendBridgeMessage( + runDir, + "claude", + "codex", + "Reply with exactly: bridge-after-reconnect" + ); + await waitFor( + () => readPendingBridgeMessages(runDir).length === 0, + TURN_TIMEOUT_MS, + "[manual e2e] bridge message was not delivered after reconnect" + ); + console.log("[manual e2e] bridge delivery survived reconnect"); + + await sendTurn( + tui, + tuiMessages, + 3, + "Reply with exactly: after-reconnect", + resumedThreadId, + () => tuiClosed + ); + console.log("[manual e2e] second turn accepted"); + console.log("[manual e2e] success"); + } finally { + tui?.close(); + updateRunManifest(manifestPath, (manifest) => + manifest + ? { + ...manifest, + state: "completed", + status: "completed", + } + : manifest + ); + await Promise.race([ + proxyTask ?? Promise.resolve(), + new Promise((resolve) => setTimeout(resolve, 2000)), + ]); + proxyProcess?.kill(); + await closeAppServer(); + killTmuxSession(tmuxSession); + rmSync(runDir, { force: true, recursive: true }); + } +}; + +if (import.meta.main) { + main().catch((error: unknown) => { + const message = error instanceof Error ? error.message : String(error); + console.error(message); + process.exit(1); + }); +} diff --git a/tests/loop/codex-tmux-proxy.test.ts b/tests/loop/codex-tmux-proxy.test.ts index 92fe25c..0b5093e 100644 --- a/tests/loop/codex-tmux-proxy.test.ts +++ b/tests/loop/codex-tmux-proxy.test.ts @@ -2,10 +2,21 @@ import { expect, test } from "bun:test"; import { mkdtempSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; -import { codexTmuxProxyInternals } from "../../src/loop/codex-tmux-proxy"; +import { type ServerWebSocket, serve } from "bun"; +import { + appendBridgeMessage, + readPendingBridgeMessages, +} from "../../src/loop/bridge-store"; +import { + codexTmuxProxyInternals, + runCodexTmuxProxy, + waitForCodexTmuxProxy, +} from "../../src/loop/codex-tmux-proxy"; +import { findFreePort } from "../../src/loop/ports"; import { createRunManifest, readRunManifest, + updateRunManifest, writeRunManifest, } from "../../src/loop/run-state"; @@ -18,8 +29,115 @@ const bridgeMessage = { target: "codex" as const, }; +const TEST_PORT_RANGE = 200; +const TEST_PORT_RETRY_LIMIT = 5; +const TEST_PORT_START = 20_000; +const TEST_PORT_WINDOW = 20_000; + +interface JsonFrame { + error?: unknown; + id?: number | string; + method?: string; + params?: unknown; + result?: unknown; +} + const makeTempDir = (): string => mkdtempSync(join(tmpdir(), "loop-proxy-")); +const isRecord = (value: unknown): value is Record => + typeof value === "object" && value !== null; + +const asString = (value: unknown): string | undefined => + typeof value === "string" && value.length > 0 ? value : undefined; + +const isBridgeRequestId = (value: unknown): boolean => + typeof value === "string" && value.startsWith("proxy-bridge-"); + +const isAddressInUseError = (error: unknown): boolean => { + if (!isRecord(error)) { + return false; + } + const code = asString(error.code); + if (code === "EADDRINUSE") { + return true; + } + const message = asString(error.message)?.toLowerCase() ?? ""; + return message.includes("eaddrinuse"); +}; + +const randomTestPortBase = (): number => + TEST_PORT_START + Math.floor(Math.random() * TEST_PORT_WINDOW); + +const findTestPort = (): Promise => + findFreePort(randomTestPortBase(), TEST_PORT_RANGE); + +const startServerWithRetries = async ( + createServer: (port: number) => ReturnType +): Promise<{ port: number; server: ReturnType }> => { + let lastError: unknown; + for (let attempt = 0; attempt < TEST_PORT_RETRY_LIMIT; attempt += 1) { + const port = await findTestPort(); + try { + return { + port, + server: createServer(port), + }; + } catch (error) { + if (!isAddressInUseError(error)) { + throw error; + } + lastError = error; + } + } + throw lastError instanceof Error + ? lastError + : new Error("failed to start test server"); +}; + +const startProxyWithRetries = async ( + runDir: string, + remoteUrl: string, + threadId: string +): Promise<{ proxyTask: Promise; proxyUrl: string }> => { + let lastError: unknown; + for (let attempt = 0; attempt < TEST_PORT_RETRY_LIMIT; attempt += 1) { + const port = await findTestPort(); + const proxyTask = runCodexTmuxProxy(runDir, remoteUrl, threadId, port); + try { + const proxyUrl = await Promise.race([ + waitForCodexTmuxProxy(port), + proxyTask.then(() => { + throw new Error("codex tmux proxy stopped before becoming ready"); + }), + ]); + return { proxyTask, proxyUrl }; + } catch (error) { + if (!isAddressInUseError(error)) { + throw error; + } + await proxyTask.catch(() => undefined); + lastError = error; + } + } + throw lastError instanceof Error + ? lastError + : new Error("failed to start codex tmux proxy"); +}; + +const waitFor = async ( + predicate: () => boolean, + timeoutMs = 5000 +): Promise => { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (predicate()) { + return; + } + await new Promise((resolve) => setTimeout(resolve, 25)); + } + throw new Error("timed out waiting for condition"); +}; + test("codex tmux proxy waits briefly for the tmux session to appear", () => { const now = Date.now(); @@ -171,3 +289,688 @@ test("codex tmux proxy persists newer live thread ids to the run manifest", () = rmSync(root, { recursive: true, force: true }); }); + +test("codex tmux proxy reconnects to a live upstream without dropping the tui socket", async () => { + const root = makeTempDir(); + const manifestPath = join(root, "manifest.json"); + const bridgeMethods: string[] = []; + const tuiTurnIds: string[] = []; + const upstreamSockets: ServerWebSocket<{ initialized: boolean }>[] = []; + let upstreamInitializeCount = 0; + let upstreamConnections = 0; + let proxyTask: Promise | undefined; + let upstreamServer: ReturnType | undefined; + let upstreamPort = 0; + let proxyUrl = ""; + const upstreamStart = await startServerWithRetries((port) => + serve({ + fetch: (request, server) => { + if (server.upgrade(request, { data: { initialized: false } })) { + return undefined; + } + return new Response("upstream"); + }, + hostname: "127.0.0.1", + port, + websocket: { + close: (ws) => { + const index = upstreamSockets.indexOf(ws); + if (index !== -1) { + upstreamSockets.splice(index, 1); + } + }, + message: (ws, message) => { + const payload = + typeof message === "string" ? message : message.toString(); + for (const raw of payload.split("\n")) { + if (!raw.trim()) { + continue; + } + const frame = JSON.parse(raw) as JsonFrame; + if (frame.method === "initialize") { + upstreamInitializeCount += 1; + if (ws.data.initialized) { + ws.send( + JSON.stringify({ + error: { message: "already initialized" }, + id: frame.id, + }) + ); + } else { + ws.data.initialized = true; + ws.send(JSON.stringify({ id: frame.id, result: {} })); + } + continue; + } + if (frame.method === "initialized") { + continue; + } + if (frame.method === "thread/read") { + ws.send( + JSON.stringify({ + id: frame.id, + result: { + thread: { + turns: [], + }, + }, + }) + ); + continue; + } + if (frame.method === "turn/start") { + const turnId = isBridgeRequestId(frame.id) + ? "bridge-turn-after-reconnect" + : `tui-turn-${tuiTurnIds.length + 1}`; + if (isBridgeRequestId(frame.id)) { + bridgeMethods.push(frame.method); + } else { + tuiTurnIds.push(turnId); + } + ws.send( + JSON.stringify({ + id: frame.id, + result: { turn: { id: turnId } }, + }) + ); + continue; + } + if (isBridgeRequestId(frame.id) && frame.method) { + bridgeMethods.push(frame.method); + } + } + }, + open: (ws) => { + upstreamConnections += 1; + upstreamSockets.push(ws); + }, + }, + }) + ); + upstreamPort = upstreamStart.port; + upstreamServer = upstreamStart.server; + const upstreamUrl = `ws://127.0.0.1:${upstreamPort}/`; + const tuiMessages: JsonFrame[] = []; + let tuiClosed = false; + + writeRunManifest( + manifestPath, + createRunManifest({ + claudeSessionId: "claude-1", + codexRemoteUrl: upstreamUrl, + codexThreadId: "thread-1", + cwd: "/repo", + mode: "paired", + pid: 1234, + repoId: "repo-123", + runId: "9", + state: "working", + status: "running", + }) + ); + + let tui: WebSocket | undefined; + try { + const proxyStart = await startProxyWithRetries( + root, + upstreamUrl, + "thread-1" + ); + proxyTask = proxyStart.proxyTask; + proxyUrl = proxyStart.proxyUrl; + tui = new WebSocket(proxyUrl); + tui.onclose = () => { + tuiClosed = true; + }; + tui.onmessage = (event) => { + tuiMessages.push(JSON.parse(String(event.data)) as JsonFrame); + }; + + await new Promise((resolve, reject) => { + if (!tui) { + reject(new Error("missing tui websocket")); + return; + } + tui.onopen = () => resolve(); + tui.onerror = () => reject(new Error("failed to open tui websocket")); + }); + + tui.send(JSON.stringify({ id: 1, method: "initialize", params: {} })); + await waitFor( + () => tuiMessages.some((frame) => frame.id === 1 && frame.result), + 5000 + ); + expect(upstreamInitializeCount).toBe(1); + + tui.send( + JSON.stringify({ + id: 2, + method: "turn/start", + params: { + input: [ + { + text: "hello before reconnect", + text_elements: [], + type: "text", + }, + ], + threadId: "thread-1", + }, + }) + ); + await waitFor( + () => + tuiMessages.some( + (frame) => + frame.id === 2 && + (frame.result as { turn?: { id?: string } } | undefined)?.turn?.id + ), + 5000 + ); + + upstreamSockets[0]?.close(); + await waitFor(() => upstreamConnections >= 2, 5000); + expect(tuiClosed).toBe(false); + expect(upstreamInitializeCount).toBe(2); + + appendBridgeMessage( + root, + bridgeMessage.source, + bridgeMessage.target, + bridgeMessage.message + ); + await waitFor(() => bridgeMethods.length > 0, 5000); + expect(bridgeMethods).toEqual(["turn/start"]); + + tui.send( + JSON.stringify({ + id: 3, + method: "turn/start", + params: { + input: [ + { + text: "hello after reconnect", + text_elements: [], + type: "text", + }, + ], + threadId: "thread-1", + }, + }) + ); + + await waitFor( + () => + tuiMessages.some( + (frame) => + frame.id === 3 && + (frame.result as { turn?: { id?: string } } | undefined)?.turn?.id + ), + 5000 + ); + expect(tuiClosed).toBe(false); + } finally { + tui?.close(); + updateRunManifest(manifestPath, (manifest) => + manifest + ? { + ...manifest, + state: "completed", + status: "completed", + } + : manifest + ); + await Promise.race([ + proxyTask ?? Promise.resolve(), + new Promise((resolve) => setTimeout(resolve, 2000)), + ]); + upstreamServer?.stop(true); + rmSync(root, { recursive: true, force: true }); + } +}); + +test("codex tmux proxy steers bridge messages into the resumed active turn after reconnect", async () => { + const root = makeTempDir(); + const manifestPath = join(root, "manifest.json"); + const bridgeMethods: string[] = []; + const upstreamSockets: ServerWebSocket<{ initialized: boolean }>[] = []; + let activeTurnId = ""; + let proxyTask: Promise | undefined; + let threadReadCount = 0; + let upstreamConnections = 0; + + const upstreamStart = await startServerWithRetries((port) => + serve({ + fetch: (request, server) => { + if (server.upgrade(request, { data: { initialized: false } })) { + return undefined; + } + return new Response("upstream"); + }, + hostname: "127.0.0.1", + port, + websocket: { + close: (ws) => { + const index = upstreamSockets.indexOf(ws); + if (index !== -1) { + upstreamSockets.splice(index, 1); + } + }, + message: (ws, message) => { + const payload = + typeof message === "string" ? message : message.toString(); + for (const raw of payload.split("\n")) { + if (!raw.trim()) { + continue; + } + const frame = JSON.parse(raw) as JsonFrame; + if (frame.method === "initialize") { + if (ws.data.initialized) { + ws.send( + JSON.stringify({ + error: { message: "already initialized" }, + id: frame.id, + }) + ); + } else { + ws.data.initialized = true; + ws.send(JSON.stringify({ id: frame.id, result: {} })); + } + continue; + } + if (frame.method === "initialized") { + continue; + } + if (frame.method === "thread/read") { + threadReadCount += 1; + ws.send( + JSON.stringify({ + id: frame.id, + result: { + thread: { + turns: activeTurnId + ? [{ id: activeTurnId, status: "inProgress" }] + : [], + }, + }, + }) + ); + continue; + } + if (frame.method === "turn/start") { + if (isBridgeRequestId(frame.id)) { + bridgeMethods.push(frame.method); + ws.send( + JSON.stringify({ + error: { message: "turn still active" }, + id: frame.id, + }) + ); + continue; + } + activeTurnId = "tui-turn-1"; + ws.send( + JSON.stringify({ + id: frame.id, + result: { turn: { id: activeTurnId } }, + }) + ); + continue; + } + if (frame.method === "turn/steer") { + bridgeMethods.push(frame.method); + ws.send( + JSON.stringify({ + id: frame.id, + result: { turn: { id: activeTurnId } }, + }) + ); + } + } + }, + open: (ws) => { + upstreamConnections += 1; + upstreamSockets.push(ws); + }, + }, + }) + ); + const upstreamServer = upstreamStart.server; + const upstreamUrl = `ws://127.0.0.1:${upstreamStart.port}/`; + const tuiMessages: JsonFrame[] = []; + + writeRunManifest( + manifestPath, + createRunManifest({ + claudeSessionId: "claude-1", + codexRemoteUrl: upstreamUrl, + codexThreadId: "thread-1", + cwd: "/repo", + mode: "paired", + pid: 1234, + repoId: "repo-123", + runId: "10", + state: "working", + status: "running", + }) + ); + + let tui: WebSocket | undefined; + try { + const proxyStart = await startProxyWithRetries( + root, + upstreamUrl, + "thread-1" + ); + proxyTask = proxyStart.proxyTask; + tui = new WebSocket(proxyStart.proxyUrl); + tui.onmessage = (event) => { + tuiMessages.push(JSON.parse(String(event.data)) as JsonFrame); + }; + + await new Promise((resolve, reject) => { + if (!tui) { + reject(new Error("missing tui websocket")); + return; + } + tui.onopen = () => resolve(); + tui.onerror = () => reject(new Error("failed to open tui websocket")); + }); + + tui.send(JSON.stringify({ id: 1, method: "initialize", params: {} })); + await waitFor( + () => tuiMessages.some((frame) => frame.id === 1 && frame.result), + 5000 + ); + + tui.send( + JSON.stringify({ + id: 2, + method: "turn/start", + params: { + input: [ + { + text: "hello before reconnect", + text_elements: [], + type: "text", + }, + ], + threadId: "thread-1", + }, + }) + ); + await waitFor( + () => + tuiMessages.some( + (frame) => + frame.id === 2 && + (frame.result as { turn?: { id?: string } } | undefined)?.turn?.id + ), + 5000 + ); + + upstreamSockets[0]?.close(); + await waitFor(() => upstreamConnections >= 2, 5000); + await waitFor(() => threadReadCount >= 2, 5000); + + appendBridgeMessage( + root, + bridgeMessage.source, + bridgeMessage.target, + bridgeMessage.message + ); + await waitFor( + () => + bridgeMethods.length > 0 && + readPendingBridgeMessages(root).length === 0, + 5000 + ); + expect(bridgeMethods).toEqual(["turn/steer"]); + } finally { + tui?.close(); + updateRunManifest(manifestPath, (manifest) => + manifest + ? { + ...manifest, + state: "completed", + status: "completed", + } + : manifest + ); + await Promise.race([ + proxyTask ?? Promise.resolve(), + new Promise((resolve) => setTimeout(resolve, 2000)), + ]); + upstreamServer.stop(true); + rmSync(root, { recursive: true, force: true }); + } +}); + +test("codex tmux proxy falls back to a fresh bridge turn when steer fails after reconnect", async () => { + const root = makeTempDir(); + const manifestPath = join(root, "manifest.json"); + const bridgeMethods: string[] = []; + const upstreamSockets: ServerWebSocket<{ initialized: boolean }>[] = []; + let activeTurnId = ""; + let proxyTask: Promise | undefined; + let threadReadCount = 0; + let upstreamConnections = 0; + let steerAttempts = 0; + + const upstreamStart = await startServerWithRetries((port) => + serve({ + fetch: (request, server) => { + if (server.upgrade(request, { data: { initialized: false } })) { + return undefined; + } + return new Response("upstream"); + }, + hostname: "127.0.0.1", + port, + websocket: { + close: (ws) => { + const index = upstreamSockets.indexOf(ws); + if (index !== -1) { + upstreamSockets.splice(index, 1); + } + }, + message: (ws, message) => { + const payload = + typeof message === "string" ? message : message.toString(); + for (const raw of payload.split("\n")) { + if (!raw.trim()) { + continue; + } + const frame = JSON.parse(raw) as JsonFrame; + if (frame.method === "initialize") { + if (ws.data.initialized) { + ws.send( + JSON.stringify({ + error: { message: "already initialized" }, + id: frame.id, + }) + ); + } else { + ws.data.initialized = true; + ws.send(JSON.stringify({ id: frame.id, result: {} })); + } + continue; + } + if (frame.method === "initialized") { + continue; + } + if (frame.method === "thread/read") { + threadReadCount += 1; + ws.send( + JSON.stringify({ + id: frame.id, + result: { + thread: { + turns: activeTurnId + ? [{ id: activeTurnId, status: "inProgress" }] + : [], + }, + }, + }) + ); + continue; + } + if (frame.method === "turn/start") { + if (isBridgeRequestId(frame.id)) { + bridgeMethods.push(frame.method); + activeTurnId = "bridge-turn-after-reconnect"; + ws.send( + JSON.stringify({ + id: frame.id, + result: { turn: { id: activeTurnId } }, + }) + ); + continue; + } + activeTurnId = "tui-turn-1"; + ws.send( + JSON.stringify({ + id: frame.id, + result: { turn: { id: activeTurnId } }, + }) + ); + continue; + } + if (frame.method === "turn/steer") { + bridgeMethods.push(frame.method); + steerAttempts += 1; + if (steerAttempts === 1) { + ws.send( + JSON.stringify({ + error: { message: "cannot steer resumed turn" }, + id: frame.id, + }) + ); + continue; + } + ws.send( + JSON.stringify({ + id: frame.id, + result: { turn: { id: activeTurnId } }, + }) + ); + } + } + }, + open: (ws) => { + upstreamConnections += 1; + upstreamSockets.push(ws); + }, + }, + }) + ); + const upstreamServer = upstreamStart.server; + const upstreamUrl = `ws://127.0.0.1:${upstreamStart.port}/`; + const tuiMessages: JsonFrame[] = []; + + writeRunManifest( + manifestPath, + createRunManifest({ + claudeSessionId: "claude-1", + codexRemoteUrl: upstreamUrl, + codexThreadId: "thread-1", + cwd: "/repo", + mode: "paired", + pid: 1234, + repoId: "repo-123", + runId: "11", + state: "working", + status: "running", + }) + ); + + let tui: WebSocket | undefined; + try { + const proxyStart = await startProxyWithRetries( + root, + upstreamUrl, + "thread-1" + ); + proxyTask = proxyStart.proxyTask; + tui = new WebSocket(proxyStart.proxyUrl); + tui.onmessage = (event) => { + tuiMessages.push(JSON.parse(String(event.data)) as JsonFrame); + }; + + await new Promise((resolve, reject) => { + if (!tui) { + reject(new Error("missing tui websocket")); + return; + } + tui.onopen = () => resolve(); + tui.onerror = () => reject(new Error("failed to open tui websocket")); + }); + + tui.send(JSON.stringify({ id: 1, method: "initialize", params: {} })); + await waitFor( + () => tuiMessages.some((frame) => frame.id === 1 && frame.result), + 5000 + ); + + tui.send( + JSON.stringify({ + id: 2, + method: "turn/start", + params: { + input: [ + { + text: "hello before reconnect", + text_elements: [], + type: "text", + }, + ], + threadId: "thread-1", + }, + }) + ); + await waitFor( + () => + tuiMessages.some( + (frame) => + frame.id === 2 && + (frame.result as { turn?: { id?: string } } | undefined)?.turn?.id + ), + 5000 + ); + + upstreamSockets[0]?.close(); + await waitFor(() => upstreamConnections >= 2, 5000); + await waitFor(() => threadReadCount >= 2, 5000); + + appendBridgeMessage( + root, + bridgeMessage.source, + bridgeMessage.target, + bridgeMessage.message + ); + await waitFor( + () => + bridgeMethods.length > 1 && + readPendingBridgeMessages(root).length === 0, + 5000 + ); + expect(bridgeMethods).toEqual(["turn/steer", "turn/start"]); + } finally { + tui?.close(); + updateRunManifest(manifestPath, (manifest) => + manifest + ? { + ...manifest, + state: "completed", + status: "completed", + } + : manifest + ); + await Promise.race([ + proxyTask ?? Promise.resolve(), + new Promise((resolve) => setTimeout(resolve, 2000)), + ]); + upstreamServer.stop(true); + rmSync(root, { recursive: true, force: true }); + } +});