From 9086f04cc322da4ea76ee5931b032638156a4712 Mon Sep 17 00:00:00 2001 From: Axel Delafosse Date: Fri, 27 Mar 2026 00:43:49 -0700 Subject: [PATCH] Refactor loop bridge runtime and storage --- src/cli.ts | 6 +- src/loop/bridge-config.ts | 52 ++ src/loop/bridge-constants.ts | 4 + src/loop/bridge-runtime.ts | 354 +++++++++ src/loop/bridge-store.ts | 307 ++++++++ src/loop/bridge.ts | 702 ++---------------- src/loop/codex-tmux-proxy.ts | 19 +- src/loop/paired-loop.ts | 9 +- src/loop/paired-options.ts | 5 +- src/loop/tmux.ts | 2 +- tests/loop/00-paired-loop.integration.test.ts | 6 +- tests/loop/bridge.test.ts | 211 +++++- tests/loop/codex-app-server.test.ts | 2 +- tests/loop/paired-loop.test.ts | 6 +- 14 files changed, 979 insertions(+), 706 deletions(-) create mode 100644 src/loop/bridge-config.ts create mode 100644 src/loop/bridge-constants.ts create mode 100644 src/loop/bridge-runtime.ts create mode 100644 src/loop/bridge-store.ts diff --git a/src/cli.ts b/src/cli.ts index 64edc94..81fa310 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -1,10 +1,10 @@ #!/usr/bin/env bun +import { runBridgeMcpServer } from "./loop/bridge"; import { BRIDGE_SUBCOMMAND, BRIDGE_WORKER_SUBCOMMAND, - runBridgeMcpServer, - runBridgeWorker, -} from "./loop/bridge"; +} from "./loop/bridge-constants"; +import { runBridgeWorker } from "./loop/bridge-runtime"; import { closeClaudeSdk } from "./loop/claude-sdk-server"; import { closeAppServer } from "./loop/codex-app-server"; import { diff --git a/src/loop/bridge-config.ts b/src/loop/bridge-config.ts new file mode 100644 index 0000000..bb56c48 --- /dev/null +++ b/src/loop/bridge-config.ts @@ -0,0 +1,52 @@ +import { mkdirSync, writeFileSync } from "node:fs"; +import { dirname, join } from "node:path"; +import { BRIDGE_SERVER, BRIDGE_SUBCOMMAND } from "./bridge-constants"; +import { buildLaunchArgv } from "./launch"; +import type { Agent } from "./types"; + +const ensureParentDir = (path: string): void => { + mkdirSync(dirname(path), { recursive: true }); +}; + +const stringifyToml = (value: string): string => JSON.stringify(value); + +export const buildCodexBridgeConfigArgs = ( + runDir: string, + source: Agent +): string[] => { + const [command, ...baseArgs] = buildLaunchArgv(); + const args = [...baseArgs, BRIDGE_SUBCOMMAND, runDir, source]; + return [ + "-c", + `mcp_servers.${BRIDGE_SERVER}.command=${stringifyToml(command)}`, + "-c", + `mcp_servers.${BRIDGE_SERVER}.args=${JSON.stringify(args)}`, + ]; +}; + +export const ensureClaudeBridgeConfig = ( + runDir: string, + source: Agent +): string => { + const [command, ...baseArgs] = buildLaunchArgv(); + const path = join(runDir, `${source}-mcp.json`); + ensureParentDir(path); + writeFileSync( + path, + `${JSON.stringify( + { + mcpServers: { + [BRIDGE_SERVER]: { + args: [...baseArgs, BRIDGE_SUBCOMMAND, runDir, source], + command, + type: "stdio", + }, + }, + }, + null, + 2 + )}\n`, + "utf8" + ); + return path; +}; diff --git a/src/loop/bridge-constants.ts b/src/loop/bridge-constants.ts new file mode 100644 index 0000000..a3c8199 --- /dev/null +++ b/src/loop/bridge-constants.ts @@ -0,0 +1,4 @@ +export const BRIDGE_SUBCOMMAND = "__bridge-mcp"; +export const BRIDGE_WORKER_SUBCOMMAND = "__bridge-worker"; +export const BRIDGE_SERVER = "loop-bridge"; +export const CLAUDE_CHANNEL_USER = "Codex"; diff --git a/src/loop/bridge-runtime.ts b/src/loop/bridge-runtime.ts new file mode 100644 index 0000000..9be19c4 --- /dev/null +++ b/src/loop/bridge-runtime.ts @@ -0,0 +1,354 @@ +import { join } from "node:path"; +import { spawnSync } from "bun"; +import { BRIDGE_SERVER, CLAUDE_CHANNEL_USER } from "./bridge-constants"; +import { + appendBridgeMessage, + type BridgeMessage, + markBridgeMessage, + readBridgeInbox, + readBridgeStatus, + readPendingBridgeMessages, +} from "./bridge-store"; +import { injectCodexMessage } from "./codex-app-server"; +import { sanitizeBase } from "./git"; +import { + isActiveRunState, + parseRunLifecycleState, + touchRunManifest, + updateRunManifest, +} from "./run-state"; +import type { Agent } from "./types"; + +const CLAUDE_CHANNEL_METHOD = "notifications/claude/channel"; +const CLAUDE_CHANNEL_SOURCE_TYPE = "codex"; +const CLAUDE_CHANNEL_USER_ID = "codex"; +const CODEX_TMUX_PANE = "0.1"; +const CODEX_TMUX_READY_DELAY_MS = 250; +const CODEX_TMUX_READY_POLLS = 20; +const CODEX_TMUX_SEND_FOOTER = "Ctrl+J newline"; + +export const bridgeRuntimeCommandDeps = { spawnSync }; + +const wait = async (ms: number): Promise => { + await new Promise((resolve) => { + setTimeout(resolve, ms); + }); +}; + +const decodeOutput = (value: Uint8Array): string => + new TextDecoder().decode(value); + +const codexPane = (session: string): string => `${session}:${CODEX_TMUX_PANE}`; + +const capturePane = (pane: string): string => { + const result = bridgeRuntimeCommandDeps.spawnSync( + ["tmux", "capture-pane", "-p", "-t", pane], + { + stderr: "ignore", + stdout: "pipe", + } + ); + if (result.exitCode !== 0) { + return ""; + } + return decodeOutput(result.stdout); +}; + +const sendPaneKeys = (pane: string, keys: string[]): void => { + bridgeRuntimeCommandDeps.spawnSync( + ["tmux", "send-keys", "-t", pane, ...keys], + { + stderr: "ignore", + } + ); +}; + +const sendPaneText = (pane: string, text: string): void => { + bridgeRuntimeCommandDeps.spawnSync( + ["tmux", "send-keys", "-t", pane, "-l", "--", text], + { + stderr: "ignore", + } + ); +}; + +const waitForCodexPane = async (session: string): Promise => { + const pane = codexPane(session); + for (let attempt = 0; attempt < CODEX_TMUX_READY_POLLS; attempt += 1) { + if (capturePane(pane).includes(CODEX_TMUX_SEND_FOOTER)) { + return true; + } + await wait(CODEX_TMUX_READY_DELAY_MS); + } + return false; +}; + +const injectCodexTmuxMessage = async ( + session: string, + message: string +): Promise => { + if (!(session && (await waitForCodexPane(session)))) { + return false; + } + const pane = codexPane(session); + const lines = message.split("\n"); + for (let index = 0; index < lines.length; index += 1) { + sendPaneText(pane, lines[index] ?? ""); + if (index < lines.length - 1) { + sendPaneKeys(pane, ["C-j"]); + } + } + await wait(100); + sendPaneKeys(pane, ["Enter"]); + return true; +}; + +const tmuxSessionExists = (session: string): boolean => { + const result = bridgeRuntimeCommandDeps.spawnSync( + ["tmux", "has-session", "-t", session], + { + stderr: "ignore", + stdout: "ignore", + } + ); + return result.exitCode === 0; +}; + +export const claudeChannelServerName = (runId: string): string => + `${BRIDGE_SERVER}-${sanitizeBase(runId)}`; + +const logClaudeChannelServerRemovalFailure = ( + serverName: string, + detail: string +): void => { + console.error( + `[loop] failed to remove Claude channel server "${serverName}": ${detail}` + ); +}; + +const removeClaudeChannelServer = (runId: string): void => { + if (!runId) { + return; + } + const serverName = claudeChannelServerName(runId); + try { + const result = bridgeRuntimeCommandDeps.spawnSync( + ["claude", "mcp", "remove", "--scope", "local", serverName], + { + stderr: "pipe", + stdout: "ignore", + } + ); + if (result.exitCode === 0) { + return; + } + const stderr = result.stderr ? decodeOutput(result.stderr).trim() : ""; + logClaudeChannelServerRemovalFailure( + serverName, + stderr || `exit code ${result.exitCode ?? "unknown"}` + ); + } catch (error: unknown) { + // Cleanup should not fail the bridge flow. + logClaudeChannelServerRemovalFailure( + serverName, + error instanceof Error ? error.message : String(error) + ); + } +}; + +export const clearStaleTmuxBridgeState = (runDir: string): boolean => { + let removedRunId = ""; + const next = updateRunManifest(join(runDir, "manifest.json"), (manifest) => { + if (!manifest?.tmuxSession) { + return manifest; + } + removedRunId = manifest.runId; + return touchRunManifest( + { + ...manifest, + tmuxSession: undefined, + }, + new Date().toISOString() + ); + }); + if (!(next && removedRunId)) { + return false; + } + removeClaudeChannelServer(removedRunId); + return true; +}; + +export const acknowledgeBridgeDelivery = ( + runDir: string, + message: BridgeMessage, + reason?: string +): void => { + markBridgeMessage(runDir, message, "delivered", reason); +}; + +export const consumeBridgeInbox = ( + runDir: string, + target: Agent, + reason: string +): BridgeMessage[] => { + const messages = readBridgeInbox(runDir, target); + for (const message of messages) { + acknowledgeBridgeDelivery(runDir, message, reason); + } + return messages; +}; + +export const readNextPendingBridgeMessage = ( + runDir: string +): BridgeMessage | undefined => readPendingBridgeMessages(runDir)[0]; + +export const readNextPendingBridgeMessageForTarget = ( + runDir: string, + target: Agent +): BridgeMessage | undefined => + readPendingBridgeMessages(runDir).find((entry) => entry.target === target); + +const claudeChannelSessionId = (runDir: string): string => { + const runId = readBridgeStatus(runDir).runId || "bridge"; + return `codex_${runId}`; +}; + +const writeChannelNotification = ( + runDir: string, + message: BridgeMessage, + writeJsonRpc: (payload: unknown) => void +): void => { + writeJsonRpc({ + jsonrpc: "2.0", + method: CLAUDE_CHANNEL_METHOD, + params: { + content: message.message, + meta: { + chat_id: claudeChannelSessionId(runDir), + message_id: message.id, + source_type: CLAUDE_CHANNEL_SOURCE_TYPE, + ts: new Date(message.at).toISOString(), + user: CLAUDE_CHANNEL_USER, + user_id: CLAUDE_CHANNEL_USER_ID, + }, + }, + }); +}; + +export const flushClaudeChannelMessages = ( + runDir: string, + writeJsonRpc: (payload: unknown) => void +): void => { + for (const message of readBridgeInbox(runDir, "claude")) { + writeChannelNotification(runDir, message, writeJsonRpc); + acknowledgeBridgeDelivery(runDir, message); + } +}; + +export const deliverCodexBridgeMessage = async ( + runDir: string, + message: BridgeMessage +): Promise => { + const status = readBridgeStatus(runDir); + if (status.tmuxSession) { + if (tmuxSessionExists(status.tmuxSession)) { + return false; + } + clearStaleTmuxBridgeState(runDir); + } + if (!(status.codexRemoteUrl && status.codexThreadId)) { + return false; + } + try { + const delivered = await injectCodexMessage( + status.codexRemoteUrl, + status.codexThreadId, + message.message + ); + if (delivered) { + acknowledgeBridgeDelivery(runDir, message, "sent to codex app-server"); + } + return delivered; + } catch { + return false; + } +}; + +export const drainCodexTmuxMessages = async ( + runDir: string +): Promise => { + const { tmuxSession } = readBridgeStatus(runDir); + if (!tmuxSession) { + return false; + } + if (!tmuxSessionExists(tmuxSession)) { + clearStaleTmuxBridgeState(runDir); + return false; + } + const message = readNextPendingBridgeMessageForTarget(runDir, "codex"); + if (!message) { + return false; + } + const delivered = await injectCodexTmuxMessage(tmuxSession, message.message); + if (!delivered) { + return false; + } + acknowledgeBridgeDelivery(runDir, message, "sent to codex tmux pane"); + return true; +}; + +export interface BridgeDispatchResult { + delivered: boolean; + entry: BridgeMessage; +} + +export const dispatchBridgeMessage = async ( + runDir: string, + source: Agent, + target: Agent, + message: string +): Promise => { + const entry = appendBridgeMessage(runDir, source, target, message); + const delivered = + target === "codex" ? await deliverCodexBridgeMessage(runDir, entry) : false; + return { delivered, entry }; +}; + +export const formatDispatchResult = ( + runDir: string, + target: Agent, + delivered: boolean, + entry: BridgeMessage +): string => { + if (delivered) { + return `delivered ${entry.id} to ${target}`; + } + const status = readBridgeStatus(runDir); + if ( + target === "codex" && + status.tmuxSession && + tmuxSessionExists(status.tmuxSession) + ) { + return `accepted ${entry.id} for codex delivery`; + } + return `queued ${entry.id} for ${target}`; +}; + +export const runBridgeWorker = async (runDir: string): Promise => { + while (true) { + const status = readBridgeStatus(runDir); + const state = parseRunLifecycleState(status.state); + if (!(state && isActiveRunState(state))) { + return; + } + if (!status.tmuxSession) { + return; + } + if (!tmuxSessionExists(status.tmuxSession)) { + clearStaleTmuxBridgeState(runDir); + return; + } + const delivered = await drainCodexTmuxMessages(runDir); + await wait(delivered ? 100 : CODEX_TMUX_READY_DELAY_MS); + } +}; diff --git a/src/loop/bridge-store.ts b/src/loop/bridge-store.ts new file mode 100644 index 0000000..2dd44be --- /dev/null +++ b/src/loop/bridge-store.ts @@ -0,0 +1,307 @@ +import { createHash } from "node:crypto"; +import { appendFileSync, existsSync, mkdirSync, readFileSync } from "node:fs"; +import { dirname, join } from "node:path"; +import { + appendRunTranscriptEntry, + buildTranscriptPath, + readRunManifest, +} from "./run-state"; +import type { Agent } from "./types"; + +const BRIDGE_FILE = "bridge.jsonl"; +const LINE_SPLIT_RE = /\r?\n/; +const MAX_STATUS_MESSAGES = 100; +const BRIDGE_PREFIX_RE = + /^Message from (Claude|Codex) via the loop bridge:\s*/i; + +interface BridgeBaseEvent { + at: string; + id: string; + signature?: string; + source: Agent; + target: Agent; +} + +export interface BridgeMessage extends BridgeBaseEvent { + kind: "message"; + message: string; +} + +interface BridgeAck extends BridgeBaseEvent { + kind: "blocked" | "delivered"; + message?: string; + reason?: string; +} + +export type BridgeEvent = BridgeAck | BridgeMessage; + +export interface BridgeStatus { + claudeSessionId: string; + codexRemoteUrl: string; + codexThreadId: string; + pending: { claude: number; codex: number }; + runId: string; + state: string; + status: string; + tmuxSession: string; +} + +const isRecord = (value: unknown): value is Record => + typeof value === "object" && value !== null; + +const asString = (value: unknown): string | undefined => + typeof value === "string" && value.trim() ? value : undefined; + +export const normalizeAgent = (value: unknown): Agent | undefined => { + if (value === "claude" || value === "codex") { + return value; + } + return undefined; +}; + +const normalizeBridgeMessage = (message: string): string => + message.trim().replace(BRIDGE_PREFIX_RE, "").replace(/\s+/g, " "); + +const orderedBridgePairKey = (source: Agent, target: Agent): string => + `${source}>${target}`; + +const bridgeSignature = ( + source: Agent, + target: Agent, + message: string +): string => { + return createHash("sha256") + .update( + `${orderedBridgePairKey(source, target)}\n${normalizeBridgeMessage(message)}`, + "utf8" + ) + .digest("hex"); +}; + +const eventSignature = (event: BridgeMessage): string => + bridgeSignature(event.source, event.target, event.message); + +export const bridgePath = (runDir: string): string => join(runDir, BRIDGE_FILE); + +const ensureParentDir = (path: string): void => { + mkdirSync(dirname(path), { recursive: true }); +}; + +export const appendBridgeEvent = (runDir: string, event: BridgeEvent): void => { + const path = bridgePath(runDir); + ensureParentDir(path); + appendFileSync(path, `${JSON.stringify(event)}\n`, "utf8"); +}; + +export const readBridgeEvents = (runDir: string): BridgeEvent[] => { + const path = bridgePath(runDir); + if (!existsSync(path)) { + return []; + } + + const events: BridgeEvent[] = []; + const messageById = new Map(); + for (const line of readFileSync(path, "utf8").split(LINE_SPLIT_RE)) { + const trimmed = line.trim(); + if (!trimmed) { + continue; + } + try { + const parsed = JSON.parse(trimmed) as unknown; + if (!isRecord(parsed)) { + continue; + } + const kind = asString(parsed.kind); + const id = asString(parsed.id); + const at = asString(parsed.at); + const source = normalizeAgent(parsed.source); + const target = normalizeAgent(parsed.target); + const signature = asString(parsed.signature); + if (!(kind && id && at && source && target)) { + continue; + } + if (kind === "message") { + const message = asString(parsed.message); + if (!message) { + continue; + } + messageById.set(id, message); + events.push({ + at, + id, + kind, + message, + signature: bridgeSignature(source, target, message), + source, + target, + }); + continue; + } + if (kind === "blocked" || kind === "delivered") { + events.push({ + at, + id, + kind, + message: messageById.get(id), + reason: asString(parsed.reason), + signature, + source, + target, + }); + } + } catch { + // ignore malformed bridge lines + } + } + return events; +}; + +export const readPendingBridgeMessages = (runDir: string): BridgeMessage[] => { + const messages = new Map(); + + for (const event of readBridgeEvents(runDir)) { + if (event.kind === "message") { + messages.set(event.id, event); + continue; + } + const pending = messages.get(event.id); + if (!pending) { + continue; + } + messages.delete(event.id); + } + + return [...messages.values()].sort( + (a, b) => a.at.localeCompare(b.at) || a.id.localeCompare(b.id) + ); +}; + +export const markBridgeMessage = ( + runDir: string, + message: BridgeMessage, + kind: "blocked" | "delivered", + reason?: string +): void => { + appendBridgeEvent(runDir, { + at: new Date().toISOString(), + id: message.id, + kind, + reason, + signature: eventSignature(message), + source: message.source, + target: message.target, + }); +}; + +export const blocksBridgeBounce = ( + runDir: string, + source: Agent, + target: Agent, + message: string +): boolean => { + const normalized = normalizeBridgeMessage(message); + const events = readBridgeEvents(runDir); + for (let index = events.length - 1; index >= 0; index -= 1) { + const event = events[index]; + if (event.kind !== "delivered") { + continue; + } + if (!event.message) { + return false; + } + return ( + normalizeBridgeMessage(event.message) === normalized && + event.source === target && + event.target === source + ); + } + return false; +}; + +const countPendingMessages = (runDir: string): BridgeStatus["pending"] => { + const pending = { claude: 0, codex: 0 }; + for (const message of readPendingBridgeMessages(runDir).slice( + 0, + MAX_STATUS_MESSAGES + )) { + pending[message.target] += 1; + } + return pending; +}; + +export const readBridgeStatus = (runDir: string): BridgeStatus => { + const manifest = readRunManifest(join(runDir, "manifest.json")); + return { + claudeSessionId: manifest?.claudeSessionId ?? "", + codexRemoteUrl: manifest?.codexRemoteUrl ?? "", + codexThreadId: manifest?.codexThreadId ?? "", + pending: countPendingMessages(runDir), + runId: manifest?.runId ?? "", + state: manifest?.state ?? "unknown", + status: manifest?.status ?? "unknown", + tmuxSession: manifest?.tmuxSession ?? "", + }; +}; + +export const readBridgeInbox = ( + runDir: string, + target: Agent +): BridgeMessage[] => + readPendingBridgeMessages(runDir) + .filter((message) => message.target === target) + .slice(0, MAX_STATUS_MESSAGES); + +export const formatBridgeInbox = (messages: BridgeMessage[]): string => + JSON.stringify( + messages.map((message) => ({ + at: message.at, + from: message.source, + id: message.id, + message: message.message, + })), + null, + 2 + ); + +export const appendBridgeMessage = ( + runDir: string, + source: Agent, + target: Agent, + message: string +): BridgeMessage => { + const entry: BridgeMessage = { + at: new Date().toISOString(), + id: crypto.randomUUID(), + kind: "message", + message, + signature: bridgeSignature(source, target, message), + source, + target, + }; + appendBridgeEvent(runDir, entry); + appendRunTranscriptEntry(buildTranscriptPath(runDir), { + at: entry.at, + from: source, + message, + to: target, + }); + return entry; +}; + +export const appendBlockedBridgeMessage = ( + runDir: string, + source: Agent, + target: Agent, + message: string, + reason: string +): void => { + appendBridgeEvent(runDir, { + at: new Date().toISOString(), + id: crypto.randomUUID(), + kind: "blocked", + reason, + signature: bridgeSignature(source, target, message), + source, + target, + }); +}; diff --git a/src/loop/bridge.ts b/src/loop/bridge.ts index 93caac7..87a5b6f 100644 --- a/src/loop/bridge.ts +++ b/src/loop/bridge.ts @@ -1,89 +1,45 @@ -import { createHash } from "node:crypto"; import { - appendFileSync, - existsSync, - mkdirSync, - readFileSync, - writeFileSync, -} from "node:fs"; -import { dirname, join } from "node:path"; -import { spawnSync } from "bun"; -import { injectCodexMessage } from "./codex-app-server"; -import { LOOP_VERSION } from "./constants"; -import { sanitizeBase } from "./git"; -import { buildLaunchArgv } from "./launch"; + BRIDGE_SERVER as BRIDGE_SERVER_VALUE, + CLAUDE_CHANNEL_USER, +} from "./bridge-constants"; import { - appendRunTranscriptEntry, - buildTranscriptPath, - isActiveRunState, - parseRunLifecycleState, - readRunManifest, - touchRunManifest, - updateRunManifest, -} from "./run-state"; + bridgeRuntimeCommandDeps, + claudeChannelServerName, + clearStaleTmuxBridgeState, + consumeBridgeInbox, + deliverCodexBridgeMessage, + dispatchBridgeMessage, + drainCodexTmuxMessages, + flushClaudeChannelMessages, + formatDispatchResult, +} from "./bridge-runtime"; +import { + appendBlockedBridgeMessage, + appendBridgeEvent, + blocksBridgeBounce, + bridgePath, + formatBridgeInbox, + normalizeAgent, + readBridgeEvents, + readBridgeStatus, +} from "./bridge-store"; +import { LOOP_VERSION } from "./constants"; import type { Agent } from "./types"; -const BRIDGE_FILE = "bridge.jsonl"; const CHANNEL_POLL_DELAY_MS = 500; const CLAUDE_CHANNEL_CAPABILITY = "claude/channel"; -const CLAUDE_CHANNEL_METHOD = "notifications/claude/channel"; -const CLAUDE_CHANNEL_SOURCE_TYPE = "codex"; -const CLAUDE_CHANNEL_USER = "Codex"; -const CLAUDE_CHANNEL_USER_ID = "codex"; const CONTENT_LENGTH_RE = /Content-Length:\s*(\d+)/i; const CONTENT_LENGTH_PREFIX = "content-length:"; const DEFAULT_PROTOCOL_VERSION = "2024-11-05"; const HEADER_SEPARATOR = "\r\n\r\n"; -const LINE_SPLIT_RE = /\r?\n/; -const CODEX_TMUX_PANE = "0.1"; -const CODEX_TMUX_READY_DELAY_MS = 250; -const CODEX_TMUX_READY_POLLS = 20; -const CODEX_TMUX_SEND_FOOTER = "Ctrl+J newline"; -const MAX_STATUS_MESSAGES = 100; const MCP_INVALID_PARAMS = -32_602; const MCP_METHOD_NOT_FOUND = -32_601; -export const BRIDGE_SUBCOMMAND = "__bridge-mcp"; -export const BRIDGE_WORKER_SUBCOMMAND = "__bridge-worker"; -export const BRIDGE_SERVER = "loop-bridge"; - -interface BridgeBaseEvent { - at: string; - id: string; - signature?: string; - source: Agent; - target: Agent; -} - -export interface BridgeMessage extends BridgeBaseEvent { - kind: "message"; - message: string; -} - -interface BridgeAck extends BridgeBaseEvent { - kind: "blocked" | "delivered"; - message?: string; - reason?: string; -} - -type BridgeEvent = BridgeAck | BridgeMessage; - interface BridgeCallParams { arguments?: Record; name?: string; } -interface BridgeStatus { - claudeSessionId: string; - codexRemoteUrl: string; - codexThreadId: string; - pending: { claude: number; codex: number }; - runId: string; - state: string; - status: string; - tmuxSession: string; -} - interface JsonRpcRequest { id?: number | string; method?: string; @@ -104,385 +60,15 @@ const normalizeLowerString = (value: unknown): string | undefined => { return normalized || undefined; }; -const normalizeAgent = (value: unknown): Agent | undefined => { - if (value === "claude" || value === "codex") { - return value; - } - return undefined; -}; - -const BRIDGE_PREFIX_RE = - /^Message from (Claude|Codex) via the loop bridge:\s*/i; - -const normalizeBridgeMessage = (message: string): string => - message.trim().replace(BRIDGE_PREFIX_RE, "").replace(/\s+/g, " "); - -const orderedBridgePairKey = (source: Agent, target: Agent): string => - `${source}>${target}`; - -const bridgeSignature = ( - source: Agent, - target: Agent, - message: string -): string => { - return createHash("sha256") - .update( - `${orderedBridgePairKey(source, target)}\n${normalizeBridgeMessage(message)}`, - "utf8" - ) - .digest("hex"); -}; - -const eventSignature = (event: BridgeMessage): string => - bridgeSignature(event.source, event.target, event.message); - -const bridgePath = (runDir: string): string => join(runDir, BRIDGE_FILE); -const manifestPath = (runDir: string): string => join(runDir, "manifest.json"); -const bridgeCommandDeps = { spawnSync }; - -const ensureParentDir = (path: string): void => { - mkdirSync(dirname(path), { recursive: true }); -}; - -const appendBridgeEvent = (runDir: string, event: BridgeEvent): void => { - const path = bridgePath(runDir); - ensureParentDir(path); - appendFileSync(path, `${JSON.stringify(event)}\n`, "utf8"); -}; - -const readBridgeEvents = (runDir: string): BridgeEvent[] => { - const path = bridgePath(runDir); - if (!existsSync(path)) { - return []; - } - - const events: BridgeEvent[] = []; - const messageById = new Map(); - for (const line of readFileSync(path, "utf8").split(LINE_SPLIT_RE)) { - const trimmed = line.trim(); - if (!trimmed) { - continue; - } - try { - const parsed = JSON.parse(trimmed) as unknown; - if (!isRecord(parsed)) { - continue; - } - const kind = asString(parsed.kind); - const id = asString(parsed.id); - const at = asString(parsed.at); - const source = normalizeAgent(parsed.source); - const target = normalizeAgent(parsed.target); - const signature = asString(parsed.signature); - if (!(kind && id && at && source && target)) { - continue; - } - if (kind === "message") { - const message = asString(parsed.message); - if (!message) { - continue; - } - messageById.set(id, message); - events.push({ - at, - id, - kind, - message, - signature: bridgeSignature(source, target, message), - source, - target, - }); - continue; - } - if (kind === "blocked" || kind === "delivered") { - events.push({ - at, - id, - kind, - message: messageById.get(id), - reason: asString(parsed.reason), - signature, - source, - target, - }); - } - } catch { - // ignore malformed bridge lines - } - } - return events; -}; - -export const readPendingBridgeMessages = (runDir: string): BridgeMessage[] => { - const messages = new Map(); - - for (const event of readBridgeEvents(runDir)) { - if (event.kind === "message") { - messages.set(event.id, event); - continue; - } - const pending = messages.get(event.id); - if (!pending) { - continue; - } - messages.delete(event.id); - } - - return [...messages.values()].sort( - (a, b) => a.at.localeCompare(b.at) || a.id.localeCompare(b.id) - ); -}; - -export const markBridgeMessage = ( - runDir: string, - message: BridgeMessage, - kind: "blocked" | "delivered", - reason?: string -): void => { - appendBridgeEvent(runDir, { - at: new Date().toISOString(), - id: message.id, - kind, - reason, - signature: eventSignature(message), - source: message.source, - target: message.target, - }); -}; - -const blocksBridgeBounce = ( - runDir: string, - source: Agent, - target: Agent, - message: string -): boolean => { - const normalized = normalizeBridgeMessage(message); - const events = readBridgeEvents(runDir); - for (let index = events.length - 1; index >= 0; index -= 1) { - const event = events[index]; - if (event.kind !== "delivered") { - continue; - } - if (!event.message) { - return false; - } - return ( - normalizeBridgeMessage(event.message) === normalized && - event.source === target && - event.target === source - ); - } - return false; -}; - -const countPendingMessages = (runDir: string): BridgeStatus["pending"] => { - const pending = { claude: 0, codex: 0 }; - for (const message of readPendingBridgeMessages(runDir).slice( - 0, - MAX_STATUS_MESSAGES - )) { - pending[message.target] += 1; - } - return pending; -}; - -const readBridgeStatus = (runDir: string): BridgeStatus => { - const manifest = readRunManifest(join(runDir, "manifest.json")); - return { - claudeSessionId: manifest?.claudeSessionId ?? "", - codexRemoteUrl: manifest?.codexRemoteUrl ?? "", - codexThreadId: manifest?.codexThreadId ?? "", - pending: countPendingMessages(runDir), - runId: manifest?.runId ?? "", - state: manifest?.state ?? "unknown", - status: manifest?.status ?? "unknown", - tmuxSession: manifest?.tmuxSession ?? "", - }; -}; - -const wait = async (ms: number): Promise => { - await new Promise((resolve) => { - setTimeout(resolve, ms); - }); -}; - -const decodeOutput = (value: Uint8Array): string => - new TextDecoder().decode(value); - -const codexPane = (session: string): string => `${session}:${CODEX_TMUX_PANE}`; - -const capturePane = (pane: string): string => { - const result = spawnSync(["tmux", "capture-pane", "-p", "-t", pane], { - stderr: "ignore", - stdout: "pipe", - }); - if (result.exitCode !== 0) { - return ""; - } - return decodeOutput(result.stdout); -}; - -const sendPaneKeys = (pane: string, keys: string[]): void => { - spawnSync(["tmux", "send-keys", "-t", pane, ...keys], { stderr: "ignore" }); -}; - -const sendPaneText = (pane: string, text: string): void => { - spawnSync(["tmux", "send-keys", "-t", pane, "-l", "--", text], { - stderr: "ignore", - }); -}; - -const waitForCodexPane = async (session: string): Promise => { - const pane = codexPane(session); - for (let attempt = 0; attempt < CODEX_TMUX_READY_POLLS; attempt += 1) { - if (capturePane(pane).includes(CODEX_TMUX_SEND_FOOTER)) { - return true; - } - await wait(CODEX_TMUX_READY_DELAY_MS); - } - return false; -}; - -const injectCodexTmuxMessage = async ( - session: string, - message: string -): Promise => { - if (!(session && (await waitForCodexPane(session)))) { - return false; - } - const pane = codexPane(session); - const lines = message.split("\n"); - for (let index = 0; index < lines.length; index += 1) { - sendPaneText(pane, lines[index] ?? ""); - if (index < lines.length - 1) { - sendPaneKeys(pane, ["C-j"]); - } - } - await wait(100); - sendPaneKeys(pane, ["Enter"]); - return true; -}; - -const tmuxSessionExists = (session: string): boolean => { - const result = bridgeCommandDeps.spawnSync( - ["tmux", "has-session", "-t", session], - { - stderr: "ignore", - stdout: "ignore", - } - ); - return result.exitCode === 0; -}; - -const claudeChannelServerName = (runId: string): string => - `${BRIDGE_SERVER}-${sanitizeBase(runId)}`; - -const logClaudeChannelServerRemovalFailure = ( - serverName: string, - detail: string -): void => { - console.error( - `[loop] failed to remove Claude channel server "${serverName}": ${detail}` - ); -}; - -const removeClaudeChannelServer = (runId: string): void => { - if (!runId) { - return; - } - const serverName = claudeChannelServerName(runId); - try { - const result = bridgeCommandDeps.spawnSync( - ["claude", "mcp", "remove", "--scope", "local", serverName], - { - stderr: "pipe", - stdout: "ignore", - } - ); - if (result.exitCode === 0) { - return; - } - const stderr = result.stderr ? decodeOutput(result.stderr).trim() : ""; - logClaudeChannelServerRemovalFailure( - serverName, - stderr || `exit code ${result.exitCode ?? "unknown"}` - ); - } catch (error: unknown) { - // Cleanup should not fail the bridge flow. - logClaudeChannelServerRemovalFailure( - serverName, - error instanceof Error ? error.message : String(error) - ); - } -}; - -export const clearStaleTmuxBridgeState = (runDir: string): boolean => { - let removedRunId = ""; - const next = updateRunManifest(manifestPath(runDir), (manifest) => { - if (!manifest?.tmuxSession) { - return manifest; - } - removedRunId = manifest.runId; - return touchRunManifest( - { - ...manifest, - tmuxSession: undefined, - }, - new Date().toISOString() - ); - }); - if (!(next && removedRunId)) { - return false; - } - removeClaudeChannelServer(removedRunId); - return true; -}; - const claudeChannelInstructions = (): string => [ - `Messages from the Codex agent arrive as .`, + `Messages from the Codex agent arrive as .`, 'When you are replying to an inbound channel message, use the "reply" tool and pass back the same chat_id.', "Never answer the human when the inbound message came from Codex. Send the response back through the bridge tools instead.", 'Use the "send_to_agent" tool with target: "codex" for proactive messages that are not direct replies to a channel message.', 'Use "bridge_status" only when direct delivery appears stuck.', ].join("\n"); -const claudeChannelSessionId = (runDir: string): string => { - const runId = readBridgeStatus(runDir).runId || "bridge"; - return `codex_${runId}`; -}; - -const writeChannelNotification = ( - runDir: string, - message: BridgeMessage -): void => { - writeJsonRpc({ - jsonrpc: "2.0", - method: CLAUDE_CHANNEL_METHOD, - params: { - content: message.message, - meta: { - chat_id: claudeChannelSessionId(runDir), - message_id: message.id, - source_type: CLAUDE_CHANNEL_SOURCE_TYPE, - ts: new Date(message.at).toISOString(), - user: CLAUDE_CHANNEL_USER, - user_id: CLAUDE_CHANNEL_USER_ID, - }, - }, - }); -}; - -const flushClaudeChannelMessages = (runDir: string): void => { - for (const message of readPendingBridgeMessages(runDir)) { - if (message.target !== "claude") { - continue; - } - writeChannelNotification(runDir, message); - markBridgeMessage(runDir, message, "delivered"); - } -}; - // This bridge is launched under the agent CLIs' stdio MCP hooks, but those // runtimes expect newline-delimited JSON here so async channel notifications can // be pushed without Content-Length framing. @@ -508,23 +94,6 @@ const toolContent = ( content: [{ text, type: "text" }], }); -const inboxMessages = (runDir: string, target: Agent): BridgeMessage[] => - readPendingBridgeMessages(runDir) - .filter((message) => message.target === target) - .slice(0, MAX_STATUS_MESSAGES); - -const formatInbox = (messages: BridgeMessage[]): string => - JSON.stringify( - messages.map((message) => ({ - at: message.at, - from: message.source, - id: message.id, - message: message.message, - })), - null, - 2 - ); - const emptyResult = (id: JsonRpcRequest["id"], key: string): void => { writeJsonRpc({ id, @@ -533,123 +102,6 @@ const emptyResult = (id: JsonRpcRequest["id"], key: string): void => { }); }; -const appendBridgeMessage = ( - runDir: string, - source: Agent, - target: Agent, - message: string -): BridgeMessage => { - const signature = bridgeSignature(source, target, message); - const entry: BridgeMessage = { - at: new Date().toISOString(), - id: crypto.randomUUID(), - kind: "message", - message, - signature, - source, - target, - }; - appendBridgeEvent(runDir, entry); - appendRunTranscriptEntry(buildTranscriptPath(runDir), { - at: entry.at, - from: source, - message, - to: target, - }); - return entry; -}; - -const deliverCodexBridgeMessage = async ( - runDir: string, - message: BridgeMessage -): Promise => { - const status = readBridgeStatus(runDir); - // A stale tmux session entry should not block direct app-server delivery on a - // later non-tmux resume. - if (status.tmuxSession) { - if (tmuxSessionExists(status.tmuxSession)) { - return false; - } - clearStaleTmuxBridgeState(runDir); - } - if (!(status.codexRemoteUrl && status.codexThreadId)) { - return false; - } - try { - const delivered = await injectCodexMessage( - status.codexRemoteUrl, - status.codexThreadId, - message.message - ); - if (delivered) { - markBridgeMessage( - runDir, - message, - "delivered", - "sent to codex app-server" - ); - } - return delivered; - } catch { - return false; - } -}; - -const drainCodexTmuxMessages = async (runDir: string): Promise => { - const { tmuxSession } = readBridgeStatus(runDir); - if (!tmuxSession) { - return false; - } - if (!tmuxSessionExists(tmuxSession)) { - clearStaleTmuxBridgeState(runDir); - return false; - } - const message = readPendingBridgeMessages(runDir).find( - (entry) => entry.target === "codex" - ); - if (!message) { - return false; - } - const delivered = await injectCodexTmuxMessage(tmuxSession, message.message); - if (!delivered) { - return false; - } - markBridgeMessage(runDir, message, "delivered", "sent to codex tmux pane"); - return true; -}; - -const queueBridgeMessage = async ( - runDir: string, - source: Agent, - target: Agent, - message: string -): Promise<{ delivered: boolean; entry: BridgeMessage }> => { - const entry = appendBridgeMessage(runDir, source, target, message); - const delivered = - target === "codex" ? await deliverCodexBridgeMessage(runDir, entry) : false; - return { delivered, entry }; -}; - -const formatDispatchResult = ( - runDir: string, - target: Agent, - delivered: boolean, - entry: BridgeMessage -): string => { - if (delivered) { - return `delivered ${entry.id} to ${target}`; - } - const status = readBridgeStatus(runDir); - if ( - target === "codex" && - status.tmuxSession && - tmuxSessionExists(status.tmuxSession) - ) { - return `accepted ${entry.id} for codex delivery`; - } - return `queued ${entry.id} for ${target}`; -}; - const handleBridgeStatusTool = ( id: JsonRpcRequest["id"], runDir: string @@ -666,19 +118,17 @@ const handleReceiveMessagesTool = ( runDir: string, source: Agent ): void => { - const messages = inboxMessages(runDir, source); - for (const message of messages) { - markBridgeMessage( - runDir, - message, - "delivered", - "read via receive_messages" - ); - } + const messages = consumeBridgeInbox( + runDir, + source, + "read via receive_messages" + ); writeJsonRpc({ id, jsonrpc: "2.0", - result: toolContent(messages.length === 0 ? "[]" : formatInbox(messages)), + result: toolContent( + messages.length === 0 ? "[]" : formatBridgeInbox(messages) + ), }); }; @@ -698,7 +148,7 @@ const handleReplyTool = async ( writeError(id, MCP_INVALID_PARAMS, "reply requires a non-empty text"); return; } - const { delivered, entry } = await queueBridgeMessage( + const { delivered, entry } = await dispatchBridgeMessage( runDir, source, "codex", @@ -756,15 +206,13 @@ const handleSendToAgentTool = async ( } if (blocksBridgeBounce(runDir, source, target, message)) { - appendBridgeEvent(runDir, { - at: new Date().toISOString(), - id: crypto.randomUUID(), - kind: "blocked", - reason: "duplicate bridge message", - signature: bridgeSignature(source, target, message), + appendBlockedBridgeMessage( + runDir, source, target, - }); + message, + "duplicate bridge message" + ); writeJsonRpc({ id, jsonrpc: "2.0", @@ -773,7 +221,7 @@ const handleSendToAgentTool = async ( return; } - const { delivered, entry } = await queueBridgeMessage( + const { delivered, entry } = await dispatchBridgeMessage( runDir, source, target, @@ -846,7 +294,7 @@ const handleBridgeRequest = async ( : {}), protocolVersion: requestedProtocolVersion(request), serverInfo: { - name: BRIDGE_SERVER, + name: BRIDGE_SERVER_VALUE, version: LOOP_VERSION, }, }, @@ -1075,8 +523,8 @@ export const runBridgeMcpServer = async ( if (!(source === "claude" && channelReady)) { return Promise.resolve(); } - const next = () => { - flushClaudeChannelMessages(runDir); + const next = (): void => { + flushClaudeChannelMessages(runDir, writeJsonRpc); }; flushQueue = flushQueue.then(next, next); return flushQueue; @@ -1116,74 +564,12 @@ export const runBridgeMcpServer = async ( await poller; }; -export const runBridgeWorker = async (runDir: string): Promise => { - while (true) { - const status = readBridgeStatus(runDir); - const state = parseRunLifecycleState(status.state); - if (!(state && isActiveRunState(state))) { - return; - } - if (!status.tmuxSession) { - return; - } - if (!tmuxSessionExists(status.tmuxSession)) { - clearStaleTmuxBridgeState(runDir); - return; - } - const delivered = await drainCodexTmuxMessages(runDir); - await wait(delivered ? 100 : CODEX_TMUX_READY_DELAY_MS); - } -}; - -const stringifyToml = (value: string): string => JSON.stringify(value); - -export const buildCodexBridgeConfigArgs = ( - runDir: string, - source: Agent -): string[] => { - const [command, ...baseArgs] = buildLaunchArgv(); - const args = [...baseArgs, BRIDGE_SUBCOMMAND, runDir, source]; - return [ - "-c", - `mcp_servers.${BRIDGE_SERVER}.command=${stringifyToml(command)}`, - "-c", - `mcp_servers.${BRIDGE_SERVER}.args=${JSON.stringify(args)}`, - ]; -}; - -export const ensureClaudeBridgeConfig = ( - runDir: string, - source: Agent -): string => { - const [command, ...baseArgs] = buildLaunchArgv(); - const path = join(runDir, `${source}-mcp.json`); - ensureParentDir(path); - writeFileSync( - path, - `${JSON.stringify( - { - mcpServers: { - [BRIDGE_SERVER]: { - args: [...baseArgs, BRIDGE_SUBCOMMAND, runDir, source], - command, - type: "stdio", - }, - }, - }, - null, - 2 - )}\n`, - "utf8" - ); - return path; -}; - export const bridgeInternals = { appendBridgeEvent, bridgePath, clearStaleTmuxBridgeState, claudeChannelServerName, - commandDeps: bridgeCommandDeps, + commandDeps: bridgeRuntimeCommandDeps, drainCodexTmuxMessages, deliverCodexBridgeMessage, readBridgeEvents, diff --git a/src/loop/codex-tmux-proxy.ts b/src/loop/codex-tmux-proxy.ts index 35ef7c3..3c81191 100644 --- a/src/loop/codex-tmux-proxy.ts +++ b/src/loop/codex-tmux-proxy.ts @@ -2,11 +2,11 @@ import { join } from "node:path"; import type { ServerWebSocket } from "bun"; import { serve, spawnSync } from "bun"; import { - type BridgeMessage, + acknowledgeBridgeDelivery, clearStaleTmuxBridgeState, - markBridgeMessage, - readPendingBridgeMessages, -} from "./bridge"; + readNextPendingBridgeMessageForTarget, +} from "./bridge-runtime"; +import type { BridgeMessage } from "./bridge-store"; import { findFreePort } from "./ports"; import { isActiveRunState, readRunManifest } from "./run-state"; import { connectWs, type WsClient } from "./ws-client"; @@ -395,12 +395,7 @@ class CodexTmuxProxy { this.turnInProgress = false; return; } - markBridgeMessage( - this.runDir, - message, - "delivered", - "sent to codex tmux proxy" - ); + acknowledgeBridgeDelivery(this.runDir, message, "sent to codex tmux proxy"); } private handleNotification(frame: JsonFrame): void { @@ -466,9 +461,7 @@ class CodexTmuxProxy { if (this.turnInProgress || this.bridgeRequests.size > 0) { return; } - const message = readPendingBridgeMessages(this.runDir).find( - (entry) => entry.target === "codex" - ); + const message = readNextPendingBridgeMessageForTarget(this.runDir, "codex"); if (!message) { return; } diff --git a/src/loop/paired-loop.ts b/src/loop/paired-loop.ts index 9474174..20dea76 100644 --- a/src/loop/paired-loop.ts +++ b/src/loop/paired-loop.ts @@ -1,5 +1,8 @@ import { createInterface } from "node:readline/promises"; -import { markBridgeMessage, readPendingBridgeMessages } from "./bridge"; +import { + acknowledgeBridgeDelivery, + readNextPendingBridgeMessage, +} from "./bridge-runtime"; import { getLastClaudeSessionId } from "./claude-sdk-server"; import { getLastCodexThreadId } from "./codex-app-server"; import { @@ -263,7 +266,7 @@ const drainBridge = async ( let deliveredToPrimary = 0; for (let hop = 0; hop < MAX_BRIDGE_HOPS; hop += 1) { - const message = readPendingBridgeMessages(state.storage.runDir)[0]; + const message = readNextPendingBridgeMessage(state.storage.runDir); if (!message) { return { deliveredToPrimary }; } @@ -284,7 +287,7 @@ const drainBridge = async ( return { deliveredToPrimary }; } - markBridgeMessage(state.storage.runDir, message, "delivered"); + acknowledgeBridgeDelivery(state.storage.runDir, message); if (message.target === state.options.agent) { deliveredToPrimary += 1; } diff --git a/src/loop/paired-options.ts b/src/loop/paired-options.ts index 29e9ec0..15d5579 100644 --- a/src/loop/paired-options.ts +++ b/src/loop/paired-options.ts @@ -1,4 +1,7 @@ -import { buildCodexBridgeConfigArgs, ensureClaudeBridgeConfig } from "./bridge"; +import { + buildCodexBridgeConfigArgs, + ensureClaudeBridgeConfig, +} from "./bridge-config"; import { createRunManifest, ensureRunStorage, diff --git a/src/loop/tmux.ts b/src/loop/tmux.ts index 8ac0722..98b32de 100644 --- a/src/loop/tmux.ts +++ b/src/loop/tmux.ts @@ -2,7 +2,7 @@ import { randomUUID } from "node:crypto"; import { existsSync } from "node:fs"; import { basename, dirname, join } from "node:path"; import { spawn, spawnSync } from "bun"; -import { BRIDGE_SERVER, BRIDGE_SUBCOMMAND } from "./bridge"; +import { BRIDGE_SERVER, BRIDGE_SUBCOMMAND } from "./bridge-constants"; import { getCodexAppServerUrl, getLastCodexThreadId } from "./codex-app-server"; import { CODEX_TMUX_PROXY_SUBCOMMAND, diff --git a/tests/loop/00-paired-loop.integration.test.ts b/tests/loop/00-paired-loop.integration.test.ts index 12492d8..c9c8a5b 100644 --- a/tests/loop/00-paired-loop.integration.test.ts +++ b/tests/loop/00-paired-loop.integration.test.ts @@ -2,10 +2,8 @@ import { afterEach, expect, mock, test } from "bun:test"; import { mkdtempSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import { join, resolve } from "node:path"; -import { - bridgeInternals, - readPendingBridgeMessages, -} from "../../src/loop/bridge"; +import { bridgeInternals } from "../../src/loop/bridge"; +import { readPendingBridgeMessages } from "../../src/loop/bridge-store"; import { readRunManifest, resolveRunStorage } from "../../src/loop/run-state"; import type { Agent, Options, RunResult } from "../../src/loop/types"; diff --git a/tests/loop/bridge.test.ts b/tests/loop/bridge.test.ts index bd28bd3..45575c4 100644 --- a/tests/loop/bridge.test.ts +++ b/tests/loop/bridge.test.ts @@ -1,6 +1,12 @@ import { afterEach, expect, mock, test } from "bun:test"; import { spawn } from "node:child_process"; -import { mkdirSync, mkdtempSync, rmSync, writeFileSync } from "node:fs"; +import { + mkdirSync, + mkdtempSync, + readFileSync, + rmSync, + writeFileSync, +} from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { readRunManifest } from "../../src/loop/run-state"; @@ -19,7 +25,20 @@ const loadBridge = ( injectCodexMessage: overrides.injectCodexMessage, })); } - return import(`../../src/loop/bridge?test=${Date.now()}`); + const nonce = Date.now(); + return Promise.all([ + import(`../../src/loop/bridge?test=${nonce}`), + import(`../../src/loop/bridge-config?test=${nonce}`), + import(`../../src/loop/bridge-constants?test=${nonce}`), + import(`../../src/loop/bridge-runtime?test=${nonce}`), + import(`../../src/loop/bridge-store?test=${nonce}`), + ]).then(([bridge, config, constants, runtime, store]) => ({ + ...bridge, + ...config, + ...constants, + ...runtime, + ...store, + })); }; const makeTempDir = (): string => mkdtempSync(join(tmpdir(), "loop-bridge-")); @@ -552,10 +571,7 @@ test("bridge delivers Claude replies directly to Codex when app-server state is }; bridge.bridgeInternals.appendBridgeEvent(runDir, message); - const delivered = await bridge.bridgeInternals.deliverCodexBridgeMessage( - runDir, - message - ); + const delivered = await bridge.deliverCodexBridgeMessage(runDir, message); expect(delivered).toBe(true); expect(injectCodexMessage).toHaveBeenCalledWith( @@ -585,7 +601,7 @@ test("bridge falls back to direct Codex delivery when the stored tmux session is return { exitCode: 0 }; }); const bridge = await loadBridge({ injectCodexMessage }); - bridge.bridgeInternals.commandDeps.spawnSync = spawnSync; + bridge.bridgeRuntimeCommandDeps.spawnSync = spawnSync; const root = makeTempDir(); const runDir = join(root, "run"); mkdirSync(runDir, { recursive: true }); @@ -616,10 +632,7 @@ test("bridge falls back to direct Codex delivery when the stored tmux session is }; bridge.bridgeInternals.appendBridgeEvent(runDir, message); - const delivered = await bridge.bridgeInternals.deliverCodexBridgeMessage( - runDir, - message - ); + const delivered = await bridge.deliverCodexBridgeMessage(runDir, message); expect(delivered).toBe(true); expect(injectCodexMessage).toHaveBeenCalledWith( @@ -640,7 +653,7 @@ test("bridge falls back to direct Codex delivery when the stored tmux session is "remove", "--scope", "local", - bridge.bridgeInternals.claudeChannelServerName("8"), + bridge.claudeChannelServerName("8"), ]); expect(removeCall?.[1]).toMatchObject({ stderr: "pipe", @@ -651,10 +664,92 @@ test("bridge falls back to direct Codex delivery when the stored tmux session is rmSync(root, { recursive: true, force: true }); }); +test("bridge drains pending codex tmux messages through the injected command deps", async () => { + const spawnSync = mock((args: string[]) => { + if (args[0] === "tmux" && args[1] === "has-session") { + return { exitCode: 0, stderr: Buffer.alloc(0), stdout: Buffer.alloc(0) }; + } + if (args[0] === "tmux" && args[1] === "capture-pane") { + return { + exitCode: 0, + stderr: Buffer.alloc(0), + stdout: Buffer.from("Ctrl+J newline", "utf8"), + }; + } + if (args[0] === "tmux" && args[1] === "send-keys") { + return { exitCode: 0, stderr: Buffer.alloc(0), stdout: Buffer.alloc(0) }; + } + return { exitCode: 0, stderr: Buffer.alloc(0), stdout: Buffer.alloc(0) }; + }); + const bridge = await loadBridge(); + bridge.bridgeRuntimeCommandDeps.spawnSync = spawnSync; + const root = makeTempDir(); + const runDir = join(root, "run"); + mkdirSync(runDir, { recursive: true }); + writeFileSync( + join(runDir, "manifest.json"), + `${JSON.stringify({ + createdAt: "2026-03-23T10:00:00.000Z", + cwd: "/repo", + mode: "paired", + pid: 1234, + repoId: "repo-123", + runId: "8", + status: "running", + tmuxSession: "repo-loop-8", + updatedAt: "2026-03-23T10:00:00.000Z", + })}\n`, + "utf8" + ); + bridge.bridgeInternals.appendBridgeEvent(runDir, { + at: "2026-03-23T10:01:00.000Z", + id: "msg-3", + kind: "message", + message: "Please check the tmux path.", + source: "claude", + target: "codex", + }); + + const delivered = await bridge.drainCodexTmuxMessages(runDir); + + expect(delivered).toBe(true); + expect(bridge.readPendingBridgeMessages(runDir)).toEqual([]); + expect(spawnSync.mock.calls).toEqual( + expect.arrayContaining([ + [ + ["tmux", "has-session", "-t", "repo-loop-8"], + expect.objectContaining({ stderr: "ignore", stdout: "ignore" }), + ], + [ + ["tmux", "capture-pane", "-p", "-t", "repo-loop-8:0.1"], + expect.objectContaining({ stderr: "ignore", stdout: "pipe" }), + ], + [ + [ + "tmux", + "send-keys", + "-t", + "repo-loop-8:0.1", + "-l", + "--", + "Please check the tmux path.", + ], + expect.objectContaining({ stderr: "ignore" }), + ], + [ + ["tmux", "send-keys", "-t", "repo-loop-8:0.1", "Enter"], + expect.objectContaining({ stderr: "ignore" }), + ], + ]) + ); + + rmSync(root, { recursive: true, force: true }); +}); + test("bridge stale tmux cleanup is a no-op when the manifest has no tmux session", async () => { const spawnSync = mock(() => ({ exitCode: 0 })); const bridge = await loadBridge(); - bridge.bridgeInternals.commandDeps.spawnSync = spawnSync; + bridge.bridgeRuntimeCommandDeps.spawnSync = spawnSync; const root = makeTempDir(); const runDir = join(root, "run"); mkdirSync(runDir, { recursive: true }); @@ -675,7 +770,7 @@ test("bridge stale tmux cleanup is a no-op when the manifest has no tmux session "utf8" ); - expect(bridge.bridgeInternals.clearStaleTmuxBridgeState(runDir)).toBe(false); + expect(bridge.clearStaleTmuxBridgeState(runDir)).toBe(false); expect(spawnSync).not.toHaveBeenCalled(); expect(readRunManifest(join(runDir, "manifest.json"))?.tmuxSession).toBe( undefined @@ -695,7 +790,7 @@ test("bridge stale tmux cleanup logs non-zero Claude MCP remove exits", async () return { exitCode: 0, stderr: Buffer.alloc(0) }; }); const bridge = await loadBridge(); - bridge.bridgeInternals.commandDeps.spawnSync = spawnSync; + bridge.bridgeRuntimeCommandDeps.spawnSync = spawnSync; const errorSpy = mock(() => undefined); const originalError = console.error; console.error = errorSpy; @@ -719,7 +814,7 @@ test("bridge stale tmux cleanup logs non-zero Claude MCP remove exits", async () ); try { - expect(bridge.bridgeInternals.clearStaleTmuxBridgeState(runDir)).toBe(true); + expect(bridge.clearStaleTmuxBridgeState(runDir)).toBe(true); expect(errorSpy).toHaveBeenCalledWith( '[loop] failed to remove Claude channel server "loop-bridge-8": command failed' ); @@ -740,7 +835,7 @@ test("bridge stale tmux cleanup logs thrown Claude MCP remove errors", async () return { exitCode: 0, stderr: Buffer.alloc(0) }; }); const bridge = await loadBridge(); - bridge.bridgeInternals.commandDeps.spawnSync = spawnSync; + bridge.bridgeRuntimeCommandDeps.spawnSync = spawnSync; const errorSpy = mock(() => undefined); const originalError = console.error; console.error = errorSpy; @@ -764,7 +859,7 @@ test("bridge stale tmux cleanup logs thrown Claude MCP remove errors", async () ); try { - expect(bridge.bridgeInternals.clearStaleTmuxBridgeState(runDir)).toBe(true); + expect(bridge.clearStaleTmuxBridgeState(runDir)).toBe(true); expect(errorSpy).toHaveBeenCalledWith( '[loop] failed to remove Claude channel server "loop-bridge-8": spawn failed' ); @@ -777,6 +872,66 @@ test("bridge stale tmux cleanup logs thrown Claude MCP remove errors", async () } }); +test("runBridgeWorker clears stale tmux routing and exits", async () => { + const spawnSync = mock((args: string[]) => { + if (args[0] === "tmux" && args[1] === "has-session") { + return { exitCode: 1, stderr: Buffer.alloc(0), stdout: Buffer.alloc(0) }; + } + if (args[0] === "claude" && args[1] === "mcp" && args[2] === "remove") { + return { exitCode: 0, stderr: Buffer.alloc(0), stdout: Buffer.alloc(0) }; + } + return { exitCode: 0, stderr: Buffer.alloc(0), stdout: Buffer.alloc(0) }; + }); + const bridge = await loadBridge(); + bridge.bridgeRuntimeCommandDeps.spawnSync = spawnSync; + const root = makeTempDir(); + const runDir = join(root, "run"); + mkdirSync(runDir, { recursive: true }); + writeFileSync( + join(runDir, "manifest.json"), + `${JSON.stringify({ + createdAt: "2026-03-23T10:00:00.000Z", + cwd: "/repo", + mode: "paired", + pid: 1234, + repoId: "repo-123", + runId: "8", + state: "working", + status: "running", + tmuxSession: "repo-loop-8", + updatedAt: "2026-03-23T10:00:00.000Z", + })}\n`, + "utf8" + ); + + await bridge.runBridgeWorker(runDir); + + expect(readRunManifest(join(runDir, "manifest.json"))?.tmuxSession).toBe( + undefined + ); + expect(spawnSync.mock.calls).toEqual( + expect.arrayContaining([ + [ + ["tmux", "has-session", "-t", "repo-loop-8"], + expect.objectContaining({ stderr: "ignore", stdout: "ignore" }), + ], + [ + [ + "claude", + "mcp", + "remove", + "--scope", + "local", + bridge.claudeChannelServerName("8"), + ], + expect.objectContaining({ stderr: "pipe", stdout: "ignore" }), + ], + ]) + ); + + rmSync(root, { recursive: true, force: true }); +}); + test("bridge MCP delivers pending codex messages to Claude as channel notifications", async () => { const bridge = await loadBridge(); const root = makeTempDir(); @@ -1075,3 +1230,23 @@ test("bridge config helper builds the bridge MCP entry point for Codex", async ( rmSync(root, { recursive: true, force: true }); }); + +test("bridge config helper writes the Claude MCP config file", async () => { + const bridge = await loadBridge(); + const root = makeTempDir(); + const runDir = join(root, "run"); + + const path = bridge.ensureClaudeBridgeConfig(runDir, "claude"); + expect(path).toBe(join(runDir, "claude-mcp.json")); + expect(JSON.parse(readFileSync(path, "utf8"))).toEqual({ + mcpServers: { + [bridge.BRIDGE_SERVER]: { + args: ["src/loop/main.ts", bridge.BRIDGE_SUBCOMMAND, runDir, "claude"], + command: "/opt/bun", + type: "stdio", + }, + }, + }); + + rmSync(root, { recursive: true, force: true }); +}); diff --git a/tests/loop/codex-app-server.test.ts b/tests/loop/codex-app-server.test.ts index 3788d6c..e46907b 100644 --- a/tests/loop/codex-app-server.test.ts +++ b/tests/loop/codex-app-server.test.ts @@ -1,5 +1,5 @@ import { afterEach, expect, mock, test } from "bun:test"; -import { buildCodexBridgeConfigArgs } from "../../src/loop/bridge"; +import { buildCodexBridgeConfigArgs } from "../../src/loop/bridge-config"; import type { Options } from "../../src/loop/types"; interface RequestFrame { diff --git a/tests/loop/paired-loop.test.ts b/tests/loop/paired-loop.test.ts index 3fc6d64..e5f6b30 100644 --- a/tests/loop/paired-loop.test.ts +++ b/tests/loop/paired-loop.test.ts @@ -9,10 +9,8 @@ import { } from "node:fs"; import { tmpdir } from "node:os"; import { dirname, join } from "node:path"; -import { - bridgeInternals, - readPendingBridgeMessages, -} from "../../src/loop/bridge"; +import { bridgeInternals } from "../../src/loop/bridge"; +import { readPendingBridgeMessages } from "../../src/loop/bridge-store"; import { createRunManifest, readRunManifest,