Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 93 additions & 7 deletions src/loop/bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ import { dirname, join } from "node:path";
import { spawnSync } from "bun";
import { injectCodexMessage } from "./codex-app-server";
import { LOOP_VERSION } from "./constants";
import { sanitizeBase } from "./git";
import { buildLaunchArgv } from "./launch";
import {
appendRunTranscriptEntry,
buildTranscriptPath,
isActiveRunState,
parseRunLifecycleState,
readRunManifest,
touchRunManifest,
updateRunManifest,
} from "./run-state";
import type { Agent } from "./types";

Expand Down Expand Up @@ -126,6 +129,8 @@ const eventSignature = (event: BridgeMessage): string =>
bridgeSignature(event.source, event.target, event.message);

const bridgePath = (runDir: string): string => join(runDir, BRIDGE_FILE);
const manifestPath = (runDir: string): string => join(runDir, "manifest.json");
const bridgeCommandDeps = { spawnSync };

const ensureParentDir = (path: string): void => {
mkdirSync(dirname(path), { recursive: true });
Expand Down Expand Up @@ -351,13 +356,80 @@ const injectCodexTmuxMessage = async (
};

const tmuxSessionExists = (session: string): boolean => {
const result = spawnSync(["tmux", "has-session", "-t", session], {
stderr: "ignore",
stdout: "ignore",
});
const result = bridgeCommandDeps.spawnSync(
["tmux", "has-session", "-t", session],
{
stderr: "ignore",
stdout: "ignore",
}
);
return result.exitCode === 0;
};

const claudeChannelServerName = (runId: string): string =>
`${BRIDGE_SERVER}-${sanitizeBase(runId)}`;

const logClaudeChannelServerRemovalFailure = (
serverName: string,
detail: string
): void => {
console.error(
`[loop] failed to remove Claude channel server "${serverName}": ${detail}`
);
};

const removeClaudeChannelServer = (runId: string): void => {
if (!runId) {
return;
}
const serverName = claudeChannelServerName(runId);
try {
const result = bridgeCommandDeps.spawnSync(
["claude", "mcp", "remove", "--scope", "local", serverName],
{
stderr: "pipe",
stdout: "ignore",
}
);
if (result.exitCode === 0) {
return;
}
const stderr = result.stderr ? decodeOutput(result.stderr).trim() : "";
logClaudeChannelServerRemovalFailure(
serverName,
stderr || `exit code ${result.exitCode ?? "unknown"}`
);
} catch (error: unknown) {
// Cleanup should not fail the bridge flow.
logClaudeChannelServerRemovalFailure(
serverName,
error instanceof Error ? error.message : String(error)
);
}
};

export const clearStaleTmuxBridgeState = (runDir: string): boolean => {
let removedRunId = "";
const next = updateRunManifest(manifestPath(runDir), (manifest) => {
if (!manifest?.tmuxSession) {
return manifest;
}
removedRunId = manifest.runId;
return touchRunManifest(
{
...manifest,
tmuxSession: undefined,
},
new Date().toISOString()
);
});
if (!(next && removedRunId)) {
return false;
}
removeClaudeChannelServer(removedRunId);
return true;
};

const claudeChannelInstructions = (): string =>
[
`Messages from the Codex agent arrive as <channel source="${BRIDGE_SERVER}" chat_id="..." user="${CLAUDE_CHANNEL_USER}" ...>.`,
Expand Down Expand Up @@ -486,8 +558,11 @@ const deliverCodexBridgeMessage = async (
const status = readBridgeStatus(runDir);
// A stale tmux session entry should not block direct app-server delivery on a
// later non-tmux resume.
if (status.tmuxSession && tmuxSessionExists(status.tmuxSession)) {
return false;
if (status.tmuxSession) {
if (tmuxSessionExists(status.tmuxSession)) {
return false;
}
clearStaleTmuxBridgeState(runDir);
}
if (!(status.codexRemoteUrl && status.codexThreadId)) {
return false;
Expand Down Expand Up @@ -517,6 +592,10 @@ const drainCodexTmuxMessages = async (runDir: string): Promise<boolean> => {
if (!tmuxSession) {
return false;
}
if (!tmuxSessionExists(tmuxSession)) {
clearStaleTmuxBridgeState(runDir);
return false;
}
const message = readPendingBridgeMessages(runDir).find(
(entry) => entry.target === "codex"
);
Expand Down Expand Up @@ -1027,7 +1106,11 @@ export const runBridgeWorker = async (runDir: string): Promise<void> => {
if (!(state && isActiveRunState(state))) {
return;
}
if (!(status.tmuxSession && tmuxSessionExists(status.tmuxSession))) {
if (!status.tmuxSession) {
return;
}
if (!tmuxSessionExists(status.tmuxSession)) {
clearStaleTmuxBridgeState(runDir);
return;
}
const delivered = await drainCodexTmuxMessages(runDir);
Expand Down Expand Up @@ -1081,6 +1164,9 @@ export const ensureClaudeBridgeConfig = (
export const bridgeInternals = {
appendBridgeEvent,
bridgePath,
clearStaleTmuxBridgeState,
claudeChannelServerName,
commandDeps: bridgeCommandDeps,
drainCodexTmuxMessages,
deliverCodexBridgeMessage,
readBridgeEvents,
Expand Down
62 changes: 51 additions & 11 deletions src/loop/codex-app-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type TransportMode = "app-server" | "exec";
type Callback = (text: string) => void;
export interface AppServerLaunchOptions {
configValues?: string[];
orphanOnExit?: boolean;
persistentThread?: boolean;
resumeThreadId?: string;
threadModel?: string;
Expand Down Expand Up @@ -422,6 +423,7 @@ class AppServerClient {
private ws: import("./ws-client").WsClient | undefined;
private closed = false;
private lastThreadId = "";
private orphanOnExit = false;
private persistentThread = false;
private threadModel = "";
private started = false;
Expand Down Expand Up @@ -451,14 +453,17 @@ class AppServerClient {
if (!this.started) {
return false;
}
return !sameConfigValues(
this.configValues,
normalizeConfigValues(options.configValues)
return (
!sameConfigValues(
this.configValues,
normalizeConfigValues(options.configValues)
) || this.orphanOnExit !== (options.orphanOnExit ?? false)
);
}

configureLaunch(options: AppServerLaunchOptions = {}): void {
this.configValues = normalizeConfigValues(options.configValues);
this.orphanOnExit = options.orphanOnExit ?? false;
this.persistentThread = options.persistentThread ?? false;
if (options.resumeThreadId !== undefined) {
this.lastThreadId = options.resumeThreadId;
Expand Down Expand Up @@ -497,17 +502,26 @@ class AppServerClient {
{
detached: DETACH_CHILD_PROCESS,
env: process.env,
stderr: "pipe",
stdin: "pipe",
stdout: "pipe",
stderr: this.orphanOnExit ? "ignore" : "pipe",
stdin: this.orphanOnExit ? "ignore" : "pipe",
stdout: this.orphanOnExit ? "ignore" : "pipe",
}
);
this.child = child;
this.consumeFrames(child).finally(() => {
if (!this.closed) {
this.handleUnexpectedExit();
}
});
if (this.orphanOnExit) {
child.unref?.();
child.exited.then(() => {
if (!this.closed) {
this.handleUnexpectedExit();
}
});
} else {
this.consumeFrames(child).finally(() => {
if (!this.closed) {
this.handleUnexpectedExit();
}
});
}
const ws = await this.connectWebSocket(connectUrl);
this.ws = ws;
ws.onmessage = (data) => {
Expand Down Expand Up @@ -625,6 +639,24 @@ class AppServerClient {
this.started = false;
}

release(): void {
this.closed = true;
this.failAll(new Error("codex app-server released"));
const ws = this.ws;
this.ws = undefined;
if (ws) {
try {
ws.close();
} catch {
// ignore close errors
}
}
this.child = undefined;
this.connectUrl = "";
this.ready = false;
this.started = false;
}

private async ensureThread(
model: string,
resumeThreadId?: string
Expand Down Expand Up @@ -1221,3 +1253,11 @@ export const closeAppServer = async (): Promise<void> => {
await singleton.close();
singleton = undefined;
};

export const releaseAppServer = (): void => {
if (!singleton) {
return;
}
singleton.release();
singleton = undefined;
};
18 changes: 14 additions & 4 deletions src/loop/codex-tmux-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { ServerWebSocket } from "bun";
import { serve, spawnSync } from "bun";
import {
type BridgeMessage,
clearStaleTmuxBridgeState,
markBridgeMessage,
readPendingBridgeMessages,
} from "./bridge";
Expand Down Expand Up @@ -45,6 +46,8 @@ interface ProxyRoute {
threadId?: string;
}

type StopReason = "dead-tmux" | "inactive-run";

const isRecord = (value: unknown): value is Record<string, unknown> =>
typeof value === "object" && value !== null;

Expand Down Expand Up @@ -421,30 +424,37 @@ class CodexTmuxProxy {
}
}

private shouldStop(): boolean {
private stopReason(): StopReason | undefined {
const manifest = readRunManifest(join(this.runDir, "manifest.json"));
if (!(manifest && isActiveRunState(manifest.state))) {
return true;
return "inactive-run";
}
const sessionAlive = manifest.tmuxSession
? isTmuxSessionAlive(manifest.tmuxSession)
: false;
if (sessionAlive) {
this.sawTmuxSession = true;
return undefined;
}
return shouldStopForTmuxSession(
sessionAlive,
this.sawTmuxSession,
this.startupDeadlineMs,
Date.now()
);
)
? "dead-tmux"
: undefined;
}

private drainBridgeMessages(): void {
if (this.stopped) {
return;
}
if (this.shouldStop()) {
const stopReason = this.stopReason();
if (stopReason) {
if (stopReason === "dead-tmux") {
clearStaleTmuxBridgeState(this.runDir);
}
this.stop();
return;
}
Expand Down
10 changes: 10 additions & 0 deletions src/loop/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import {
CODEX_TRANSPORT_EXEC,
CodexAppServerFallbackError,
CodexAppServerUnexpectedExitError,
closeAppServer,
hasAppServerProcess,
interruptAppServer,
releaseAppServer,
runCodexTurn,
startAppServer,
useAppServer,
Expand Down Expand Up @@ -604,3 +606,11 @@ export const startPersistentAgentSession = async (
persistent: true,
});
};

export const releasePersistentCodexSession = (): void => {
releaseAppServer();
};

export const closePersistentCodexSession = async (): Promise<void> => {
await closeAppServer();
};
Loading
Loading