diff --git a/src/loop/bridge-claude-registration.ts b/src/loop/bridge-claude-registration.ts new file mode 100644 index 0000000..fdac7c5 --- /dev/null +++ b/src/loop/bridge-claude-registration.ts @@ -0,0 +1,88 @@ +import { buildClaudeChannelServerConfig } from "./bridge-config"; + +const CLAUDE_CHANNEL_SCOPE = "local"; +const MCP_ALREADY_EXISTS_RE = /already exists/i; + +interface BridgeCommandResult { + exitCode?: number | null; + stderr?: string | Uint8Array; +} + +type BridgeCommand = (args: string[]) => BridgeCommandResult; + +const stderrText = (value: BridgeCommandResult["stderr"]): string => { + if (!value) { + return ""; + } + if (typeof value === "string") { + return value.trim(); + } + return new TextDecoder().decode(value).trim(); +}; + +const logClaudeChannelServerRemovalFailure = ( + serverName: string, + detail: string, + log: (line: string) => void +): void => { + log( + `[loop] failed to remove Claude channel server "${serverName}": ${detail}` + ); +}; + +export const registerClaudeChannelServer = ( + launchArgv: string[], + serverName: string, + runDir: string, + runCommand: BridgeCommand +): void => { + const result = runCommand([ + "claude", + "mcp", + "add-json", + "--scope", + CLAUDE_CHANNEL_SCOPE, + serverName, + buildClaudeChannelServerConfig(launchArgv, runDir), + ]); + const stderr = stderrText(result.stderr); + if (result.exitCode === 0 || MCP_ALREADY_EXISTS_RE.test(stderr)) { + return; + } + const suffix = stderr ? `: ${stderr}` : "."; + throw new Error(`[loop] failed to register Claude channel server${suffix}`); +}; + +export const removeClaudeChannelServer = ( + serverName: string, + runCommand: BridgeCommand, + log: (line: string) => void = console.error +): void => { + if (!serverName) { + return; + } + try { + const result = runCommand([ + "claude", + "mcp", + "remove", + "--scope", + CLAUDE_CHANNEL_SCOPE, + serverName, + ]); + if (result.exitCode === 0) { + return; + } + logClaudeChannelServerRemovalFailure( + serverName, + stderrText(result.stderr) || `exit code ${result.exitCode ?? "unknown"}`, + log + ); + } catch (error: unknown) { + logClaudeChannelServerRemovalFailure( + serverName, + error instanceof Error ? error.message : String(error), + log + ); + } +}; diff --git a/src/loop/bridge-config.ts b/src/loop/bridge-config.ts index 7b46a39..193a7c0 100644 --- a/src/loop/bridge-config.ts +++ b/src/loop/bridge-config.ts @@ -1,6 +1,7 @@ import { mkdirSync, writeFileSync } from "node:fs"; import { dirname, join } from "node:path"; import { BRIDGE_SERVER, BRIDGE_SUBCOMMAND } from "./bridge-constants"; +import { sanitizeBase } from "./git"; import { buildLaunchArgv } from "./launch"; import type { Agent } from "./types"; @@ -10,6 +11,7 @@ const CODEX_AUTO_APPROVED_BRIDGE_TOOLS = [ "receive_messages", ] as const; const CODEX_BRIDGE_APPROVAL_MODE = "approve"; +const REPO_ID_HASH_SUFFIX_RE = /-[0-9a-f]{12}$/; const ensureParentDir = (path: string): void => { mkdirSync(dirname(path), { recursive: true }); @@ -17,12 +19,102 @@ const ensureParentDir = (path: string): void => { const stringifyToml = (value: string): string => JSON.stringify(value); +interface BridgeServerConfig { + args: string[]; + command: string; + type: "stdio"; +} + +const buildBridgeServerConfig = ( + runDir: string, + source: Agent, + launchArgv: string[] +): BridgeServerConfig => { + const [command, ...baseArgs] = launchArgv; + return { + args: [...baseArgs, BRIDGE_SUBCOMMAND, runDir, source], + command, + type: "stdio", + }; +}; + +const buildBridgeFileConfig = ( + serverName: string, + config: BridgeServerConfig +): { mcpServers: Record } => ({ + mcpServers: { + [serverName]: config, + }, +}); + +export const legacyClaudeChannelServerName = (runId: string): string => + `${BRIDGE_SERVER}-${sanitizeBase(runId)}`; + +const readableRepoSegment = (repoId?: string): string | undefined => { + const value = repoId?.trim(); + if (!value) { + return undefined; + } + return sanitizeBase(value.replace(REPO_ID_HASH_SUFFIX_RE, "")); +}; + +export const legacyRepoScopedClaudeChannelServerName = ( + runId: string, + repoId: string +): string => `${BRIDGE_SERVER}-${sanitizeBase(repoId)}-${sanitizeBase(runId)}`; + +export const claudeChannelServerName = ( + runId: string, + repoId?: string +): string => { + const repoSegment = readableRepoSegment(repoId); + return repoSegment + ? `${BRIDGE_SERVER}-${repoSegment}-${sanitizeBase(runId)}` + : legacyClaudeChannelServerName(runId); +}; + +export const generatedClaudeChannelServerNames = ( + runId: string, + repoId?: string +): string[] => { + const names = [legacyClaudeChannelServerName(runId)]; + if (!repoId?.trim()) { + return names; + } + return Array.from( + new Set([ + claudeChannelServerName(runId, repoId), + legacyRepoScopedClaudeChannelServerName(runId, repoId), + ...names, + ]) + ); +}; + +export const resolveClaudeChannelServerName = ( + runId: string, + repoId: string | undefined, + storedName?: string +): string => { + const nextServer = claudeChannelServerName(runId, repoId); + if (!storedName?.trim()) { + return nextServer; + } + return generatedClaudeChannelServerNames(runId, repoId).includes(storedName) + ? nextServer + : storedName; +}; + +export const buildClaudeChannelServerConfig = ( + launchArgv: string[], + runDir: string +): string => + JSON.stringify(buildBridgeServerConfig(runDir, "claude", launchArgv)); + export const buildCodexBridgeConfigArgs = ( runDir: string, source: Agent ): string[] => { - const [command, ...baseArgs] = buildLaunchArgv(); - const args = [...baseArgs, BRIDGE_SUBCOMMAND, runDir, source]; + const config = buildBridgeServerConfig(runDir, source, buildLaunchArgv()); const approvalArgs = CODEX_AUTO_APPROVED_BRIDGE_TOOLS.flatMap((tool) => [ "-c", `mcp_servers.${BRIDGE_SERVER}.tools.${tool}.approval_mode=${stringifyToml( @@ -31,32 +123,27 @@ export const buildCodexBridgeConfigArgs = ( ]); return [ "-c", - `mcp_servers.${BRIDGE_SERVER}.command=${stringifyToml(command)}`, + `mcp_servers.${BRIDGE_SERVER}.command=${stringifyToml(config.command)}`, "-c", - `mcp_servers.${BRIDGE_SERVER}.args=${JSON.stringify(args)}`, + `mcp_servers.${BRIDGE_SERVER}.args=${JSON.stringify(config.args)}`, ...approvalArgs, ]; }; export const ensureClaudeBridgeConfig = ( runDir: string, - source: Agent + source: Agent, + serverName = BRIDGE_SERVER ): string => { - const [command, ...baseArgs] = buildLaunchArgv(); const path = join(runDir, `${source}-mcp.json`); ensureParentDir(path); writeFileSync( path, `${JSON.stringify( - { - mcpServers: { - [BRIDGE_SERVER]: { - args: [...baseArgs, BRIDGE_SUBCOMMAND, runDir, source], - command, - type: "stdio", - }, - }, - }, + buildBridgeFileConfig( + serverName, + buildBridgeServerConfig(runDir, source, buildLaunchArgv()) + ), null, 2 )}\n`, diff --git a/src/loop/bridge-dispatch.ts b/src/loop/bridge-dispatch.ts index c6eead5..5df4c6b 100644 --- a/src/loop/bridge-dispatch.ts +++ b/src/loop/bridge-dispatch.ts @@ -24,9 +24,6 @@ export const bridgeChatId = (runDir: string): string => { return `codex_${runId}`; }; -export const isActiveBridgeChatId = (runDir: string, chatId: string): boolean => - chatId === bridgeChatId(runDir); - export const acknowledgeBridgeDelivery = ( runDir: string, message: BridgeMessage, diff --git a/src/loop/bridge-guidance.ts b/src/loop/bridge-guidance.ts index 37d90a7..23b2cf6 100644 --- a/src/loop/bridge-guidance.ts +++ b/src/loop/bridge-guidance.ts @@ -3,12 +3,6 @@ import type { Agent } from "./types"; const bridgeTargetLiteral = (agent: Agent): string => `target: "${agent}"`; -export const claudeReplyGuidance = - 'When you are replying to an inbound channel message, use the "reply" tool and pass back the same chat_id.'; - -export const claudeTmuxReplyGuidance = - 'Reply to inbound Codex channel messages with the "reply" tool and the same chat_id.'; - export const bridgeStatusStuckGuidance = 'Use "bridge_status" only when direct delivery appears stuck.'; @@ -19,13 +13,12 @@ export const sendToClaudeGuidance = (): string => `Use "send_to_agent" with ${bridgeTargetLiteral("claude")} for Claude-facing messages, not a human-facing message.`; export const sendProactiveCodexGuidance = (): string => - `Use "send_to_agent" with ${bridgeTargetLiteral("codex")} only for new proactive messages to Codex; do not send Codex-facing responses as a human-facing message.`; + `Use "send_to_agent" with ${bridgeTargetLiteral("codex")} for Codex-facing messages, including replies to inbound Codex channel messages; do not send Codex-facing responses as a human-facing message.`; export const claudeChannelInstructions = (): string => [ - `Messages from the Codex agent arrive as .`, - claudeReplyGuidance, - "Never answer the human when the inbound message came from Codex. Send the response back through the bridge tools instead.", + `Messages from the Codex agent arrive as . The chat_id is informational only.`, sendProactiveCodexGuidance(), + "Never answer the human when the inbound message came from Codex. Send the response back through the bridge tools instead.", bridgeStatusStuckGuidance, ].join("\n"); diff --git a/src/loop/bridge-message-format.ts b/src/loop/bridge-message-format.ts new file mode 100644 index 0000000..35a6408 --- /dev/null +++ b/src/loop/bridge-message-format.ts @@ -0,0 +1,18 @@ +import type { Agent } from "./types"; + +const BRIDGE_PREFIX_RE = + /^(?:Message from (?:Claude|Codex) via the loop bridge:|(?:Claude|Codex):)\s*/i; + +export const formatCodexBridgeMessage = ( + source: Agent, + message: string +): string => { + const trimmed = message.trim(); + if (!trimmed) { + return ""; + } + return source === "claude" ? `Claude: ${trimmed}` : trimmed; +}; + +export const normalizeBridgeMessage = (message: string): string => + message.trim().replace(BRIDGE_PREFIX_RE, "").replace(/\s+/g, " "); diff --git a/src/loop/bridge-runtime.ts b/src/loop/bridge-runtime.ts index 0cfb99a..a46df79 100644 --- a/src/loop/bridge-runtime.ts +++ b/src/loop/bridge-runtime.ts @@ -1,21 +1,31 @@ +import { existsSync, readFileSync, rmSync, writeFileSync } from "node:fs"; import { join } from "node:path"; -import { spawnSync } from "bun"; -import { BRIDGE_SERVER, CLAUDE_CHANNEL_USER } from "./bridge-constants"; +import { spawn, spawnSync } from "bun"; +import { removeClaudeChannelServer } from "./bridge-claude-registration"; +import { generatedClaudeChannelServerNames } from "./bridge-config"; +import { + BRIDGE_WORKER_SUBCOMMAND, + CLAUDE_CHANNEL_USER, +} from "./bridge-constants"; import { acknowledgeBridgeDelivery, bridgeChatId, readNextPendingBridgeMessageForTarget, } from "./bridge-dispatch"; +import { formatCodexBridgeMessage } from "./bridge-message-format"; import { type BridgeMessage, + type BridgeStatus, readBridgeInbox, readBridgeStatus, } from "./bridge-store"; import { injectCodexMessage } from "./codex-app-server"; -import { sanitizeBase } from "./git"; +import { buildLaunchArgv } from "./launch"; +import { DETACH_CHILD_PROCESS } from "./process"; import { isActiveRunState, parseRunLifecycleState, + readRunManifest, touchRunManifest, updateRunManifest, } from "./run-state"; @@ -23,12 +33,65 @@ import { const CLAUDE_CHANNEL_METHOD = "notifications/claude/channel"; const CLAUDE_CHANNEL_SOURCE_TYPE = "codex"; const CLAUDE_CHANNEL_USER_ID = "codex"; +const BRIDGE_WORKER_FILE = "bridge-worker.json"; +const BRIDGE_WORKER_IDLE_DELAY_MS = 250; +const BRIDGE_WORKER_SUCCESS_DELAY_MS = 100; const CODEX_TMUX_PANE = "0.1"; const CODEX_TMUX_READY_DELAY_MS = 250; const CODEX_TMUX_READY_POLLS = 20; const CODEX_TMUX_SEND_FOOTER = "Ctrl+J newline"; -export const bridgeRuntimeCommandDeps = { spawnSync }; +export const bridgeRuntimeCommandDeps = { spawn, spawnSync }; + +const bridgeWorkerPath = (runDir: string): string => + join(runDir, BRIDGE_WORKER_FILE); + +const readBridgeWorkerPid = (runDir: string): number | undefined => { + const path = bridgeWorkerPath(runDir); + if (!existsSync(path)) { + return undefined; + } + try { + const parsed = JSON.parse(readFileSync(path, "utf8")) as unknown; + if ( + typeof parsed === "object" && + parsed !== null && + typeof (parsed as { pid?: unknown }).pid === "number" && + Number.isInteger((parsed as { pid: number }).pid) && + (parsed as { pid: number }).pid > 0 + ) { + return (parsed as { pid: number }).pid; + } + } catch { + // ignore malformed worker state + } + return undefined; +}; + +const writeBridgeWorkerPid = (runDir: string, pid: number): void => { + writeFileSync( + bridgeWorkerPath(runDir), + `${JSON.stringify({ pid })}\n`, + "utf8" + ); +}; + +const clearBridgeWorkerPid = (runDir: string, pid?: number): void => { + const current = readBridgeWorkerPid(runDir); + if (pid !== undefined && current !== pid) { + return; + } + rmSync(bridgeWorkerPath(runDir), { force: true }); +}; + +const isProcessAlive = (pid: number): boolean => { + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +}; const wait = async (ms: number): Promise => { await new Promise((resolve) => { @@ -105,70 +168,95 @@ const injectCodexTmuxMessage = async ( }; const tmuxSessionExists = (session: string): boolean => { - const result = bridgeRuntimeCommandDeps.spawnSync( - ["tmux", "has-session", "-t", session], - { - stderr: "ignore", - stdout: "ignore", - } - ); - return result.exitCode === 0; -}; - -export const hasLiveCodexTmuxSession = (runDir: string): boolean => { - const { tmuxSession } = readBridgeStatus(runDir); - return Boolean(tmuxSession && tmuxSessionExists(tmuxSession)); + try { + const result = bridgeRuntimeCommandDeps.spawnSync( + ["tmux", "has-session", "-t", session], + { + stderr: "ignore", + stdout: "ignore", + } + ); + return result.exitCode === 0; + } catch { + return false; + } }; -export const claudeChannelServerName = (runId: string): string => - `${BRIDGE_SERVER}-${sanitizeBase(runId)}`; +export interface BridgeRuntimeStatus extends BridgeStatus { + codexDeliveryMode: "app-server" | "none" | "tmux"; + hasLiveTmuxSession: boolean; +} -const logClaudeChannelServerRemovalFailure = ( - serverName: string, - detail: string -): void => { - console.error( - `[loop] failed to remove Claude channel server "${serverName}": ${detail}` +export const readBridgeRuntimeStatus = ( + runDir: string +): BridgeRuntimeStatus => { + const status = readBridgeStatus(runDir); + const hasLiveTmuxSession = Boolean( + status.tmuxSession && tmuxSessionExists(status.tmuxSession) ); + let codexDeliveryMode: BridgeRuntimeStatus["codexDeliveryMode"] = "none"; + if (status.hasCodexRemote) { + codexDeliveryMode = "app-server"; + } else if (hasLiveTmuxSession) { + codexDeliveryMode = "tmux"; + } + return { + ...status, + codexDeliveryMode, + hasLiveTmuxSession, + }; }; -const removeClaudeChannelServer = (runId: string): void => { - if (!runId) { - return; +export const ensureBridgeWorker = (runDir: string): boolean => { + const status = readBridgeRuntimeStatus(runDir); + const state = parseRunLifecycleState(status.state); + if (!(status.hasCodexRemote && state && isActiveRunState(state))) { + return false; } - const serverName = claudeChannelServerName(runId); + const currentPid = readBridgeWorkerPid(runDir); + if (currentPid && isProcessAlive(currentPid)) { + return true; + } + clearBridgeWorkerPid(runDir); try { - const result = bridgeRuntimeCommandDeps.spawnSync( - ["claude", "mcp", "remove", "--scope", "local", serverName], + const child = bridgeRuntimeCommandDeps.spawn( + [...buildLaunchArgv(), BRIDGE_WORKER_SUBCOMMAND, runDir], { - stderr: "pipe", + detached: DETACH_CHILD_PROCESS, + env: process.env, + stderr: "ignore", + stdin: "ignore", stdout: "ignore", } ); - if (result.exitCode === 0) { - return; + if (!(typeof child.pid === "number" && child.pid > 0)) { + return false; } - const stderr = result.stderr ? decodeOutput(result.stderr).trim() : ""; - logClaudeChannelServerRemovalFailure( - serverName, - stderr || `exit code ${result.exitCode ?? "unknown"}` - ); - } catch (error: unknown) { - // Cleanup should not fail the bridge flow. - logClaudeChannelServerRemovalFailure( - serverName, - error instanceof Error ? error.message : String(error) - ); + writeBridgeWorkerPid(runDir, child.pid); + child.unref?.(); + return true; + } catch { + return false; } }; +export const hasLiveCodexTmuxSession = (runDir: string): boolean => { + const manifest = readRunManifest(join(runDir, "manifest.json")); + return Boolean( + manifest?.tmuxSession && tmuxSessionExists(manifest.tmuxSession) + ); +}; + export const clearStaleTmuxBridgeState = (runDir: string): boolean => { - let removedRunId = ""; + let removedServerNames: string[] = []; const next = updateRunManifest(join(runDir, "manifest.json"), (manifest) => { if (!manifest?.tmuxSession) { return manifest; } - removedRunId = manifest.runId; + removedServerNames = [ + manifest.claudeChannelServer, + ...generatedClaudeChannelServerNames(manifest.runId, manifest.repoId), + ].filter((name): name is string => Boolean(name)); return touchRunManifest( { ...manifest, @@ -177,10 +265,20 @@ export const clearStaleTmuxBridgeState = (runDir: string): boolean => { new Date().toISOString() ); }); - if (!(next && removedRunId)) { + if (!(next && removedServerNames.length > 0)) { return false; } - removeClaudeChannelServer(removedRunId); + for (const serverName of new Set(removedServerNames)) { + removeClaudeChannelServer( + serverName, + (args) => + bridgeRuntimeCommandDeps.spawnSync(args, { + stderr: "pipe", + stdout: "ignore", + }), + console.error + ); + } return true; }; @@ -220,24 +318,25 @@ export const deliverCodexBridgeMessage = async ( runDir: string, message: BridgeMessage ): Promise => { - const status = readBridgeStatus(runDir); - if (status.tmuxSession) { - if (tmuxSessionExists(status.tmuxSession)) { - return false; - } + const status = readBridgeRuntimeStatus(runDir); + if (status.tmuxSession && !status.hasLiveTmuxSession) { clearStaleTmuxBridgeState(runDir); } - if (!(status.codexRemoteUrl && status.codexThreadId)) { + if (!status.hasCodexRemote) { return false; } try { const delivered = await injectCodexMessage( status.codexRemoteUrl, status.codexThreadId, - message.message + formatCodexBridgeMessage(message.source, message.message) ); if (delivered) { - acknowledgeBridgeDelivery(runDir, message, "sent to codex app-server"); + acknowledgeBridgeDelivery( + runDir, + message, + "accepted by codex app-server" + ); } return delivered; } catch { @@ -248,11 +347,11 @@ export const deliverCodexBridgeMessage = async ( export const drainCodexTmuxMessages = async ( runDir: string ): Promise => { - const { tmuxSession } = readBridgeStatus(runDir); - if (!tmuxSession) { + const status = readBridgeRuntimeStatus(runDir); + if (!status.tmuxSession) { return false; } - if (!tmuxSessionExists(tmuxSession)) { + if (!status.hasLiveTmuxSession) { clearStaleTmuxBridgeState(runDir); return false; } @@ -260,7 +359,10 @@ export const drainCodexTmuxMessages = async ( if (!message) { return false; } - const delivered = await injectCodexTmuxMessage(tmuxSession, message.message); + const delivered = await injectCodexTmuxMessage( + status.tmuxSession, + formatCodexBridgeMessage(message.source, message.message) + ); if (!delivered) { return false; } @@ -268,21 +370,57 @@ export const drainCodexTmuxMessages = async ( return true; }; +export const drainCodexAppServerMessages = ( + runDir: string +): Promise => { + const status = readBridgeRuntimeStatus(runDir); + if (!status.hasCodexRemote) { + return Promise.resolve(false); + } + const message = readNextPendingBridgeMessageForTarget(runDir, "codex"); + if (!message) { + return Promise.resolve(false); + } + return deliverCodexBridgeMessage(runDir, message); +}; + +const clearStaleTmuxWorkerState = ( + runDir: string, + status: BridgeRuntimeStatus +): boolean => { + if (!(status.tmuxSession && !status.hasLiveTmuxSession)) { + return true; + } + clearStaleTmuxBridgeState(runDir); + return status.hasCodexRemote; +}; + export const runBridgeWorker = async (runDir: string): Promise => { - while (true) { - const status = readBridgeStatus(runDir); - const state = parseRunLifecycleState(status.state); - if (!(state && isActiveRunState(state))) { - return; - } - if (!status.tmuxSession) { - return; - } - if (!tmuxSessionExists(status.tmuxSession)) { - clearStaleTmuxBridgeState(runDir); - return; + try { + while (true) { + const claimedPid = readBridgeWorkerPid(runDir); + if (claimedPid && claimedPid !== process.pid) { + return; + } + const status = readBridgeRuntimeStatus(runDir); + const state = parseRunLifecycleState(status.state); + if (!(state && isActiveRunState(state))) { + return; + } + if (!clearStaleTmuxWorkerState(runDir, status)) { + return; + } + const delivered = status.hasCodexRemote + ? await drainCodexAppServerMessages(runDir) + : await drainCodexTmuxMessages(runDir); + if (!(status.hasCodexRemote || status.hasLiveTmuxSession)) { + return; + } + await wait( + delivered ? BRIDGE_WORKER_SUCCESS_DELAY_MS : BRIDGE_WORKER_IDLE_DELAY_MS + ); } - const delivered = await drainCodexTmuxMessages(runDir); - await wait(delivered ? 100 : CODEX_TMUX_READY_DELAY_MS); + } finally { + clearBridgeWorkerPid(runDir, process.pid); } }; diff --git a/src/loop/bridge-store.ts b/src/loop/bridge-store.ts index 2dd44be..565f6c3 100644 --- a/src/loop/bridge-store.ts +++ b/src/loop/bridge-store.ts @@ -1,6 +1,9 @@ import { createHash } from "node:crypto"; import { appendFileSync, existsSync, mkdirSync, readFileSync } from "node:fs"; import { dirname, join } from "node:path"; +import { resolveClaudeChannelServerName } from "./bridge-config"; +import { BRIDGE_SERVER } from "./bridge-constants"; +import { normalizeBridgeMessage } from "./bridge-message-format"; import { appendRunTranscriptEntry, buildTranscriptPath, @@ -11,8 +14,6 @@ import type { Agent } from "./types"; const BRIDGE_FILE = "bridge.jsonl"; const LINE_SPLIT_RE = /\r?\n/; const MAX_STATUS_MESSAGES = 100; -const BRIDGE_PREFIX_RE = - /^Message from (Claude|Codex) via the loop bridge:\s*/i; interface BridgeBaseEvent { at: string; @@ -36,9 +37,14 @@ interface BridgeAck extends BridgeBaseEvent { export type BridgeEvent = BridgeAck | BridgeMessage; export interface BridgeStatus { + bridgeServer: string; + claudeBridgeMode: "local-registration" | "mcp-config"; + claudeChannelServer: string; claudeSessionId: string; codexRemoteUrl: string; codexThreadId: string; + hasCodexRemote: boolean; + hasTmuxSession: boolean; pending: { claude: number; codex: number }; runId: string; state: string; @@ -59,9 +65,6 @@ export const normalizeAgent = (value: unknown): Agent | undefined => { return undefined; }; -const normalizeBridgeMessage = (message: string): string => - message.trim().replace(BRIDGE_PREFIX_RE, "").replace(/\s+/g, " "); - const orderedBridgePairKey = (source: Agent, target: Agent): string => `${source}>${target}`; @@ -231,15 +234,31 @@ const countPendingMessages = (runDir: string): BridgeStatus["pending"] => { export const readBridgeStatus = (runDir: string): BridgeStatus => { const manifest = readRunManifest(join(runDir, "manifest.json")); + const runId = manifest?.runId ?? ""; + const codexRemoteUrl = manifest?.codexRemoteUrl ?? ""; + const codexThreadId = manifest?.codexThreadId ?? ""; + const tmuxSession = manifest?.tmuxSession ?? ""; + const hasTmuxSession = Boolean(tmuxSession); return { + bridgeServer: BRIDGE_SERVER, + claudeBridgeMode: hasTmuxSession ? "local-registration" : "mcp-config", + claudeChannelServer: runId + ? resolveClaudeChannelServerName( + runId, + manifest?.repoId, + manifest?.claudeChannelServer + ) + : BRIDGE_SERVER, claudeSessionId: manifest?.claudeSessionId ?? "", - codexRemoteUrl: manifest?.codexRemoteUrl ?? "", - codexThreadId: manifest?.codexThreadId ?? "", + codexRemoteUrl, + codexThreadId, + hasCodexRemote: Boolean(codexRemoteUrl && codexThreadId), + hasTmuxSession, pending: countPendingMessages(runDir), - runId: manifest?.runId ?? "", + runId, state: manifest?.state ?? "unknown", status: manifest?.status ?? "unknown", - tmuxSession: manifest?.tmuxSession ?? "", + tmuxSession, }; }; diff --git a/src/loop/bridge.ts b/src/loop/bridge.ts index e202f8e..af3dcf2 100644 --- a/src/loop/bridge.ts +++ b/src/loop/bridge.ts @@ -1,19 +1,20 @@ +import { claudeChannelServerName } from "./bridge-config"; import { BRIDGE_SERVER as BRIDGE_SERVER_VALUE } from "./bridge-constants"; import { consumeBridgeInbox, dispatchBridgeMessage, formatDispatchResult, - isActiveBridgeChatId, } from "./bridge-dispatch"; import { claudeChannelInstructions } from "./bridge-guidance"; import { bridgeRuntimeCommandDeps, - claudeChannelServerName, clearStaleTmuxBridgeState, deliverCodexBridgeMessage, drainCodexTmuxMessages, + ensureBridgeWorker, flushClaudeChannelMessages, hasLiveCodexTmuxSession, + readBridgeRuntimeStatus, } from "./bridge-runtime"; import { appendBlockedBridgeMessage, @@ -23,7 +24,6 @@ import { formatBridgeInbox, normalizeAgent, readBridgeEvents, - readBridgeStatus, } from "./bridge-store"; import { LOOP_VERSION } from "./constants"; import type { Agent } from "./types"; @@ -117,7 +117,9 @@ const handleBridgeStatusTool = ( writeJsonRpc({ id, jsonrpc: "2.0", - result: toolContent(JSON.stringify(readBridgeStatus(runDir), null, 2)), + result: toolContent( + JSON.stringify(readBridgeRuntimeStatus(runDir), null, 2) + ), }); }; @@ -140,45 +142,6 @@ const handleReceiveMessagesTool = ( }); }; -const handleReplyTool = async ( - id: JsonRpcRequest["id"], - runDir: string, - source: Agent, - args: Record -): Promise => { - const chatId = asString(args.chat_id); - const text = asString(args.text); - if (!chatId) { - writeError(id, MCP_INVALID_PARAMS, "reply requires a chat_id"); - return; - } - if (!isActiveBridgeChatId(runDir, chatId)) { - writeError( - id, - MCP_INVALID_PARAMS, - "reply chat_id does not match the active bridge conversation" - ); - return; - } - if (!text) { - writeError(id, MCP_INVALID_PARAMS, "reply requires a non-empty text"); - return; - } - const result = await dispatchBridgeMessage( - runDir, - source, - "codex", - text, - (entry) => deliverCodexBridgeMessage(runDir, entry), - () => hasLiveCodexTmuxSession(runDir) - ); - writeJsonRpc({ - id, - jsonrpc: "2.0", - result: toolContent(formatDispatchResult(result)), - }); -}; - const handleSendToAgentTool = async ( id: JsonRpcRequest["id"], runDir: string, @@ -247,6 +210,13 @@ const handleSendToAgentTool = async ( : undefined, target === "codex" ? () => hasLiveCodexTmuxSession(runDir) : undefined ); + if ( + result.status === "queued" && + target === "codex" && + ensureBridgeWorker(runDir) + ) { + result.status = "accepted"; + } writeJsonRpc({ id, jsonrpc: "2.0", @@ -274,11 +244,6 @@ const handleToolCall = async ( return; } - if (source === "claude" && name === "reply") { - await handleReplyTool(id, runDir, source, args); - return; - } - if (name !== "send_to_agent") { writeError(id, MCP_INVALID_PARAMS, `Unknown tool: ${name}`); return; @@ -345,25 +310,6 @@ const handleBridgeRequest = async ( jsonrpc: "2.0", result: { tools: [ - ...(source === "claude" - ? [ - { - annotations: MUTATING_TOOL_ANNOTATIONS, - description: - "Reply to the active Codex channel conversation and deliver the response back to Codex.", - inputSchema: { - additionalProperties: false, - properties: { - chat_id: { type: "string" }, - text: { type: "string" }, - }, - required: ["chat_id", "text"], - type: "object", - }, - name: "reply", - }, - ] - : []), { annotations: MUTATING_TOOL_ANNOTATIONS, description: "Send an explicit message to the paired agent.", @@ -596,5 +542,6 @@ export const bridgeInternals = { commandDeps: bridgeRuntimeCommandDeps, drainCodexTmuxMessages, deliverCodexBridgeMessage, + ensureBridgeWorker, readBridgeEvents, }; diff --git a/src/loop/codex-app-server.ts b/src/loop/codex-app-server.ts index 3cf4333..b4abc07 100644 --- a/src/loop/codex-app-server.ts +++ b/src/loop/codex-app-server.ts @@ -34,6 +34,12 @@ interface PendingRequest { timeout: ReturnType; } +interface PendingTurnCompletion { + reject: (error: Error) => void; + resolve: () => void; + timeout: ReturnType; +} + interface TurnState { combined: string; lastChunk: string; @@ -61,8 +67,9 @@ export const DEFAULT_CODEX_TRANSPORT: TransportMode = const METHOD_INITIALIZE = "initialize"; const METHOD_INITIALIZED = "initialized"; const METHOD_THREAD_START = "thread/start"; +const METHOD_THREAD_READ = "thread/read"; const METHOD_TURN_START = "turn/start"; -const METHOD_TURN_STARTED = "turn/started"; +const METHOD_TURN_STEER = "turn/steer"; const METHOD_TURN_COMPLETED = "turn/completed"; const METHOD_ERROR = "error"; const METHOD_ITEM_COMPLETED = "item/completed"; @@ -80,11 +87,8 @@ const METHODS_TRIGGERING_FALLBACK = new Set([ METHOD_TURN_START, ]); const BRIDGE_OPT_OUT_NOTIFICATION_METHODS = [ - METHOD_ERROR, METHOD_ITEM_COMPLETED, METHOD_ITEM_DELTA, - METHOD_TURN_COMPLETED, - METHOD_TURN_STARTED, ] as const; type SpawnFn = (...args: Parameters) => ReturnType; @@ -208,6 +212,18 @@ const parseErrorText = (value: unknown): string | undefined => { ); }; +const isBusyTurnError = (value: unknown): boolean => { + const message = parseErrorText(value)?.toLowerCase(); + return !!( + message && + (message.includes("active turn") || + message.includes("already active") || + message.includes("busy") || + message.includes("in progress") || + message.includes("turn still active")) + ); +}; + const extractTurnId = (value: unknown): string | undefined => { const record = asRecord(value); const fromValue = asString(record.turnId) ?? asString(record.turn_id); @@ -235,6 +251,20 @@ const extractTurnFromStart = (value: unknown): { id?: string } => { return { id: asString(turn.id) || asString(record.id) }; }; +const extractActiveTurnId = (value: unknown): string | undefined => { + const thread = asRecord(asRecord(value).thread); + if (!Array.isArray(thread.turns)) { + return undefined; + } + for (let index = thread.turns.length - 1; index >= 0; index -= 1) { + const turn = asRecord(thread.turns[index]); + if (asString(turn.status) === "inProgress") { + return asString(turn.id); + } + } + return undefined; +}; + const buildInput = (prompt: string): Record[] => [ { type: "text", @@ -313,17 +343,126 @@ const handleWsServerRequest = ( }); }; +const extractNotificationTurnId = ( + params: Record +): string | undefined => + extractTurnId(params) || asString(asRecord(params.turn).id); + +const settlePendingTurn = ( + pendingTurns: Map, + turnId: string, + error?: Error +): void => { + const turn = pendingTurns.get(turnId); + if (!turn) { + return; + } + clearTimeout(turn.timeout); + pendingTurns.delete(turnId); + if (error) { + turn.reject(error); + return; + } + turn.resolve(); +}; + +const turnNotificationError = ( + method: string, + turnId: string, + params: Record +): Error | undefined => { + if (method === METHOD_ERROR) { + return new Error(parseErrorText(params) || `turn ${turnId} failed`); + } + if (method !== METHOD_TURN_COMPLETED) { + return undefined; + } + const turn = asRecord(params.turn); + const status = asString(params.status) ?? asString(turn.status); + if (status !== "failed") { + return undefined; + } + return new Error( + parseErrorText(params) || parseErrorText(turn) || `turn ${turnId} failed` + ); +}; + +const handleRemoteTurnNotification = ( + pendingTurns: Map, + method: string, + params: Record +): void => { + const turnId = extractNotificationTurnId(params); + if (!turnId) { + return; + } + if (method !== METHOD_ERROR && method !== METHOD_TURN_COMPLETED) { + return; + } + settlePendingTurn( + pendingTurns, + turnId, + turnNotificationError(method, turnId, params) + ); +}; + +const handleRemoteClientResponse = ( + pending: Map, + requestId: string, + frame: JsonFrame +): void => { + const request = pending.get(requestId); + if (!request) { + return; + } + clearTimeout(request.timeout); + pending.delete(requestId); + if (frame.error !== undefined) { + request.reject( + new Error( + parseErrorText(frame.error) || + `codex app-server request "${request.method}" failed` + ) + ); + return; + } + request.resolve(frame.result); +}; + +const handleRemoteClientFrame = ( + ws: import("./ws-client").WsClient, + pending: Map, + pendingTurns: Map, + frame: JsonFrame +): void => { + const frameId = asRequestId(frame.id); + const method = asString(frame.method); + if (frameId && method) { + handleWsServerRequest(ws, frameId, method); + return; + } + if (frameId) { + handleRemoteClientResponse(pending, frameId, frame); + return; + } + if (method) { + handleRemoteTurnNotification(pendingTurns, method, asRecord(frame.params)); + } +}; + const createRemoteAppServerClient = async ( url: string ): Promise<{ close(): void; sendNotification(method: string, params?: Record): void; sendRequest(method: string, params: unknown): Promise; + waitForTurnCompletion(turnId: string): Promise; }> => { const ws = await connectWsFn(url); let closed = false; let requestId = 1; const pending = new Map(); + const pendingTurns = new Map(); const failAll = (error: Error): void => { for (const request of pending.values()) { @@ -331,6 +470,11 @@ const createRemoteAppServerClient = async ( request.reject(error); } pending.clear(); + for (const turn of pendingTurns.values()) { + clearTimeout(turn.timeout); + turn.reject(error); + } + pendingTurns.clear(); }; ws.onmessage = (data) => { @@ -339,31 +483,7 @@ const createRemoteAppServerClient = async ( if (!frame) { continue; } - const frameId = asRequestId(frame.id); - const method = asString(frame.method); - if (frameId && method) { - handleWsServerRequest(ws, frameId, method); - continue; - } - if (!frameId) { - continue; - } - const request = pending.get(frameId); - if (!request) { - continue; - } - clearTimeout(request.timeout); - pending.delete(frameId); - if (frame.error !== undefined) { - request.reject( - new Error( - parseErrorText(frame.error) || - `codex app-server request "${request.method}" failed` - ) - ); - continue; - } - request.resolve(frame.result); + handleRemoteClientFrame(ws, pending, pendingTurns, frame); } }; @@ -417,6 +537,20 @@ const createRemoteAppServerClient = async ( pending.set(nextRequestId, { method, reject, resolve, timeout }); }); }, + waitForTurnCompletion: (turnId: string): Promise => { + if (closed) { + return Promise.reject( + new Error("codex app-server remote connection closed") + ); + } + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + pendingTurns.delete(turnId); + reject(new Error(`codex app-server turn ${turnId} timed out`)); + }, AGENT_TURN_TIMEOUT_MS); + pendingTurns.set(turnId, { reject, resolve, timeout }); + }); + }, }; }; @@ -1280,11 +1414,51 @@ export const injectCodexMessage = async ( }, }); client.sendNotification(METHOD_INITIALIZED); - await client.sendRequest(METHOD_TURN_START, { - input: buildInput(prompt), - threadId, - }); - return true; + + for (let attempt = 0; attempt < 2; attempt += 1) { + let activeTurnId: string | undefined; + try { + const thread = await client.sendRequest(METHOD_THREAD_READ, { + includeTurns: true, + threadId, + }); + activeTurnId = extractActiveTurnId(thread); + } catch { + activeTurnId = undefined; + } + + if (activeTurnId) { + try { + await client.sendRequest(METHOD_TURN_STEER, { + expectedTurnId: activeTurnId, + input: buildInput(prompt), + threadId, + }); + return true; + } catch { + // The active turn may have already completed. Fall through to start. + } + } + + try { + const response = await client.sendRequest(METHOD_TURN_START, { + input: buildInput(prompt), + threadId, + }); + const turn = extractTurnFromStart(response); + if (!turn.id) { + throw new Error( + "codex app-server returned turn/start without turn id" + ); + } + return true; + } catch (error) { + if (!(attempt === 0 && isBusyTurnError(error))) { + throw error; + } + } + } + return false; } finally { client.close(); } diff --git a/src/loop/codex-tmux-proxy.ts b/src/loop/codex-tmux-proxy.ts index d4ada35..bb6c8e0 100644 --- a/src/loop/codex-tmux-proxy.ts +++ b/src/loop/codex-tmux-proxy.ts @@ -5,10 +5,16 @@ import { acknowledgeBridgeDelivery, readNextPendingBridgeMessageForTarget, } from "./bridge-dispatch"; +import { formatCodexBridgeMessage } from "./bridge-message-format"; import { clearStaleTmuxBridgeState } from "./bridge-runtime"; import type { BridgeMessage } from "./bridge-store"; import { findFreePort } from "./ports"; -import { isActiveRunState, readRunManifest } from "./run-state"; +import { + isActiveRunState, + readRunManifest, + touchRunManifest, + updateRunManifest, +} from "./run-state"; import { connectWs, type WsClient } from "./ws-client"; const CODEX_PROXY_BASE_PORT = 4600; @@ -23,6 +29,7 @@ const THREAD_START_METHOD = "thread/start"; const TURN_COMPLETED_METHOD = "turn/completed"; const TURN_STARTED_METHOD = "turn/started"; const TURN_START_METHOD = "turn/start"; +const TURN_STEER_METHOD = "turn/steer"; const USER_INPUT_TEXT_ELEMENTS = "text_elements"; export const CODEX_TMUX_PROXY_SUBCOMMAND = "__codex-tmux-proxy"; @@ -46,6 +53,11 @@ interface ProxyRoute { threadId?: string; } +interface BridgeRequest { + message: BridgeMessage; + method: string; +} + type StopReason = "dead-tmux" | "inactive-run"; const isRecord = (value: unknown): value is Record => @@ -112,6 +124,81 @@ const extractThreadId = (value: unknown): string | undefined => { return asString(thread?.id) ?? asString(value.threadId); }; +const latestActiveTurnId = (turnIds: Set): string | undefined => { + let latest: string | undefined; + for (const turnId of turnIds) { + latest = turnId; + } + return latest; +}; + +const shouldPauseBridgeDrain = ( + turnInProgress: boolean, + activeTurnId: string | undefined, + pendingBridgeRequests: number +): boolean => { + if (pendingBridgeRequests > 0) { + return true; + } + return turnInProgress && !activeTurnId; +}; + +const persistCodexThreadId = (runDir: string, threadId: string): void => { + if (!threadId) { + return; + } + updateRunManifest(join(runDir, "manifest.json"), (manifest) => { + if (!manifest || manifest.codexThreadId === threadId) { + return manifest; + } + return touchRunManifest( + { + ...manifest, + codexThreadId: threadId, + }, + new Date().toISOString() + ); + }); +}; + +const buildBridgeInjectionFrame = ( + requestId: number, + threadId: string, + message: BridgeMessage, + activeTurnId?: string +): JsonFrame => { + if (activeTurnId) { + return { + id: requestId, + method: TURN_STEER_METHOD, + params: { + expectedTurnId: activeTurnId, + input: buildInput( + formatCodexBridgeMessage(message.source, message.message) + ), + threadId, + }, + }; + } + return { + id: requestId, + method: TURN_START_METHOD, + params: { + input: buildInput( + formatCodexBridgeMessage(message.source, message.message) + ), + threadId, + }, + }; +}; + +const noteStartedTurn = (turnIds: Set, value: unknown): void => { + const turnId = extractTurnId(value); + if (turnId) { + turnIds.add(turnId); + } +}; + const isTmuxSessionAlive = (session: string): boolean => { if (!session) { return false; @@ -156,7 +243,7 @@ const patchInitializeError = (frame: JsonFrame): JsonFrame => { class CodexTmuxProxy { private readonly activeTurnIds = new Set(); - private readonly bridgeRequests = new Map(); + private readonly bridgeRequests = new Map(); private readonly port: number; private readonly remoteUrl: string; private readonly routes = new Map(); @@ -280,6 +367,14 @@ class CodexTmuxProxy { this.upstream?.send(`${JSON.stringify(frame)}\n`); } + private rememberThreadId(threadId: string | undefined): void { + if (!threadId || threadId === this.threadId) { + return; + } + this.threadId = threadId; + persistCodexThreadId(this.runDir, threadId); + } + private handleTuiFrame(raw: string): void { const frame = asJsonFrame(raw); if (!(frame?.method && frame.id !== undefined)) { @@ -375,27 +470,38 @@ class CodexTmuxProxy { (route.method === THREAD_START_METHOD || route.method === THREAD_RESUME_METHOD) ) { - this.threadId = - extractThreadId(frame.result) ?? route.threadId ?? this.threadId; + this.rememberThreadId( + extractThreadId(frame.result) ?? route.threadId ?? this.threadId + ); return; } if (!frame.error && route.method === TURN_START_METHOD) { - this.threadId = route.threadId ?? this.threadId; + this.rememberThreadId(route.threadId ?? this.threadId); + noteStartedTurn(this.activeTurnIds, frame.result); + this.turnInProgress = true; } } private handleBridgeResponse(id: number, frame: JsonFrame): void { - const message = this.bridgeRequests.get(id); - if (!message) { + const request = this.bridgeRequests.get(id); + if (!request) { return; } this.bridgeRequests.delete(id); if (frame.error) { - this.turnInProgress = false; + this.turnInProgress = this.activeTurnIds.size > 0; return; } - acknowledgeBridgeDelivery(this.runDir, message, "sent to codex tmux proxy"); + if (request.method === TURN_START_METHOD) { + noteStartedTurn(this.activeTurnIds, frame.result); + this.turnInProgress = true; + } + acknowledgeBridgeDelivery( + this.runDir, + request.message, + "sent to codex tmux proxy" + ); } private handleNotification(frame: JsonFrame): void { @@ -458,7 +564,14 @@ class CodexTmuxProxy { ) { return; } - if (this.turnInProgress || this.bridgeRequests.size > 0) { + const activeTurnId = latestActiveTurnId(this.activeTurnIds); + if ( + shouldPauseBridgeDrain( + this.turnInProgress, + activeTurnId, + this.bridgeRequests.size + ) + ) { return; } const message = readNextPendingBridgeMessageForTarget(this.runDir, "codex"); @@ -467,16 +580,18 @@ class CodexTmuxProxy { } const requestId = this.nextBridgeRequestId--; - this.bridgeRequests.set(requestId, message); - this.turnInProgress = true; - this.forwardToUpstream({ - id: requestId, - method: TURN_START_METHOD, - params: { - input: buildInput(message.message), - threadId: this.threadId, - }, + const frame = buildBridgeInjectionFrame( + requestId, + this.threadId, + message, + activeTurnId + ); + this.bridgeRequests.set(requestId, { + message, + method: frame.method ?? TURN_START_METHOD, }); + this.turnInProgress = true; + this.forwardToUpstream(frame); } } @@ -516,7 +631,12 @@ export const runCodexTmuxProxy = async ( }; export const codexTmuxProxyInternals = { + buildBridgeInjectionFrame, + latestActiveTurnId, buildProxyUrl, + noteStartedTurn, patchInitializeError, + persistCodexThreadId, + shouldPauseBridgeDrain, shouldStopForTmuxSession, }; diff --git a/src/loop/paired-loop.ts b/src/loop/paired-loop.ts index 6c10751..0c5a414 100644 --- a/src/loop/paired-loop.ts +++ b/src/loop/paired-loop.ts @@ -3,6 +3,7 @@ import { acknowledgeBridgeDelivery, readNextPendingBridgeMessage, } from "./bridge-dispatch"; +import { formatCodexBridgeMessage } from "./bridge-message-format"; import { getLastClaudeSessionId } from "./claude-sdk-server"; import { getLastCodexThreadId } from "./codex-app-server"; import { @@ -151,13 +152,21 @@ const reviewBridgePrompt = ( .join("\n\n"); const forwardBridgePrompt = (source: Agent, message: string): string => - [ - `Message from ${capitalize(source)} via the loop bridge:`, - message.trim(), - "Treat this as direct agent-to-agent coordination. Do not reply to the human.", - 'Reply to the other agent with "send_to_agent" only when you have something useful for them to act on.', - "Do not acknowledge receipt without new information.", - ].join("\n\n"); + (source === "claude" + ? [ + formatCodexBridgeMessage(source, message), + "Treat this as direct agent-to-agent coordination. Do not reply to the human.", + 'Send a message to the other agent with "send_to_agent" only when you have something useful for them to act on.', + "Do not acknowledge receipt without new information.", + ] + : [ + `Message from ${capitalize(source)} via the loop bridge:`, + message.trim(), + "Treat this as direct agent-to-agent coordination. Do not reply to the human.", + 'Send a message to the other agent with "send_to_agent" only when you have something useful for them to act on.', + "Do not acknowledge receipt without new information.", + ] + ).join("\n\n"); const updateIds = (state: PairedState): void => { const next = touchRunManifest( diff --git a/src/loop/paired-options.ts b/src/loop/paired-options.ts index 15d5579..bb8159e 100644 --- a/src/loop/paired-options.ts +++ b/src/loop/paired-options.ts @@ -1,6 +1,8 @@ import { buildCodexBridgeConfigArgs, + claudeChannelServerName, ensureClaudeBridgeConfig, + resolveClaudeChannelServerName, } from "./bridge-config"; import { createRunManifest, @@ -39,6 +41,16 @@ export const canResumePairedManifest = (manifest?: RunManifest): boolean => { return manifest ? isActiveRunState(manifest.state) : false; }; +const resolveClaudeBridgeServer = ( + storage: RunStorage, + manifest?: RunManifest +): string => + resolveClaudeChannelServerName( + storage.runId, + storage.repoId, + manifest?.claudeChannelServer + ); + const resolveRequestedRunState = ( opts: Options, cwd: string @@ -116,6 +128,7 @@ export const resolvePreparedRunState = ( } const manifest = createRunManifest({ + claudeChannelServer: claudeChannelServerName(storage.runId, storage.repoId), claudeSessionId: "", codexThreadId: "", cwd, @@ -139,7 +152,11 @@ export const applyPairedOptions = ( manifest: RunManifest | undefined, allowRawSessionFallback = false ): void => { - opts.claudeMcpConfigPath = ensureClaudeBridgeConfig(storage.runDir, "claude"); + opts.claudeMcpConfigPath = ensureClaudeBridgeConfig( + storage.runDir, + "claude", + resolveClaudeBridgeServer(storage, manifest) + ); opts.claudePersistentSession = true; opts.codexMcpConfigArgs = buildCodexBridgeConfigArgs(storage.runDir, "codex"); opts.pairedMode = true; @@ -176,6 +193,7 @@ export const preparePairedRun = ( ? touchRunManifest( { ...existing, + claudeChannelServer: resolveClaudeBridgeServer(storage, existing), claudeSessionId: resumable?.claudeSessionId || opts.pairedSessionIds?.claude || "", codexThreadId: @@ -190,6 +208,10 @@ export const preparePairedRun = ( new Date().toISOString() ) : createRunManifest({ + claudeChannelServer: claudeChannelServerName( + storage.runId, + storage.repoId + ), claudeSessionId: opts.pairedSessionIds?.claude ?? "", codexThreadId: opts.pairedSessionIds?.codex ?? "", cwd, diff --git a/src/loop/run-state.ts b/src/loop/run-state.ts index 008e7d2..0fbcf01 100644 --- a/src/loop/run-state.ts +++ b/src/loop/run-state.ts @@ -54,6 +54,7 @@ export interface RunStorage { } export interface RunManifest { + claudeChannelServer?: string; claudeSessionId: string; codexRemoteUrl?: string; codexThreadId: string; @@ -122,6 +123,7 @@ interface RepoIdDeps { } interface RunManifestInput { + claudeChannelServer?: string; claudeSessionId?: string; codexRemoteUrl?: string; codexThreadId?: string; @@ -444,6 +446,9 @@ export const createRunManifest = ( parseRunLifecycleState(undefined, input.status) ?? "submitted"; return { + ...(input.claudeChannelServer + ? { claudeChannelServer: input.claudeChannelServer } + : {}), claudeSessionId: input.claudeSessionId ?? "", ...(input.codexRemoteUrl ? { codexRemoteUrl: input.codexRemoteUrl } : {}), codexThreadId: input.codexThreadId ?? "", @@ -524,6 +529,14 @@ export const readRunManifest = ( } return { + ...(firstString(parsed, ["claudeChannelServer", "claude_channel_server"]) + ? { + claudeChannelServer: firstString(parsed, [ + "claudeChannelServer", + "claude_channel_server", + ]), + } + : {}), claudeSessionId: firstString(parsed, ["claudeSessionId", "claude_session_id"]) ?? "", ...(firstString(parsed, ["codexRemoteUrl", "codex_remote_url"]) diff --git a/src/loop/tmux.ts b/src/loop/tmux.ts index fd81332..a456be0 100644 --- a/src/loop/tmux.ts +++ b/src/loop/tmux.ts @@ -2,9 +2,17 @@ import { randomUUID } from "node:crypto"; import { existsSync } from "node:fs"; import { basename, dirname, join } from "node:path"; import { spawn, spawnSync } from "bun"; -import { BRIDGE_SERVER, BRIDGE_SUBCOMMAND } from "./bridge-constants"; import { - claudeTmuxReplyGuidance, + registerClaudeChannelServer, + removeClaudeChannelServer, +} from "./bridge-claude-registration"; +import { + buildClaudeChannelServerConfig, + claudeChannelServerName, + legacyClaudeChannelServerName, + resolveClaudeChannelServerName, +} from "./bridge-config"; +import { receiveMessagesStuckGuidance, sendProactiveCodexGuidance, sendToClaudeGuidance, @@ -51,11 +59,9 @@ const CLAUDE_BYPASS_ACCEPT = "Yes, I accept"; const CLAUDE_EXIT_OPTION = "No, exit"; const CLAUDE_DEV_CHANNELS_PROMPT = "WARNING: Loading development channels"; const CLAUDE_DEV_CHANNELS_CONFIRM = "I am using this for local development"; -const CLAUDE_CHANNEL_SCOPE = "local"; const CLAUDE_PROMPT_MAX_POLLS = 8; const CLAUDE_PROMPT_POLL_DELAY_MS = 250; const CLAUDE_PROMPT_SETTLE_POLLS = 2; -const MCP_ALREADY_EXISTS_RE = /already exists/i; interface SpawnResult { exitCode: number; @@ -149,13 +155,14 @@ const appendProofPrompt = (parts: string[], proof: string): void => { parts.push(`Proof requirements:\n${trimmed}`); }; -const pairedBridgeGuidance = (agent: Agent, runId: string): string => { - const serverName = buildClaudeChannelServerName(runId); - +const pairedBridgeGuidance = ( + agent: Agent, + _runId: string, + serverName: string +): string => { if (agent === "claude") { return [ `Your bridge MCP server is "${serverName}". All bridge tool calls must use the mcp__${serverName}__ prefix.`, - claudeTmuxReplyGuidance, sendProactiveCodexGuidance(), receiveMessagesStuckGuidance, ].join("\n"); @@ -188,7 +195,8 @@ const pairedWorkflowGuidance = (opts: Options, agent: Agent): string => { const buildPrimaryPrompt = ( task: string, opts: Options, - runId: string + runId: string, + serverName: string ): string => { const peer = capitalize(peerAgent(opts.agent)); const parts = [ @@ -198,7 +206,7 @@ const buildPrimaryPrompt = ( ]; appendProofPrompt(parts, opts.proof); parts.push(SPAWN_TEAM_WITH_WORKTREE_ISOLATION); - parts.push(pairedBridgeGuidance(opts.agent, runId)); + parts.push(pairedBridgeGuidance(opts.agent, runId, serverName)); parts.push(pairedWorkflowGuidance(opts, opts.agent)); parts.push( `${peer} should send a short ready message. Wait briefly if it arrives, then inspect the repo and start. Ask ${peer} for review once you have concrete work or a specific question.` @@ -210,7 +218,8 @@ const buildPeerPrompt = ( task: string, opts: Options, agent: Agent, - runId: string + runId: string, + serverName: string ): string => { const primary = capitalize(opts.agent); const parts = [ @@ -219,7 +228,7 @@ const buildPeerPrompt = ( `You are ${capitalize(agent)}. Do not start implementing or verifying this task on your own.`, ]; appendProofPrompt(parts, opts.proof); - parts.push(pairedBridgeGuidance(agent, runId)); + parts.push(pairedBridgeGuidance(agent, runId, serverName)); parts.push(pairedWorkflowGuidance(opts, agent)); parts.push( `Wait for ${primary} to send you a targeted request or review ask.` @@ -229,7 +238,8 @@ const buildPeerPrompt = ( const buildInteractivePrimaryPrompt = ( opts: Options, - runId: string + runId: string, + serverName: string ): string => { const peer = capitalize(peerAgent(opts.agent)); const parts = [ @@ -241,7 +251,7 @@ const buildInteractivePrimaryPrompt = ( parts.push( `${SPAWN_TEAM_WITH_WORKTREE_ISOLATION} Apply that once the human gives you a concrete task.` ); - parts.push(pairedBridgeGuidance(opts.agent, runId)); + parts.push(pairedBridgeGuidance(opts.agent, runId, serverName)); parts.push(pairedWorkflowGuidance(opts, opts.agent)); parts.push( `If the human asks for plan mode, write PLAN.md first, ask ${peer} for a plan review, iterate on PLAN.md, then ask the human to review the plan before implementing.` @@ -255,7 +265,8 @@ const buildInteractivePrimaryPrompt = ( const buildInteractivePeerPrompt = ( opts: Options, agent: Agent, - runId: string + runId: string, + serverName: string ): string => { const primary = capitalize(opts.agent); const parts = [ @@ -264,7 +275,7 @@ const buildInteractivePeerPrompt = ( `You are ${capitalize(agent)}. Stay idle until ${primary} sends a specific request or the human clearly assigns you separate work.`, ]; appendProofPrompt(parts, opts.proof); - parts.push(pairedBridgeGuidance(agent, runId)); + parts.push(pairedBridgeGuidance(agent, runId, serverName)); parts.push(pairedWorkflowGuidance(opts, agent)); parts.push( `If ${primary} asks for a plan review, review PLAN.md only, suggest concrete fixes, and wait for the next request.` @@ -278,17 +289,18 @@ const buildInteractivePeerPrompt = ( const buildLaunchPrompt = ( launch: PairedTmuxLaunch, agent: Agent, - runId: string + runId: string, + serverName: string ): string => { const task = launch.task?.trim(); if (!task) { return launch.opts.agent === agent - ? buildInteractivePrimaryPrompt(launch.opts, runId) - : buildInteractivePeerPrompt(launch.opts, agent, runId); + ? buildInteractivePrimaryPrompt(launch.opts, runId, serverName) + : buildInteractivePeerPrompt(launch.opts, agent, runId, serverName); } return launch.opts.agent === agent - ? buildPrimaryPrompt(task, launch.opts, runId) - : buildPeerPrompt(task, launch.opts, agent, runId); + ? buildPrimaryPrompt(task, launch.opts, runId, serverName) + : buildPeerPrompt(task, launch.opts, agent, runId, serverName); }; const resolveTmuxModel = (agent: Agent, opts: Options): string => { @@ -302,21 +314,6 @@ const resolveTmuxModel = (agent: Agent, opts: Options): string => { : (opts.claudeReviewerModel ?? DEFAULT_CLAUDE_MODEL); }; -const buildClaudeChannelServerName = (runId: string): string => - `${BRIDGE_SERVER}-${sanitizeBase(runId)}`; - -const buildClaudeChannelServerConfig = ( - launchArgv: string[], - runDir: string -): string => { - const [command, ...baseArgs] = launchArgv; - return JSON.stringify({ - args: [...baseArgs, BRIDGE_SUBCOMMAND, runDir, "claude"], - command, - type: "stdio", - }); -}; - const buildClaudeCommand = ( sessionId: string, model: string, @@ -647,25 +644,35 @@ const updatePairedManifest = ( ); }; -const registerClaudeChannelServer = ( +const registerClaudeChannelServerForRun = ( deps: TmuxDeps, serverName: string, runDir: string ): void => { - const result = deps.spawn([ - "claude", - "mcp", - "add-json", - "--scope", - CLAUDE_CHANNEL_SCOPE, + registerClaudeChannelServer(deps.launchArgv, serverName, runDir, (args) => + deps.spawn(args) + ); +}; + +const cleanupFailedPairedSessionStart = ( + deps: TmuxDeps, + session: string, + serverName: string, + runId: string +): void => { + try { + if (sessionExists(session, deps.spawn)) { + deps.spawn(["tmux", "kill-session", "-t", session]); + } + } catch { + // Best-effort cleanup after a failed paired startup. + } + for (const name of new Set([ serverName, - buildClaudeChannelServerConfig(deps.launchArgv, runDir), - ]); - if (result.exitCode === 0 || MCP_ALREADY_EXISTS_RE.test(result.stderr)) { - return; + legacyClaudeChannelServerName(runId), + ])) { + removeClaudeChannelServer(name, (args) => deps.spawn(args), deps.log); } - const suffix = result.stderr ? `: ${result.stderr}` : "."; - throw new Error(`[loop] failed to register Claude channel server${suffix}`); }; const ensurePairedSessionIds = async ( @@ -835,70 +842,97 @@ const startPairedSession = async ( codexRemoteUrl, codexThreadId ); - const claudeChannelServer = buildClaudeChannelServerName(storage.runId); - registerClaudeChannelServer(deps, claudeChannelServer, storage.runDir); - const env = [`${RUN_BASE_ENV}=${runBase}`, `${RUN_ID_ENV}=${storage.runId}`]; - const claudePrompt = buildLaunchPrompt(launch, "claude", storage.runId); - const codexPrompt = buildLaunchPrompt(launch, "codex", storage.runId); - const claudeCommand = buildShellCommand([ - "env", - ...env, - ...buildClaudeCommand( - claudeSessionId, - resolveTmuxModel("claude", launch.opts), - claudeChannelServer, - hadClaudeSession, - hadClaudeSession ? undefined : claudePrompt - ), - ]); - const codexCommand = buildShellCommand([ - "env", - ...env, - ...buildCodexCommand( - codexProxyUrl, - resolveTmuxModel("codex", launch.opts), - launch.opts.codexMcpConfigArgs ?? [], - hadCodexThread ? undefined : codexPrompt - ), - ]); + const claudeChannelServer = resolveClaudeChannelServerName( + storage.runId, + storage.repoId, + manifest.claudeChannelServer + ); + registerClaudeChannelServerForRun(deps, claudeChannelServer, storage.runDir); + try { + const env = [ + `${RUN_BASE_ENV}=${runBase}`, + `${RUN_ID_ENV}=${storage.runId}`, + ]; + const claudePrompt = buildLaunchPrompt( + launch, + "claude", + storage.runId, + claudeChannelServer + ); + const codexPrompt = buildLaunchPrompt( + launch, + "codex", + storage.runId, + claudeChannelServer + ); + const claudeCommand = buildShellCommand([ + "env", + ...env, + ...buildClaudeCommand( + claudeSessionId, + resolveTmuxModel("claude", launch.opts), + claudeChannelServer, + hadClaudeSession, + hadClaudeSession ? undefined : claudePrompt + ), + ]); + const codexCommand = buildShellCommand([ + "env", + ...env, + ...buildCodexCommand( + codexProxyUrl, + resolveTmuxModel("codex", launch.opts), + launch.opts.codexMcpConfigArgs ?? [], + hadCodexThread ? undefined : codexPrompt + ), + ]); - runTmuxCommand(deps, [ - "tmux", - "new-session", - "-d", - ...buildSessionSizeArgs(deps), - "-s", - session, - "-c", - deps.cwd, - claudeCommand, - ]); - runTmuxCommand( - deps, - [ + runTmuxCommand(deps, [ "tmux", - "split-window", - "-h", - "-t", - `${session}:0`, + "new-session", + "-d", + ...buildSessionSizeArgs(deps), + "-s", + session, "-c", deps.cwd, - codexCommand, - ], - "Failed to split tmux window" - ); - deps.spawn([ - "tmux", - "select-layout", - "-t", - `${session}:0`, - "even-horizontal", - ]); - await unblockClaudePane(session, deps); - const primaryPane = - launch.opts.agent === "claude" ? `${session}:0.0` : `${session}:0.1`; - deps.spawn(["tmux", "select-pane", "-t", primaryPane]); - return session; + claudeCommand, + ]); + runTmuxCommand( + deps, + [ + "tmux", + "split-window", + "-h", + "-t", + `${session}:0`, + "-c", + deps.cwd, + codexCommand, + ], + "Failed to split tmux window" + ); + deps.spawn([ + "tmux", + "select-layout", + "-t", + `${session}:0`, + "even-horizontal", + ]); + await unblockClaudePane(session, deps); + const primaryPane = + launch.opts.agent === "claude" ? `${session}:0.0` : `${session}:0.1`; + deps.spawn(["tmux", "select-pane", "-t", primaryPane]); + return session; + } catch (error: unknown) { + cleanupFailedPairedSessionStart( + deps, + session, + claudeChannelServer, + storage.runId + ); + throw error; + } }; const startRequestedSession = ( @@ -1151,7 +1185,7 @@ export const runInTmux = async ( export const tmuxInternals = { buildClaudeCommand, buildClaudeChannelServerConfig, - buildClaudeChannelServerName, + buildClaudeChannelServerName: claudeChannelServerName, buildCodexCommand, buildInteractivePeerPrompt, buildInteractivePrimaryPrompt, diff --git a/tests/loop/bridge-config.test.ts b/tests/loop/bridge-config.test.ts new file mode 100644 index 0000000..0706834 --- /dev/null +++ b/tests/loop/bridge-config.test.ts @@ -0,0 +1,23 @@ +import { expect, test } from "bun:test"; +import { + claudeChannelServerName, + legacyClaudeChannelServerName, +} from "../../src/loop/bridge-config"; + +test("claudeChannelServerName drops the repo hash suffix from storage ids", () => { + expect(claudeChannelServerName("55", "loop-0d5b6b77c881")).toBe( + "loop-bridge-loop-55" + ); +}); + +test("claudeChannelServerName preserves readable repo ids", () => { + expect(claudeChannelServerName("55", "repo-123")).toBe( + "loop-bridge-repo-123-55" + ); +}); + +test("claudeChannelServerName falls back to the legacy run-scoped name", () => { + expect(claudeChannelServerName("55")).toBe( + legacyClaudeChannelServerName("55") + ); +}); diff --git a/tests/loop/bridge.test.ts b/tests/loop/bridge.test.ts index 21c7a6f..c26ad88 100644 --- a/tests/loop/bridge.test.ts +++ b/tests/loop/bridge.test.ts @@ -28,19 +28,23 @@ const loadBridge = ( const nonce = Date.now(); return Promise.all([ import(`../../src/loop/bridge?test=${nonce}`), + import(`../../src/loop/bridge-claude-registration?test=${nonce}`), import(`../../src/loop/bridge-dispatch?test=${nonce}`), import(`../../src/loop/bridge-config?test=${nonce}`), import(`../../src/loop/bridge-constants?test=${nonce}`), import(`../../src/loop/bridge-runtime?test=${nonce}`), import(`../../src/loop/bridge-store?test=${nonce}`), - ]).then(([bridge, dispatch, config, constants, runtime, store]) => ({ - ...bridge, - ...dispatch, - ...config, - ...constants, - ...runtime, - ...store, - })); + ]).then( + ([bridge, registration, dispatch, config, constants, runtime, store]) => ({ + ...bridge, + ...registration, + ...dispatch, + ...config, + ...constants, + ...runtime, + ...store, + }) + ); }; const makeTempDir = (): string => mkdtempSync(join(tmpdir(), "loop-bridge-")); @@ -64,15 +68,24 @@ const listedTools = (stdout: string): Record[] => { ?.tools ?? [] ); }; +const toolText = (stdout: string, id: number): string => { + const response = parseJsonLines(stdout).find((entry) => entry.id === id); + const content = ( + response?.result as { content?: Array<{ text?: string }> } | undefined + )?.content; + return content?.[0]?.text ?? ""; +}; const runBridgeProcess = async ( runDir: string, source: "claude" | "codex", - frames: string + frames: string, + env?: NodeJS.ProcessEnv ): Promise<{ code: number | null; stderr: string; stdout: string }> => { const cli = join(process.cwd(), "src", "cli.ts"); - const child = spawn("bun", [cli, "__bridge-mcp", runDir, source], { + const child = spawn(process.execPath, [cli, "__bridge-mcp", runDir, source], { cwd: process.cwd(), + env, stdio: ["pipe", "pipe", "pipe"], }); let stdout = ""; @@ -186,6 +199,126 @@ test("markBridgeMessage records acknowledgements and clears pending entries", as rmSync(root, { recursive: true, force: true }); }); +test("readBridgeStatus derives bridge naming and transport fields", async () => { + const bridge = await loadBridge(); + const root = makeTempDir(); + const runDir = join(root, "run"); + mkdirSync(runDir, { recursive: true }); + writeFileSync( + join(runDir, "manifest.json"), + `${JSON.stringify({ + claudeSessionId: "claude-session-1", + codexRemoteUrl: "ws://127.0.0.1:4500", + codexThreadId: "codex-thread-1", + createdAt: "2026-03-27T10:00:00.000Z", + cwd: "/repo", + mode: "paired", + pid: 1234, + repoId: "repo-123", + runId: "7", + state: "submitted", + status: "running", + updatedAt: "2026-03-27T10:00:00.000Z", + })}\n`, + "utf8" + ); + + expect(bridge.readBridgeStatus(runDir)).toMatchObject({ + bridgeServer: bridge.BRIDGE_SERVER, + claudeBridgeMode: "mcp-config", + claudeChannelServer: bridge.claudeChannelServerName("7", "repo-123"), + claudeSessionId: "claude-session-1", + codexRemoteUrl: "ws://127.0.0.1:4500", + codexThreadId: "codex-thread-1", + hasCodexRemote: true, + hasTmuxSession: false, + pending: { claude: 0, codex: 0 }, + runId: "7", + state: "submitted", + status: "running", + tmuxSession: "", + }); + + rmSync(root, { recursive: true, force: true }); +}); + +test("readBridgeRuntimeStatus distinguishes live and stale tmux delivery", async () => { + const spawnSync = mock((args: string[]) => { + if (args[0] === "tmux" && args[1] === "has-session") { + const session = args[3]; + return { + exitCode: session === "repo-loop-live" ? 0 : 1, + stderr: Buffer.alloc(0), + stdout: Buffer.alloc(0), + }; + } + return { exitCode: 0, stderr: Buffer.alloc(0), stdout: Buffer.alloc(0) }; + }); + const bridge = await loadBridge(); + bridge.bridgeRuntimeCommandDeps.spawnSync = spawnSync; + const root = makeTempDir(); + const liveRunDir = join(root, "live"); + const staleRunDir = join(root, "stale"); + mkdirSync(liveRunDir, { recursive: true }); + mkdirSync(staleRunDir, { recursive: true }); + + writeFileSync( + join(liveRunDir, "manifest.json"), + `${JSON.stringify({ + codexRemoteUrl: "ws://127.0.0.1:4500", + codexThreadId: "codex-thread-live", + createdAt: "2026-03-27T10:00:00.000Z", + cwd: "/repo", + mode: "paired", + pid: 1234, + repoId: "repo-123", + runId: "8", + state: "running", + status: "running", + tmuxSession: "repo-loop-live", + updatedAt: "2026-03-27T10:00:00.000Z", + })}\n`, + "utf8" + ); + writeFileSync( + join(staleRunDir, "manifest.json"), + `${JSON.stringify({ + codexRemoteUrl: "ws://127.0.0.1:4500", + codexThreadId: "codex-thread-stale", + createdAt: "2026-03-27T10:00:00.000Z", + cwd: "/repo", + mode: "paired", + pid: 1234, + repoId: "repo-123", + runId: "9", + state: "running", + status: "running", + tmuxSession: "repo-loop-stale", + updatedAt: "2026-03-27T10:00:00.000Z", + })}\n`, + "utf8" + ); + + expect(bridge.readBridgeRuntimeStatus(liveRunDir)).toMatchObject({ + claudeBridgeMode: "local-registration", + claudeChannelServer: bridge.claudeChannelServerName("8", "repo-123"), + codexDeliveryMode: "app-server", + hasCodexRemote: true, + hasLiveTmuxSession: true, + hasTmuxSession: true, + }); + expect(bridge.readBridgeRuntimeStatus(staleRunDir)).toMatchObject({ + claudeBridgeMode: "local-registration", + claudeChannelServer: bridge.claudeChannelServerName("9", "repo-123"), + codexDeliveryMode: "app-server", + hasCodexRemote: true, + hasLiveTmuxSession: false, + hasTmuxSession: true, + }); + + rmSync(root, { recursive: true, force: true }); +}); + test("readPendingBridgeMessages keeps repeated messages until each is acknowledged", async () => { const bridge = await loadBridge(); const root = makeTempDir(); @@ -245,6 +378,50 @@ test("readPendingBridgeMessages keeps repeated messages until each is acknowledg rmSync(root, { recursive: true, force: true }); }); +test("bridge normalization treats short and legacy Claude prefixes as equivalent", async () => { + const bridge = await loadBridge(); + const root = makeTempDir(); + const runDir = join(root, "run"); + mkdirSync(runDir, { recursive: true }); + const bridgeFile = bridge.bridgeInternals.bridgePath(runDir); + + writeFileSync( + bridgeFile, + `${[ + { + at: "2026-03-22T10:00:00.000Z", + id: "msg-1", + kind: "message", + message: + "Message from Claude via the loop bridge:\n\nPlease verify the final diff.", + source: "claude", + target: "codex", + }, + { + at: "2026-03-22T10:01:00.000Z", + id: "msg-1", + kind: "delivered", + source: "claude", + target: "codex", + }, + ] + .map((entry) => JSON.stringify(entry)) + .join("\n")}\n`, + "utf8" + ); + + expect( + bridge.blocksBridgeBounce( + runDir, + "codex", + "claude", + "Claude: Please verify the final diff." + ) + ).toBe(true); + + rmSync(root, { recursive: true, force: true }); +}); + test("bridge MCP send_to_agent queues a direct message through the CLI path", async () => { const bridge = await loadBridge(); const root = makeTempDir(); @@ -402,25 +579,10 @@ test("bridge MCP send_to_agent rejects an unknown normalized target", async () = rmSync(root, { recursive: true, force: true }); }); -test("bridge MCP reply accepts the manifest bridge chat_id", async () => { - const bridge = await loadBridge(); +test("bridge MCP send_to_agent rejects targeting the current agent", async () => { const root = makeTempDir(); const runDir = join(root, "run"); mkdirSync(runDir, { recursive: true }); - writeFileSync( - join(runDir, "manifest.json"), - `${JSON.stringify({ - createdAt: "2026-03-27T10:00:00.000Z", - cwd: "/repo", - mode: "paired", - pid: 1234, - repoId: "repo-123", - runId: "7", - status: "running", - updatedAt: "2026-03-27T10:00:00.000Z", - })}\n`, - "utf8" - ); const result = await runBridgeProcess( runDir, @@ -432,63 +594,10 @@ test("bridge MCP reply accepts the manifest bridge chat_id", async () => { method: "tools/call", params: { arguments: { - chat_id: "codex_7", - text: "ship it", - }, - name: "reply", - }, - }), - "\n", - ].join("") - ); - - expect(result.code).toBe(0); - expect(result.stderr).toBe(""); - expect(result.stdout).toContain("queued"); - expect(bridge.readPendingBridgeMessages(runDir)).toEqual([ - expect.objectContaining({ - message: "ship it", - source: "claude", - target: "codex", - }), - ]); - rmSync(root, { recursive: true, force: true }); -}); - -test("bridge MCP reply rejects a mismatched bridge chat_id", async () => { - const bridge = await loadBridge(); - const root = makeTempDir(); - const runDir = join(root, "run"); - mkdirSync(runDir, { recursive: true }); - writeFileSync( - join(runDir, "manifest.json"), - `${JSON.stringify({ - createdAt: "2026-03-27T10:00:00.000Z", - cwd: "/repo", - mode: "paired", - pid: 1234, - repoId: "repo-123", - runId: "7", - status: "running", - updatedAt: "2026-03-27T10:00:00.000Z", - })}\n`, - "utf8" - ); - - const result = await runBridgeProcess( - runDir, - "claude", - [ - encodeFrame({ - id: 1, - jsonrpc: "2.0", - method: "tools/call", - params: { - arguments: { - chat_id: "codex_999", - text: "ship it", + message: "ship it", + target: "claude", }, - name: "reply", + name: "send_to_agent", }, }), "\n", @@ -499,12 +608,11 @@ test("bridge MCP reply rejects a mismatched bridge chat_id", async () => { expect(JSON.parse(result.stdout)).toMatchObject({ error: { code: -32_602, - message: "reply chat_id does not match the active bridge conversation", + message: "send_to_agent cannot target the current agent", }, id: 1, jsonrpc: "2.0", }); - expect(bridge.readPendingBridgeMessages(runDir)).toEqual([]); rmSync(root, { recursive: true, force: true }); }); @@ -562,7 +670,7 @@ test("bridge MCP handles standard empty-list and ping requests through the Claud expect(result.stderr).toBe(""); expect(result.stdout).toContain('"claude/channel":{}'); expect(result.stdout).toContain( - '\\"reply\\" tool and pass back the same chat_id' + '\\"send_to_agent\\" with target: \\"codex\\" for Codex-facing messages' ); expect(result.stdout).toContain( "Never answer the human when the inbound message came from Codex" @@ -579,14 +687,6 @@ test("bridge MCP handles standard empty-list and ping requests through the Claud const tools = listedTools(result.stdout); expect(tools).toEqual( expect.arrayContaining([ - expect.objectContaining({ - annotations: { - destructiveHint: false, - openWorldHint: false, - readOnlyHint: false, - }, - name: "reply", - }), expect.objectContaining({ annotations: { destructiveHint: false, @@ -613,6 +713,8 @@ test("bridge MCP handles standard empty-list and ping requests through the Claud }), ]) ); + expect(tools).toHaveLength(3); + expect(tools.some((tool) => tool.name === "reply")).toBe(false); rmSync(root, { recursive: true, force: true }); }); @@ -713,6 +815,180 @@ test("bridge MCP writes line-delimited JSON responses", async () => { rmSync(root, { recursive: true, force: true }); }); +test("bridge runtime status reports app-server-backed config-file delivery", async () => { + const bridge = await loadBridge(); + const root = makeTempDir(); + const runDir = join(root, "run"); + mkdirSync(runDir, { recursive: true }); + writeFileSync( + join(runDir, "manifest.json"), + `${JSON.stringify({ + claudeSessionId: "claude-session-1", + codexRemoteUrl: "ws://127.0.0.1:4500", + codexThreadId: "codex-thread-1", + createdAt: "2026-03-23T10:00:00.000Z", + cwd: "/repo", + mode: "paired", + pid: 1234, + repoId: "repo-123", + runId: "7", + state: "submitted", + status: "running", + updatedAt: "2026-03-23T10:00:00.000Z", + })}\n`, + "utf8" + ); + + expect(bridge.readBridgeRuntimeStatus(runDir)).toMatchObject({ + claudeBridgeMode: "mcp-config", + claudeChannelServer: bridge.claudeChannelServerName("7", "repo-123"), + codexDeliveryMode: "app-server", + hasCodexRemote: true, + hasLiveTmuxSession: false, + }); + + rmSync(root, { recursive: true, force: true }); +}); + +test("bridge runtime status reports live tmux delivery with a run-scoped Claude server", async () => { + const spawnSync = mock((args: string[]) => { + if (args[0] === "tmux" && args[1] === "has-session") { + return { exitCode: 0, stderr: Buffer.alloc(0), stdout: Buffer.alloc(0) }; + } + return { exitCode: 1, stderr: Buffer.alloc(0), stdout: Buffer.alloc(0) }; + }); + const bridge = await loadBridge(); + bridge.bridgeRuntimeCommandDeps.spawnSync = spawnSync; + const root = makeTempDir(); + const runDir = join(root, "run"); + mkdirSync(runDir, { recursive: true }); + writeFileSync( + join(runDir, "manifest.json"), + `${JSON.stringify({ + createdAt: "2026-03-23T10:00:00.000Z", + cwd: "/repo", + mode: "paired", + pid: 1234, + repoId: "repo-123", + runId: "8", + state: "submitted", + status: "running", + tmuxSession: "repo-loop-8", + updatedAt: "2026-03-23T10:00:00.000Z", + })}\n`, + "utf8" + ); + + expect(bridge.readBridgeRuntimeStatus(runDir)).toMatchObject({ + claudeBridgeMode: "local-registration", + claudeChannelServer: "loop-bridge-repo-123-8", + codexDeliveryMode: "tmux", + hasCodexRemote: false, + hasLiveTmuxSession: true, + }); + + rmSync(root, { recursive: true, force: true }); +}); + +test("bridge MCP bridge_status includes runtime delivery fields", async () => { + const root = makeTempDir(); + const runDir = join(root, "run"); + mkdirSync(runDir, { recursive: true }); + writeFileSync( + join(runDir, "manifest.json"), + `${JSON.stringify({ + claudeSessionId: "claude-session-1", + codexRemoteUrl: "ws://127.0.0.1:4500", + codexThreadId: "codex-thread-1", + createdAt: "2026-03-23T10:00:00.000Z", + cwd: "/repo", + mode: "paired", + pid: 1234, + repoId: "repo-123", + runId: "7", + state: "submitted", + status: "running", + updatedAt: "2026-03-23T10:00:00.000Z", + })}\n`, + "utf8" + ); + + const result = await runBridgeProcess( + runDir, + "codex", + encodeLine({ + id: 1, + jsonrpc: "2.0", + method: "tools/call", + params: { + arguments: {}, + name: "bridge_status", + }, + }) + ); + + expect(result.code).toBe(0); + expect(result.stderr).toBe(""); + const status = toolText(result.stdout, 1); + expect(status).toContain('"claudeBridgeMode": "mcp-config"'); + expect(status).toContain('"claudeChannelServer": "loop-bridge-repo-123-7"'); + expect(status).toContain('"codexDeliveryMode": "app-server"'); + expect(status).toContain('"hasCodexRemote": true'); + expect(status).toContain('"hasLiveTmuxSession": false'); + + rmSync(root, { recursive: true, force: true }); +}); + +test("bridge MCP bridge_status tolerates a missing tmux binary", async () => { + const root = makeTempDir(); + const runDir = join(root, "run"); + mkdirSync(runDir, { recursive: true }); + writeFileSync( + join(runDir, "manifest.json"), + `${JSON.stringify({ + claudeSessionId: "claude-session-1", + codexRemoteUrl: "ws://127.0.0.1:4500", + codexThreadId: "codex-thread-1", + createdAt: "2026-03-23T10:00:00.000Z", + cwd: "/repo", + mode: "paired", + pid: 1234, + repoId: "repo-123", + runId: "7", + state: "submitted", + status: "running", + tmuxSession: "repo-loop-7", + updatedAt: "2026-03-23T10:00:00.000Z", + })}\n`, + "utf8" + ); + + const result = await runBridgeProcess( + runDir, + "codex", + encodeLine({ + id: 1, + jsonrpc: "2.0", + method: "tools/call", + params: { + arguments: {}, + name: "bridge_status", + }, + }), + { ...process.env, PATH: "/definitely-missing" } + ); + + expect(result.code).toBe(0); + expect(result.stderr).toBe(""); + const status = toolText(result.stdout, 1); + expect(status).toContain('"claudeChannelServer": "loop-bridge-repo-123-7"'); + expect(status).toContain('"codexDeliveryMode": "app-server"'); + expect(status).toContain('"hasLiveTmuxSession": false'); + expect(status).toContain('"hasTmuxSession": true'); + + rmSync(root, { recursive: true, force: true }); +}); + test("bridge MCP receive_messages returns and clears queued inbox items", async () => { const bridge = await loadBridge(); const root = makeTempDir(); @@ -731,24 +1007,74 @@ test("bridge MCP receive_messages returns and clears queued inbox items", async "utf8" ); - const result = await runBridgeProcess( - runDir, - "codex", - encodeLine({ - id: 1, - jsonrpc: "2.0", - method: "tools/call", - params: { - arguments: {}, - name: "receive_messages", - }, - }) - ); + const result = await runBridgeProcess( + runDir, + "codex", + encodeLine({ + id: 1, + jsonrpc: "2.0", + method: "tools/call", + params: { + arguments: {}, + name: "receive_messages", + }, + }) + ); + + expect(result.code).toBe(0); + expect(result.stderr).toBe(""); + expect(result.stdout).toContain("Please review the final result."); + expect(result.stdout).toContain('\\"from\\": \\"claude\\"'); + expect(bridge.readPendingBridgeMessages(runDir)).toEqual([]); + expect( + bridge.bridgeInternals + .readBridgeEvents(runDir) + .filter((event) => event.kind === "delivered") + ).toHaveLength(1); + + rmSync(root, { recursive: true, force: true }); +}); + +test("bridge delivers Claude replies directly to Codex when app-server state is available", async () => { + const injectCodexMessage = mock(async () => true); + const bridge = await loadBridge({ injectCodexMessage }); + const root = makeTempDir(); + const runDir = join(root, "run"); + mkdirSync(runDir, { recursive: true }); + writeFileSync( + join(runDir, "manifest.json"), + `${JSON.stringify({ + codexRemoteUrl: "ws://127.0.0.1:4500", + codexThreadId: "codex-thread-1", + createdAt: "2026-03-23T10:00:00.000Z", + cwd: "/repo", + mode: "paired", + pid: 1234, + repoId: "repo-123", + runId: "7", + status: "running", + updatedAt: "2026-03-23T10:00:00.000Z", + })}\n`, + "utf8" + ); + const message = { + at: "2026-03-23T10:01:00.000Z", + id: "msg-1", + kind: "message" as const, + message: "The files look good to me.", + source: "claude" as const, + target: "codex" as const, + }; + + bridge.bridgeInternals.appendBridgeEvent(runDir, message); + const delivered = await bridge.deliverCodexBridgeMessage(runDir, message); - expect(result.code).toBe(0); - expect(result.stderr).toBe(""); - expect(result.stdout).toContain("Please review the final result."); - expect(result.stdout).toContain('\\"from\\": \\"claude\\"'); + expect(delivered).toBe(true); + expect(injectCodexMessage).toHaveBeenCalledWith( + "ws://127.0.0.1:4500", + "codex-thread-1", + "Claude: The files look good to me." + ); expect(bridge.readPendingBridgeMessages(runDir)).toEqual([]); expect( bridge.bridgeInternals @@ -759,9 +1085,16 @@ test("bridge MCP receive_messages returns and clears queued inbox items", async rmSync(root, { recursive: true, force: true }); }); -test("bridge delivers Claude replies directly to Codex when app-server state is available", async () => { +test("bridge prefers Codex app-server delivery even when tmux is live", async () => { const injectCodexMessage = mock(async () => true); + const spawnSync = mock((args: string[]) => { + if (args[0] === "tmux" && args[1] === "has-session") { + return { exitCode: 0 }; + } + return { exitCode: 1 }; + }); const bridge = await loadBridge({ injectCodexMessage }); + bridge.bridgeRuntimeCommandDeps.spawnSync = spawnSync; const root = makeTempDir(); const runDir = join(root, "run"); mkdirSync(runDir, { recursive: true }); @@ -777,15 +1110,16 @@ test("bridge delivers Claude replies directly to Codex when app-server state is repoId: "repo-123", runId: "7", status: "running", + tmuxSession: "repo-loop-7", updatedAt: "2026-03-23T10:00:00.000Z", })}\n`, "utf8" ); const message = { at: "2026-03-23T10:01:00.000Z", - id: "msg-1", + id: "msg-live", kind: "message" as const, - message: "The files look good to me.", + message: "Please steer this into the active turn.", source: "claude" as const, target: "codex" as const, }; @@ -797,7 +1131,7 @@ test("bridge delivers Claude replies directly to Codex when app-server state is expect(injectCodexMessage).toHaveBeenCalledWith( "ws://127.0.0.1:4500", "codex-thread-1", - "The files look good to me." + "Claude: Please steer this into the active turn." ); expect(bridge.readPendingBridgeMessages(runDir)).toEqual([]); expect( @@ -858,7 +1192,7 @@ test("bridge falls back to direct Codex delivery when the stored tmux session is expect(injectCodexMessage).toHaveBeenCalledWith( "ws://127.0.0.1:4500", "codex-thread-1", - "Please review the final state." + "Claude: Please review the final state." ); expect(readRunManifest(join(runDir, "manifest.json"))?.tmuxSession).toBe( undefined @@ -873,7 +1207,7 @@ test("bridge falls back to direct Codex delivery when the stored tmux session is "remove", "--scope", "local", - bridge.claudeChannelServerName("8"), + bridge.claudeChannelServerName("8", "repo-123"), ]); expect(removeCall?.[1]).toMatchObject({ stderr: "pipe", @@ -934,34 +1268,32 @@ test("bridge drains pending codex tmux messages through the injected command dep expect(delivered).toBe(true); expect(bridge.readPendingBridgeMessages(runDir)).toEqual([]); - expect(spawnSync.mock.calls).toEqual( - expect.arrayContaining([ - [ - ["tmux", "has-session", "-t", "repo-loop-8"], - expect.objectContaining({ stderr: "ignore", stdout: "ignore" }), - ], - [ - ["tmux", "capture-pane", "-p", "-t", "repo-loop-8:0.1"], - expect.objectContaining({ stderr: "ignore", stdout: "pipe" }), - ], - [ - [ - "tmux", - "send-keys", - "-t", - "repo-loop-8:0.1", - "-l", - "--", - "Please check the tmux path.", - ], - expect.objectContaining({ stderr: "ignore" }), - ], + expect(spawnSync.mock.calls).toEqual([ + [ + ["tmux", "has-session", "-t", "repo-loop-8"], + { stderr: "ignore", stdout: "ignore" }, + ], + [ + ["tmux", "capture-pane", "-p", "-t", "repo-loop-8:0.1"], + { stderr: "ignore", stdout: "pipe" }, + ], + [ [ - ["tmux", "send-keys", "-t", "repo-loop-8:0.1", "Enter"], - expect.objectContaining({ stderr: "ignore" }), + "tmux", + "send-keys", + "-t", + "repo-loop-8:0.1", + "-l", + "--", + "Claude: Please check the tmux path.", ], - ]) - ); + { stderr: "ignore" }, + ], + [ + ["tmux", "send-keys", "-t", "repo-loop-8:0.1", "Enter"], + { stderr: "ignore" }, + ], + ]); rmSync(root, { recursive: true, force: true }); }); @@ -1036,7 +1368,7 @@ test("bridge stale tmux cleanup logs non-zero Claude MCP remove exits", async () try { expect(bridge.clearStaleTmuxBridgeState(runDir)).toBe(true); expect(errorSpy).toHaveBeenCalledWith( - '[loop] failed to remove Claude channel server "loop-bridge-8": command failed' + '[loop] failed to remove Claude channel server "loop-bridge-repo-123-8": command failed' ); expect(readRunManifest(join(runDir, "manifest.json"))?.tmuxSession).toBe( undefined @@ -1081,7 +1413,7 @@ test("bridge stale tmux cleanup logs thrown Claude MCP remove errors", async () try { expect(bridge.clearStaleTmuxBridgeState(runDir)).toBe(true); expect(errorSpy).toHaveBeenCalledWith( - '[loop] failed to remove Claude channel server "loop-bridge-8": spawn failed' + '[loop] failed to remove Claude channel server "loop-bridge-repo-123-8": spawn failed' ); expect(readRunManifest(join(runDir, "manifest.json"))?.tmuxSession).toBe( undefined @@ -1092,6 +1424,52 @@ test("bridge stale tmux cleanup logs thrown Claude MCP remove errors", async () } }); +test("bridge stale tmux cleanup removes a persisted Claude server name", async () => { + const spawnSync = mock((args: string[]) => { + if (args[0] === "claude" && args[1] === "mcp" && args[2] === "remove") { + return { exitCode: 0, stderr: Buffer.alloc(0), stdout: Buffer.alloc(0) }; + } + return { exitCode: 0, stderr: Buffer.alloc(0), stdout: Buffer.alloc(0) }; + }); + const bridge = await loadBridge(); + bridge.bridgeRuntimeCommandDeps.spawnSync = spawnSync; + const root = makeTempDir(); + const runDir = join(root, "run"); + mkdirSync(runDir, { recursive: true }); + writeFileSync( + join(runDir, "manifest.json"), + `${JSON.stringify({ + claudeChannelServer: "loop-bridge-custom-8", + createdAt: "2026-03-23T10:00:00.000Z", + cwd: "/repo", + mode: "paired", + pid: 1234, + repoId: "repo-123", + runId: "8", + status: "running", + tmuxSession: "repo-loop-8", + updatedAt: "2026-03-23T10:00:00.000Z", + })}\n`, + "utf8" + ); + + expect(bridge.clearStaleTmuxBridgeState(runDir)).toBe(true); + expect( + spawnSync.mock.calls.filter( + (call) => call[0]?.[0] === "claude" && call[0]?.[2] === "remove" + ) + ).toEqual( + expect.arrayContaining([ + [ + ["claude", "mcp", "remove", "--scope", "local", "loop-bridge-custom-8"], + expect.objectContaining({ stderr: "pipe", stdout: "ignore" }), + ], + ]) + ); + + rmSync(root, { recursive: true, force: true }); +}); + test("runBridgeWorker clears stale tmux routing and exits", async () => { const spawnSync = mock((args: string[]) => { if (args[0] === "tmux" && args[1] === "has-session") { @@ -1142,7 +1520,7 @@ test("runBridgeWorker clears stale tmux routing and exits", async () => { "remove", "--scope", "local", - bridge.claudeChannelServerName("8"), + bridge.claudeChannelServerName("8", "repo-123"), ], expect.objectContaining({ stderr: "pipe", stdout: "ignore" }), ], @@ -1152,6 +1530,198 @@ test("runBridgeWorker clears stale tmux routing and exits", async () => { rmSync(root, { recursive: true, force: true }); }); +test("runBridgeWorker falls back to app-server delivery after stale tmux cleanup", async () => { + let runDir = ""; + const injectCodexMessage = mock(() => { + const manifestPath = join(runDir, "manifest.json"); + const manifest = readRunManifest(manifestPath); + writeFileSync( + manifestPath, + `${JSON.stringify({ + ...manifest, + state: "completed", + status: "completed", + })}\n`, + "utf8" + ); + return true; + }); + const spawnSync = mock((args: string[]) => { + if (args[0] === "tmux" && args[1] === "has-session") { + return { exitCode: 1, stderr: Buffer.alloc(0), stdout: Buffer.alloc(0) }; + } + if (args[0] === "claude" && args[1] === "mcp" && args[2] === "remove") { + return { exitCode: 0, stderr: Buffer.alloc(0), stdout: Buffer.alloc(0) }; + } + return { exitCode: 0, stderr: Buffer.alloc(0), stdout: Buffer.alloc(0) }; + }); + const bridge = await loadBridge({ injectCodexMessage }); + bridge.bridgeRuntimeCommandDeps.spawnSync = spawnSync; + const root = makeTempDir(); + runDir = join(root, "run"); + mkdirSync(runDir, { recursive: true }); + writeFileSync( + join(runDir, "manifest.json"), + `${JSON.stringify({ + codexRemoteUrl: "ws://127.0.0.1:4500", + codexThreadId: "codex-thread-1", + createdAt: "2026-03-23T10:00:00.000Z", + cwd: "/repo", + mode: "paired", + pid: 1234, + repoId: "repo-123", + runId: "8", + state: "working", + status: "running", + tmuxSession: "repo-loop-8", + updatedAt: "2026-03-23T10:00:00.000Z", + })}\n`, + "utf8" + ); + bridge.bridgeInternals.appendBridgeEvent(runDir, { + at: "2026-03-23T10:01:00.000Z", + id: "msg-stale-fallback", + kind: "message", + message: "Please deliver this after tmux cleanup.", + source: "claude", + target: "codex", + }); + + await bridge.runBridgeWorker(runDir); + + expect(injectCodexMessage).toHaveBeenCalledWith( + "ws://127.0.0.1:4500", + "codex-thread-1", + "Claude: Please deliver this after tmux cleanup." + ); + expect(readRunManifest(join(runDir, "manifest.json"))?.tmuxSession).toBe( + undefined + ); + expect(bridge.readPendingBridgeMessages(runDir)).toEqual([]); + + rmSync(root, { recursive: true, force: true }); +}); + +test("ensureBridgeWorker launches one app-server worker per active run", async () => { + const bridge = await loadBridge(); + const spawn = mock(() => ({ + pid: process.pid, + unref: mock(() => undefined), + })); + bridge.bridgeRuntimeCommandDeps.spawn = spawn; + const root = makeTempDir(); + const runDir = join(root, "run"); + mkdirSync(runDir, { recursive: true }); + writeFileSync( + join(runDir, "manifest.json"), + `${JSON.stringify({ + codexRemoteUrl: "ws://127.0.0.1:4500", + codexThreadId: "codex-thread-1", + createdAt: "2026-03-23T10:00:00.000Z", + cwd: "/repo", + mode: "paired", + pid: 1234, + repoId: "repo-123", + runId: "8", + state: "working", + status: "running", + updatedAt: "2026-03-23T10:00:00.000Z", + })}\n`, + "utf8" + ); + + expect(bridge.ensureBridgeWorker(runDir)).toBe(true); + expect(bridge.ensureBridgeWorker(runDir)).toBe(true); + expect(spawn).toHaveBeenCalledTimes(1); + expect(spawn.mock.calls[0]?.[0]).toEqual([ + "/opt/bun", + "src/loop/main.ts", + bridge.BRIDGE_WORKER_SUBCOMMAND, + runDir, + ]); + expect(spawn.mock.calls[0]?.[1]).toMatchObject({ + stderr: "ignore", + stdin: "ignore", + stdout: "ignore", + }); + + rmSync(root, { recursive: true, force: true }); +}); + +test("runBridgeWorker retries queued codex app-server messages", async () => { + let runDir = ""; + const injectCodexMessage = mock(() => { + if (injectCodexMessage.mock.calls.length === 1) { + throw new Error("turn still active"); + } + const manifestPath = join(runDir, "manifest.json"); + const manifest = readRunManifest(manifestPath); + writeFileSync( + manifestPath, + `${JSON.stringify({ + ...manifest, + state: "completed", + status: "completed", + })}\n`, + "utf8" + ); + return true; + }); + const bridge = await loadBridge({ injectCodexMessage }); + const root = makeTempDir(); + runDir = join(root, "run"); + mkdirSync(runDir, { recursive: true }); + writeFileSync( + join(runDir, "manifest.json"), + `${JSON.stringify({ + codexRemoteUrl: "ws://127.0.0.1:4500", + codexThreadId: "codex-thread-1", + createdAt: "2026-03-23T10:00:00.000Z", + cwd: "/repo", + mode: "paired", + pid: 1234, + repoId: "repo-123", + runId: "8", + state: "working", + status: "running", + updatedAt: "2026-03-23T10:00:00.000Z", + })}\n`, + "utf8" + ); + bridge.bridgeInternals.appendBridgeEvent(runDir, { + at: "2026-03-23T10:01:00.000Z", + id: "msg-4", + kind: "message", + message: "Please review the final diff.", + source: "claude", + target: "codex", + }); + + await bridge.runBridgeWorker(runDir); + + expect(injectCodexMessage).toHaveBeenCalledTimes(2); + expect(injectCodexMessage.mock.calls).toEqual([ + [ + "ws://127.0.0.1:4500", + "codex-thread-1", + "Claude: Please review the final diff.", + ], + [ + "ws://127.0.0.1:4500", + "codex-thread-1", + "Claude: Please review the final diff.", + ], + ]); + expect(bridge.readPendingBridgeMessages(runDir)).toEqual([]); + expect( + bridge.bridgeInternals + .readBridgeEvents(runDir) + .filter((event) => event.kind === "delivered") + ).toHaveLength(1); + + rmSync(root, { recursive: true, force: true }); +}); + test("bridge MCP delivers pending codex messages to Claude as channel notifications", async () => { const bridge = await loadBridge(); const root = makeTempDir(); @@ -1477,6 +2047,40 @@ test("bridge config helper writes the Claude MCP config file", async () => { rmSync(root, { recursive: true, force: true }); }); +test("bridge config helper writes the Claude MCP config file for a custom server", async () => { + const bridge = await loadBridge(); + const root = makeTempDir(); + const runDir = join(root, "run"); + const serverName = bridge.claudeChannelServerName("1", "repo-123"); + + const path = bridge.ensureClaudeBridgeConfig(runDir, "claude", serverName); + expect(path).toBe(join(runDir, "claude-mcp.json")); + expect(JSON.parse(readFileSync(path, "utf8"))).toEqual({ + mcpServers: { + [serverName]: { + args: ["src/loop/main.ts", bridge.BRIDGE_SUBCOMMAND, runDir, "claude"], + command: "/opt/bun", + type: "stdio", + }, + }, + }); + + rmSync(root, { recursive: true, force: true }); +}); + +test("bridge registration helper throws on unexpected Claude MCP add-json failures", async () => { + const bridge = await loadBridge(); + + expect(() => + bridge.registerClaudeChannelServer( + ["/opt/bun", "src/loop/main.ts"], + bridge.claudeChannelServerName("7", "repo-123"), + "/tmp/run", + () => ({ exitCode: 1, stderr: "command failed" }) + ) + ).toThrow("[loop] failed to register Claude channel server: command failed"); +}); + test("dispatchBridgeMessage reports delivered when direct codex delivery succeeds", async () => { const injectCodexMessage = mock(async () => true); const bridge = await loadBridge({ injectCodexMessage }); @@ -1513,7 +2117,7 @@ test("dispatchBridgeMessage reports delivered when direct codex delivery succeed expect(injectCodexMessage).toHaveBeenCalledWith( "ws://127.0.0.1:4500", "codex-thread-1", - "Please review the final diff." + "Claude: Please review the final diff." ); expect(bridge.readPendingBridgeMessages(runDir)).toEqual([]); diff --git a/tests/loop/codex-app-server.test.ts b/tests/loop/codex-app-server.test.ts index ba54696..d990dd1 100644 --- a/tests/loop/codex-app-server.test.ts +++ b/tests/loop/codex-app-server.test.ts @@ -372,6 +372,10 @@ test("injectCodexMessage sends the bridge handshake and notification opt-outs", write({ id: request.id, result: {} }); return; } + if (request.method === "thread/read") { + write({ id: request.id, result: { thread: { turns: [] } } }); + return; + } if (request.method === "turn/start") { write({ id: request.id, result: { turn: { id: "turn-1" } } }); } @@ -391,18 +395,13 @@ test("injectCodexMessage sends the bridge handshake and notification opt-outs", expect(frames.map((frame) => frame.method)).toEqual([ "initialize", "initialized", + "thread/read", "turn/start", ]); expect(frames[0]?.params).toMatchObject({ capabilities: { experimentalApi: true, - optOutNotificationMethods: [ - "error", - "item/completed", - "item/agentMessage/delta", - "turn/completed", - "turn/started", - ], + optOutNotificationMethods: ["item/completed", "item/agentMessage/delta"], }, clientInfo: { name: "loop-bridge", @@ -410,6 +409,10 @@ test("injectCodexMessage sends the bridge handshake and notification opt-outs", }, }); expect(frames[2]?.params).toMatchObject({ + includeTurns: true, + threadId: "thread-1", + }); + expect(frames[3]?.params).toMatchObject({ input: [ { text: "Please review the latest diff.", @@ -421,6 +424,157 @@ test("injectCodexMessage sends the bridge handshake and notification opt-outs", }); }); +test("injectCodexMessage steers an active turn instead of starting a new turn", async () => { + const appServer = await getModule(); + currentHandler = (request, write) => { + if (request.method === "initialize") { + write({ id: request.id, result: {} }); + return; + } + if (request.method === "thread/read") { + write({ + id: request.id, + result: { + thread: { + turns: [{ id: "turn-active", status: "inProgress" }], + }, + }, + }); + return; + } + if (request.method === "turn/steer") { + write({ id: request.id, result: { turnId: "turn-active" } }); + } + }; + + await expect( + appServer.injectCodexMessage( + "ws://127.0.0.1:4500", + "thread-1", + "Please review the final diff." + ) + ).resolves.toBe(true); + const frames = wsWrites.map((line) => JSON.parse(line) as RequestFrame); + expect(frames.map((frame) => frame.method)).toEqual([ + "initialize", + "initialized", + "thread/read", + "turn/steer", + ]); + expect(frames[3]?.params).toMatchObject({ + expectedTurnId: "turn-active", + threadId: "thread-1", + }); +}); + +test("injectCodexMessage returns once turn/start is accepted", async () => { + const appServer = await getModule(); + currentHandler = (request, write) => { + if (request.method === "initialize") { + write({ id: request.id, result: {} }); + return; + } + if (request.method === "thread/read") { + write({ id: request.id, result: { thread: { turns: [] } } }); + return; + } + if (request.method === "turn/start") { + write({ id: request.id, result: { turn: { id: "turn-1" } } }); + } + }; + + let resolved = false; + const pending = appServer + .injectCodexMessage( + "ws://127.0.0.1:4500", + "thread-1", + "Please review the final diff." + ) + .then((value) => { + resolved = true; + return value; + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(resolved).toBe(true); + await expect(pending).resolves.toBe(true); +}); + +test("injectCodexMessage retries a busy turn/start with turn/steer", async () => { + const appServer = await getModule(); + let threadReadCount = 0; + currentHandler = (request, write) => { + if (request.method === "initialize") { + write({ id: request.id, result: {} }); + return; + } + if (request.method === "thread/read") { + threadReadCount += 1; + write({ + id: request.id, + result: { + thread: { + turns: + threadReadCount === 1 + ? [] + : [{ id: "turn-active", status: "inProgress" }], + }, + }, + }); + return; + } + if (request.method === "turn/start") { + write({ id: request.id, error: { message: "turn still active" } }); + return; + } + if (request.method === "turn/steer") { + write({ id: request.id, result: { turnId: "turn-active" } }); + } + }; + + await expect( + appServer.injectCodexMessage( + "ws://127.0.0.1:4500", + "thread-1", + "Please review the final diff." + ) + ).resolves.toBe(true); + const frames = wsWrites.map((line) => JSON.parse(line) as RequestFrame); + expect(frames.map((frame) => frame.method)).toEqual([ + "initialize", + "initialized", + "thread/read", + "turn/start", + "thread/read", + "turn/steer", + ]); +}); + +test("injectCodexMessage rejects failed turn/start requests", async () => { + const appServer = await getModule(); + currentHandler = (request, write) => { + if (request.method === "initialize") { + write({ id: request.id, result: {} }); + return; + } + if (request.method === "thread/read") { + write({ id: request.id, result: { thread: { turns: [] } } }); + return; + } + if (request.method === "turn/start") { + write({ id: request.id, error: { message: "policy blocked" } }); + } + }; + + await expect( + appServer.injectCodexMessage( + "ws://127.0.0.1:4500", + "thread-1", + "Please review the final diff." + ) + ).rejects.toThrow("policy blocked"); +}); + test("runCodexTurn promotes thread/start unsupported errors to fallback errors", async () => { const appServer = await getModule(); currentHandler = (request, write) => { @@ -1108,6 +1262,10 @@ test("injectCodexMessage sends initialized and bridge notification opt-outs", as write({ id: request.id, result: {} }); return; } + if (request.method === "thread/read") { + write({ id: request.id, result: { thread: { turns: [] } } }); + return; + } if (request.method === "turn/start") { write({ id: request.id, result: { turn: { id: "turn-1" } } }); } @@ -1127,11 +1285,8 @@ test("injectCodexMessage sends initialized and bridge notification opt-outs", as capabilities: { experimentalApi: true, optOutNotificationMethods: [ - "error", "item/completed", "item/agentMessage/delta", - "turn/completed", - "turn/started", ], }, }, @@ -1141,6 +1296,13 @@ test("injectCodexMessage sends initialized and bridge notification opt-outs", as method: "initialized", }); expect(frames[2]).toMatchObject({ + method: "thread/read", + params: { + includeTurns: true, + threadId: "thread-1", + }, + }); + expect(frames[3]).toMatchObject({ method: "turn/start", params: { threadId: "thread-1", diff --git a/tests/loop/codex-tmux-proxy.test.ts b/tests/loop/codex-tmux-proxy.test.ts index 0a73886..92fe25c 100644 --- a/tests/loop/codex-tmux-proxy.test.ts +++ b/tests/loop/codex-tmux-proxy.test.ts @@ -1,5 +1,24 @@ import { expect, test } from "bun:test"; +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; import { codexTmuxProxyInternals } from "../../src/loop/codex-tmux-proxy"; +import { + createRunManifest, + readRunManifest, + writeRunManifest, +} from "../../src/loop/run-state"; + +const bridgeMessage = { + at: "2026-03-29T00:00:00.000Z", + id: "msg-1", + kind: "message" as const, + message: "Please review the latest diff.", + source: "claude" as const, + target: "codex" as const, +}; + +const makeTempDir = (): string => mkdtempSync(join(tmpdir(), "loop-proxy-")); test("codex tmux proxy waits briefly for the tmux session to appear", () => { const now = Date.now(); @@ -42,3 +61,113 @@ test("codex tmux proxy stops immediately after a seen tmux session disappears", ) ).toBe(false); }); + +test("codex tmux proxy records turn ids from turn/start responses", () => { + const turnIds = new Set(["turn-1"]); + + codexTmuxProxyInternals.noteStartedTurn(turnIds, { + turn: { id: "turn-2" }, + }); + + expect([...turnIds]).toEqual(["turn-1", "turn-2"]); + expect(codexTmuxProxyInternals.latestActiveTurnId(turnIds)).toBe("turn-2"); +}); + +test("codex tmux proxy keeps the newest active turn id", () => { + const activeTurns = new Set(["turn-1", "turn-2"]); + + expect(codexTmuxProxyInternals.latestActiveTurnId(activeTurns)).toBe( + "turn-2" + ); + expect(codexTmuxProxyInternals.latestActiveTurnId(new Set())).toBe(undefined); +}); + +test("codex tmux proxy steers bridge messages into an active turn", () => { + expect( + codexTmuxProxyInternals.buildBridgeInjectionFrame( + -1, + "thread-1", + bridgeMessage, + "turn-active" + ) + ).toEqual({ + id: -1, + method: "turn/steer", + params: { + expectedTurnId: "turn-active", + input: [ + { + text: "Claude: Please review the latest diff.", + text_elements: [], + type: "text", + }, + ], + threadId: "thread-1", + }, + }); +}); + +test("codex tmux proxy starts a new turn when no active turn exists", () => { + expect( + codexTmuxProxyInternals.buildBridgeInjectionFrame( + -1, + "thread-1", + bridgeMessage + ) + ).toEqual({ + id: -1, + method: "turn/start", + params: { + input: [ + { + text: "Claude: Please review the latest diff.", + text_elements: [], + type: "text", + }, + ], + threadId: "thread-1", + }, + }); +}); + +test("codex tmux proxy only pauses bridge drain when it cannot steer", () => { + expect( + codexTmuxProxyInternals.shouldPauseBridgeDrain(false, undefined, 0) + ).toBe(false); + expect( + codexTmuxProxyInternals.shouldPauseBridgeDrain(true, "turn-active", 0) + ).toBe(false); + expect( + codexTmuxProxyInternals.shouldPauseBridgeDrain(true, undefined, 0) + ).toBe(true); + expect( + codexTmuxProxyInternals.shouldPauseBridgeDrain(false, undefined, 1) + ).toBe(true); +}); + +test("codex tmux proxy persists newer live thread ids to the run manifest", () => { + const root = makeTempDir(); + const manifestPath = join(root, "manifest.json"); + writeRunManifest( + manifestPath, + createRunManifest({ + claudeSessionId: "claude-1", + codexRemoteUrl: "ws://127.0.0.1:4500", + codexThreadId: "codex-thread-startup", + cwd: "/repo", + mode: "paired", + pid: 1234, + repoId: "repo-123", + runId: "7", + tmuxSession: "loop-loop-7", + }) + ); + + codexTmuxProxyInternals.persistCodexThreadId(root, "codex-thread-live"); + + expect(readRunManifest(manifestPath)?.codexThreadId).toBe( + "codex-thread-live" + ); + + rmSync(root, { recursive: true, force: true }); +}); diff --git a/tests/loop/paired-loop.test.ts b/tests/loop/paired-loop.test.ts index 557eeeb..550b8bf 100644 --- a/tests/loop/paired-loop.test.ts +++ b/tests/loop/paired-loop.test.ts @@ -610,7 +610,7 @@ test("runPairedLoop delivers forwarded bridge messages to the target agent", asy expect(calls[0]?.prompt).toContain("Please review the Codex output."); expect(calls[0]?.prompt).toContain("Do not reply to the human."); expect(calls[0]?.prompt).toContain( - 'Reply to the other agent with "send_to_agent"' + 'Send a message to the other agent with "send_to_agent"' ); const events = bridgeInternals.readBridgeEvents(runDir); @@ -772,9 +772,7 @@ test("runPairedLoop delivers peer messages back to the primary agent", async () expect(calls).toHaveLength(3); expect(calls[0]?.agent).toBe("claude"); expect(calls[1]?.agent).toBe("codex"); - expect(calls[1]?.prompt).toContain( - "Message from Claude via the loop bridge:" - ); + expect(calls[1]?.prompt).toContain("Claude: Please verify"); expect(calls[1]?.prompt).toContain( "Please verify the implementation details." ); @@ -787,7 +785,7 @@ test("runPairedLoop delivers peer messages back to the primary agent", async () "Found one change to make before landing this." ); expect(calls[2]?.prompt).toContain( - 'Reply to the other agent with "send_to_agent"' + 'Send a message to the other agent with "send_to_agent"' ); }); }); @@ -816,9 +814,7 @@ test("runPairedLoop skips the default work turn after draining input for the pri expect(calls).toHaveLength(1); expect(calls[0]?.agent).toBe("codex"); - expect(calls[0]?.prompt).toContain( - "Message from Claude via the loop bridge:" - ); + expect(calls[0]?.prompt).toContain("Claude: Please verify"); expect(calls[0]?.prompt).toContain( "Please verify the implementation details." ); diff --git a/tests/loop/paired-options.test.ts b/tests/loop/paired-options.test.ts index 4914f2b..c06fe6e 100644 --- a/tests/loop/paired-options.test.ts +++ b/tests/loop/paired-options.test.ts @@ -1,7 +1,8 @@ import { expect, test } from "bun:test"; -import { mkdtempSync, rmSync } from "node:fs"; +import { mkdtempSync, readFileSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; +import { claudeChannelServerName } from "../../src/loop/bridge-config"; import { preparePairedOptions, preparePairedRun, @@ -66,6 +67,93 @@ test("preparePairedOptions accepts a raw session id without creating a paired ma } }); +test("preparePairedOptions writes a readable Claude bridge server for fresh runs", () => { + const home = makeTempHome(); + const originalHome = process.env.HOME; + const originalRunId = process.env.LOOP_RUN_ID; + process.env.HOME = home; + Reflect.deleteProperty(process.env, "LOOP_RUN_ID"); + + try { + const opts = makeOptions({ agent: "claude", pairedMode: true }); + + preparePairedOptions(opts, process.cwd(), true); + + const storage = resolveRunStorage("1", process.cwd(), home); + const manifest = readRunManifest(storage.manifestPath); + const serverName = claudeChannelServerName(storage.runId, storage.repoId); + expect(manifest?.claudeChannelServer).toBe(serverName); + expect(manifest?.claudeChannelServer).not.toContain(storage.repoId); + const configPath = opts.claudeMcpConfigPath; + expect(configPath).toBeDefined(); + const config = JSON.parse(readFileSync(configPath ?? "", "utf8")); + expect(Object.keys(config.mcpServers)).toEqual([serverName]); + } finally { + if (originalHome === undefined) { + Reflect.deleteProperty(process.env, "HOME"); + } else { + process.env.HOME = originalHome; + } + if (originalRunId === undefined) { + Reflect.deleteProperty(process.env, "LOOP_RUN_ID"); + } else { + process.env.LOOP_RUN_ID = originalRunId; + } + rmSync(home, { recursive: true, force: true }); + } +}); + +test("preparePairedRun upgrades an older hashed Claude bridge server name", () => { + const home = makeTempHome(); + const originalHome = process.env.HOME; + const originalRunId = process.env.LOOP_RUN_ID; + process.env.HOME = home; + Reflect.deleteProperty(process.env, "LOOP_RUN_ID"); + + try { + const storage = resolveRunStorage("alpha", process.cwd(), home); + writeRunManifest( + storage.manifestPath, + createRunManifest( + { + claudeChannelServer: `loop-bridge-${storage.repoId}-alpha`, + claudeSessionId: "claude-session-1", + codexThreadId: "codex-thread-1", + cwd: process.cwd(), + mode: "paired", + pid: 1234, + repoId: storage.repoId, + runId: "alpha", + status: "running", + }, + "2026-03-29T10:00:00.000Z" + ) + ); + const opts = makeOptions({ resumeRunId: "alpha" }); + + const prepared = preparePairedRun(opts, process.cwd()); + const serverName = claudeChannelServerName(storage.runId, storage.repoId); + + expect(prepared.manifest.claudeChannelServer).toBe(serverName); + expect(readRunManifest(storage.manifestPath)?.claudeChannelServer).toBe( + serverName + ); + expect(serverName).not.toContain(storage.repoId); + } finally { + if (originalHome === undefined) { + Reflect.deleteProperty(process.env, "HOME"); + } else { + process.env.HOME = originalHome; + } + if (originalRunId === undefined) { + Reflect.deleteProperty(process.env, "LOOP_RUN_ID"); + } else { + process.env.LOOP_RUN_ID = originalRunId; + } + rmSync(home, { recursive: true, force: true }); + } +}); + test("preparePairedOptions ignores stored session ids from a completed paired run", () => { const home = makeTempHome(); const originalHome = process.env.HOME; diff --git a/tests/loop/tmux.test.ts b/tests/loop/tmux.test.ts index 505b524..6987ada 100644 --- a/tests/loop/tmux.test.ts +++ b/tests/loop/tmux.test.ts @@ -282,7 +282,10 @@ test("runInTmux starts paired tmux panes for Claude and Codex", async () => { ); const env = ["LOOP_RUN_BASE=repo", "LOOP_RUN_ID=1"]; - const claudeChannelServer = tmuxInternals.buildClaudeChannelServerName("1"); + const claudeChannelServer = tmuxInternals.buildClaudeChannelServerName( + "1", + storage.repoId + ); const claudeChannelConfig = tmuxInternals.buildClaudeChannelServerConfig( ["bun", "/repo/src/cli.ts"], storage.runDir @@ -291,7 +294,8 @@ test("runInTmux starts paired tmux panes for Claude and Codex", async () => { "Ship feature", opts, "claude", - "1" + "1", + claudeChannelServer ); const claudeCommand = tmuxInternals.buildShellCommand([ "env", @@ -311,7 +315,12 @@ test("runInTmux starts paired tmux panes for Claude and Codex", async () => { codexProxyUrl, "test-model", codexMcpConfigArgs, - tmuxInternals.buildPrimaryPrompt("Ship feature", opts, "1") + tmuxInternals.buildPrimaryPrompt( + "Ship feature", + opts, + "1", + claudeChannelServer + ) ), ]); @@ -628,11 +637,15 @@ test("runInTmux starts paired interactive tmux panes without a task", async () = expect(delegated).toBe(true); expect(calls[0]).toEqual(["tmux", "has-session", "-t", "repo-loop-1"]); const env = ["LOOP_RUN_BASE=repo", "LOOP_RUN_ID=1"]; - const claudeChannelServer = tmuxInternals.buildClaudeChannelServerName("1"); + const claudeChannelServer = tmuxInternals.buildClaudeChannelServerName( + "1", + storage.repoId + ); const claudePrompt = tmuxInternals.buildInteractivePeerPrompt( opts, "claude", - "1" + "1", + claudeChannelServer ); const claudeCommand = tmuxInternals.buildShellCommand([ "env", @@ -662,7 +675,11 @@ test("runInTmux starts paired interactive tmux panes without a task", async () = "ws://127.0.0.1:4600/", "test-model", ["-c", 'mcp_servers.loop-bridge.command="loop"'], - tmuxInternals.buildInteractivePrimaryPrompt(opts, "1") + tmuxInternals.buildInteractivePrimaryPrompt( + opts, + "1", + claudeChannelServer + ) ), ]); expect(calls[3]).toEqual([ @@ -748,16 +765,22 @@ test("runInTmux keeps the no-prompt Claude startup wait bounded", async () => { test("tmux prompts keep the paired review workflow explicit", () => { const opts = makePairedOptions(); + const claudeChannelServer = tmuxInternals.buildClaudeChannelServerName( + "1", + "repo-123" + ); const primaryPrompt = tmuxInternals.buildPrimaryPrompt( "Ship feature", opts, - "1" + "1", + claudeChannelServer ); const peerPrompt = tmuxInternals.buildPeerPrompt( "Ship feature", opts, "claude", - "1" + "1", + claudeChannelServer ); expect(primaryPrompt).toContain("Agent-to-agent pair programming"); @@ -776,21 +799,30 @@ test("tmux prompts keep the paired review workflow explicit", () => { expect(peerPrompt).toContain("You are the reviewer/support agent."); expect(peerPrompt).toContain("Do not take over the task or create the PR"); expect(peerPrompt).toContain("Wait for Codex to send you a targeted request"); - expect(peerPrompt).toContain('"reply"'); + expect(peerPrompt).not.toContain('"reply"'); expect(peerPrompt).toContain( - 'Use "send_to_agent" with target: "codex" only for new proactive messages to Codex; do not send Codex-facing responses as a human-facing message.' + 'Use "send_to_agent" with target: "codex" for Codex-facing messages, including replies to inbound Codex channel messages; do not send Codex-facing responses as a human-facing message.' ); - expect(primaryPrompt).not.toContain("mcp__loop-bridge-1__ prefix"); - expect(peerPrompt).toContain("mcp__loop-bridge-1__ prefix"); + expect(primaryPrompt).not.toContain("mcp__loop-bridge-repo-123-1__ prefix"); + expect(peerPrompt).toContain("mcp__loop-bridge-repo-123-1__ prefix"); }); test("interactive tmux prompts tell both agents to wait for the human", () => { const opts = makePairedOptions({ proof: "" }); - const primaryPrompt = tmuxInternals.buildInteractivePrimaryPrompt(opts, "1"); + const claudeChannelServer = tmuxInternals.buildClaudeChannelServerName( + "1", + "repo-123" + ); + const primaryPrompt = tmuxInternals.buildInteractivePrimaryPrompt( + opts, + "1", + claudeChannelServer + ); const peerPrompt = tmuxInternals.buildInteractivePeerPrompt( opts, "claude", - "1" + "1", + claudeChannelServer ); expect(primaryPrompt).toContain("Agent-to-agent pair programming"); @@ -809,15 +841,15 @@ test("interactive tmux prompts tell both agents to wait for the human", () => { ); expect(peerPrompt).toContain("Wait for Codex to provide a concrete task"); expect(peerPrompt).toContain("human clearly assigns you separate work"); - expect(peerPrompt).toContain('"reply"'); + expect(peerPrompt).not.toContain('"reply"'); expect(peerPrompt).toContain( - 'Use "send_to_agent" with target: "codex" only for new proactive messages to Codex; do not send Codex-facing responses as a human-facing message.' + 'Use "send_to_agent" with target: "codex" for Codex-facing messages, including replies to inbound Codex channel messages; do not send Codex-facing responses as a human-facing message.' ); expect(peerPrompt).toContain( "If you are answering Codex, use the bridge tools instead of a human-facing reply." ); - expect(primaryPrompt).not.toContain("mcp__loop-bridge-1__ prefix"); - expect(peerPrompt).toContain("mcp__loop-bridge-1__ prefix"); + expect(primaryPrompt).not.toContain("mcp__loop-bridge-repo-123-1__ prefix"); + expect(peerPrompt).toContain("mcp__loop-bridge-repo-123-1__ prefix"); }); test("runInTmux auto-confirms Claude startup prompts in paired mode", async () => { @@ -1406,8 +1438,10 @@ test("runInTmux reopens paired tmux panes without replaying the task", async () ); const env = ["LOOP_RUN_BASE=repo", "LOOP_RUN_ID=alpha"]; - const claudeChannelServer = - tmuxInternals.buildClaudeChannelServerName("alpha"); + const claudeChannelServer = tmuxInternals.buildClaudeChannelServerName( + "alpha", + storage.repoId + ); const claudeChannelConfig = tmuxInternals.buildClaudeChannelServerConfig( ["bun", "/repo/src/cli.ts"], storage.runDir @@ -2025,6 +2059,96 @@ test("runInTmux surfaces tmux startup errors", async () => { ).rejects.toThrow("Failed to start tmux session: boom"); }); +test("runInTmux removes the Claude bridge server when paired tmux startup fails", async () => { + const calls: string[][] = []; + let manifest = createRunManifest({ + cwd: "/repo", + mode: "paired", + pid: 1234, + repoId: "repo-123", + runId: "1", + status: "running", + }); + const opts = makePairedOptions(); + const storage = { + manifestPath: "/repo/.loop/runs/1/manifest.json", + repoId: "repo-123", + runDir: "/repo/.loop/runs/1", + runId: "1", + storageRoot: "/repo/.loop/runs", + transcriptPath: "/repo/.loop/runs/1/transcript.jsonl", + }; + + await expect( + runInTmux( + ["--tmux", "--proof", "verify with tests"], + { + capturePane: () => "", + cwd: "/repo", + env: {}, + findBinary: () => true, + getCodexAppServerUrl: () => "ws://127.0.0.1:4500", + getLastCodexThreadId: () => "codex-thread-1", + isInteractive: () => false, + launchArgv: ["bun", "/repo/src/cli.ts"], + log: (): void => undefined, + makeClaudeSessionId: () => "claude-session-1", + preparePairedRun: (nextOpts) => { + nextOpts.codexMcpConfigArgs = [ + "-c", + 'mcp_servers.loop-bridge.command="loop"', + ]; + return { manifest, storage }; + }, + sendKeys: (): void => undefined, + sendText: (): void => undefined, + sleep: () => Promise.resolve(), + startCodexProxy: () => Promise.resolve("ws://127.0.0.1:4600/"), + startPersistentAgentSession: () => Promise.resolve(undefined), + spawn: (args: string[]) => { + calls.push(args); + if (args[0] === "tmux" && args[1] === "has-session") { + return { exitCode: 1, stderr: "" }; + } + if (args[0] === "tmux" && args[1] === "new-session") { + return { exitCode: 1, stderr: "boom" }; + } + return { exitCode: 0, stderr: "" }; + }, + updateRunManifest: (_path, update) => { + manifest = update(manifest) ?? manifest; + return manifest; + }, + }, + { opts, task: "Ship feature" } + ) + ).rejects.toThrow("Failed to start tmux session: boom"); + + expect(calls).toContainEqual([ + "claude", + "mcp", + "add-json", + "--scope", + "local", + tmuxInternals.buildClaudeChannelServerName("1", "repo-123"), + tmuxInternals.buildClaudeChannelServerConfig( + ["bun", "/repo/src/cli.ts"], + storage.runDir + ), + ]); + expect(calls).toContainEqual([ + "claude", + "mcp", + "remove", + "--scope", + "local", + tmuxInternals.buildClaudeChannelServerName("1", "repo-123"), + ]); + expect( + calls.some((args) => args[0] === "tmux" && args[1] === "kill-session") + ).toBe(false); +}); + test("runInTmux skips auto-attach for non-interactive sessions", async () => { const attaches: string[] = [];