From a5c6acb73b0caf54ddc5568b12b2939cfc9ec532 Mon Sep 17 00:00:00 2001 From: thevortex8 Date: Sat, 21 Mar 2026 16:06:07 -0400 Subject: [PATCH 1/5] fix: make codex send replays idempotent --- .../server/src/server/agent/agent-manager.ts | 25 +++++++++ .../providers/codex-app-server-agent.test.ts | 21 ++++++++ .../agent/providers/codex-app-server-agent.ts | 35 +++++++++---- .../send-message-id-replay.e2e.test.ts | 52 +++++++++++++++++++ packages/server/src/server/session.ts | 46 ++++++++++++++++ 5 files changed, 169 insertions(+), 10 deletions(-) create mode 100644 packages/server/src/server/daemon-e2e/send-message-id-replay.e2e.test.ts diff --git a/packages/server/src/server/agent/agent-manager.ts b/packages/server/src/server/agent/agent-manager.ts index 75af92f33..1e258ec19 100644 --- a/packages/server/src/server/agent/agent-manager.ts +++ b/packages/server/src/server/agent/agent-manager.ts @@ -931,6 +931,31 @@ export class AgentManager { } } + classifyRecordedUserMessage( + agentId: string, + text: string, + options?: { messageId?: string }, + ): "new" | "duplicate" | "conflict" { + const normalizedMessageId = normalizeMessageId(options?.messageId); + if (!normalizedMessageId) { + return "new"; + } + + const agent = this.requireAgent(agentId); + for (const row of agent.timelineRows) { + if (row.item.type !== "user_message") { + continue; + } + const rowMessageId = normalizeMessageId(row.item.messageId); + if (rowMessageId !== normalizedMessageId) { + continue; + } + return row.item.text === text ? "duplicate" : "conflict"; + } + + return "new"; + } + async appendTimelineItem(agentId: string, item: AgentTimelineItem): Promise { const agent = this.requireAgent(agentId); this.touchUpdatedAt(agent); diff --git a/packages/server/src/server/agent/providers/codex-app-server-agent.test.ts b/packages/server/src/server/agent/providers/codex-app-server-agent.test.ts index 7a3ae71d7..da7386c09 100644 --- a/packages/server/src/server/agent/providers/codex-app-server-agent.test.ts +++ b/packages/server/src/server/agent/providers/codex-app-server-agent.test.ts @@ -1,5 +1,6 @@ import { describe, expect, test } from "vitest"; import { existsSync, rmSync } from "node:fs"; +import type { AgentStreamEvent } from "../agent-sdk-types.js"; import { __codexAppServerInternals, @@ -97,4 +98,24 @@ describe("Codex app-server provider", () => { expect(item.detail.newString).toBeUndefined(); } }); + + test("does not buffer terminal Codex events before turn_started", () => { + const terminalEvents: AgentStreamEvent[] = [ + { type: "turn_completed", provider: "codex", usage: undefined }, + { type: "turn_failed", provider: "codex", error: "boom" }, + { type: "turn_canceled", provider: "codex", reason: "interrupted" }, + ]; + + for (const event of terminalEvents) { + expect(__codexAppServerInternals.shouldBufferCodexEventUntilTurnStarted(event)).toBe(false); + } + + expect( + __codexAppServerInternals.shouldBufferCodexEventUntilTurnStarted({ + type: "timeline", + provider: "codex", + item: { type: "assistant_message", text: "stale chunk" }, + }), + ).toBe(true); + }); }); diff --git a/packages/server/src/server/agent/providers/codex-app-server-agent.ts b/packages/server/src/server/agent/providers/codex-app-server-agent.ts index 71b75dc6a..4c21d8df4 100644 --- a/packages/server/src/server/agent/providers/codex-app-server-agent.ts +++ b/packages/server/src/server/agent/providers/codex-app-server-agent.ts @@ -2034,8 +2034,29 @@ export async function codexAppServerTurnInputFromPrompt( export const __codexAppServerInternals = { mapCodexPatchNotificationToToolCall, + shouldBufferCodexEventUntilTurnStarted, }; +function isTerminalAgentStreamEvent(event: AgentStreamEvent): boolean { + return ( + event.type === "turn_completed" || + event.type === "turn_failed" || + event.type === "turn_canceled" + ); +} + +function shouldBufferCodexEventUntilTurnStarted(event: AgentStreamEvent): boolean { + if ( + event.type === "permission_requested" || + event.type === "permission_resolved" || + event.type === "turn_started" || + isTerminalAgentStreamEvent(event) + ) { + return false; + } + return true; +} + class CodexAppServerAgentSession implements AgentSession { readonly provider = CODEX_PROVIDER; readonly capabilities = CODEX_APP_SERVER_CAPABILITIES; @@ -2504,25 +2525,19 @@ class CodexAppServerAgentSession implements AgentSession { let sawTurnStarted = false; for await (const event of queue) { // Drop pre-start timeline noise that can leak from the previous turn. - // Keep permission events, which can legitimately arrive before turn_started. + // Keep permission and terminal events, which can legitimately arrive + // before turn_started on resumed/replayed requests. if (!sawTurnStarted) { - if (event.type === "permission_requested" || event.type === "permission_resolved") { - yield event; + if (shouldBufferCodexEventUntilTurnStarted(event)) { continue; } if (event.type === "turn_started") { sawTurnStarted = true; - } else { - continue; } } yield event; - if ( - event.type === "turn_completed" || - event.type === "turn_failed" || - event.type === "turn_canceled" - ) { + if (isTerminalAgentStreamEvent(event)) { break; } } diff --git a/packages/server/src/server/daemon-e2e/send-message-id-replay.e2e.test.ts b/packages/server/src/server/daemon-e2e/send-message-id-replay.e2e.test.ts new file mode 100644 index 000000000..20d143f93 --- /dev/null +++ b/packages/server/src/server/daemon-e2e/send-message-id-replay.e2e.test.ts @@ -0,0 +1,52 @@ +import { afterEach, beforeEach, describe, expect, test } from "vitest"; +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import path from "node:path"; + +import { createDaemonTestContext, type DaemonTestContext } from "../test-utils/index.js"; + +function tmpCwd(): string { + return mkdtempSync(path.join(tmpdir(), "send-message-id-replay-")); +} + +describe("send_agent_message_request replay", () => { + let ctx: DaemonTestContext; + + beforeEach(async () => { + ctx = await createDaemonTestContext(); + }); + + afterEach(async () => { + await ctx.cleanup(); + }, 60_000); + + test("does not start a second run when the same client messageId is replayed", async () => { + const cwd = tmpCwd(); + try { + const agent = await ctx.client.createAgent({ + provider: "codex", + cwd, + title: "duplicate message replay", + modeId: "full-access", + }); + + const messageId = "msg-replay-1"; + await ctx.client.sendMessage(agent.id, "Reply with exactly hello", { messageId }); + await ctx.client.waitForFinish(agent.id, 5_000); + + const afterFirst = await ctx.client.fetchAgent(agent.id); + expect(afterFirst?.agent.status).toBe("idle"); + const firstUpdatedAt = afterFirst?.agent.updatedAt ?? null; + expect(firstUpdatedAt).not.toBeNull(); + + await ctx.client.sendMessage(agent.id, "Reply with exactly hello", { messageId }); + await new Promise((resolve) => setTimeout(resolve, 50)); + + const afterReplay = await ctx.client.fetchAgent(agent.id); + expect(afterReplay?.agent.status).toBe("idle"); + expect(afterReplay?.agent.updatedAt ?? null).toBe(firstUpdatedAt); + } finally { + rmSync(cwd, { recursive: true, force: true }); + } + }); +}); diff --git a/packages/server/src/server/session.ts b/packages/server/src/server/session.ts index 9347e1079..995da27c7 100644 --- a/packages/server/src/server/session.ts +++ b/packages/server/src/server/session.ts @@ -2610,6 +2610,20 @@ export class Session { return; } + const replayDisposition = this.agentManager.classifyRecordedUserMessage(agentId, text, { + messageId, + }); + if (replayDisposition === "duplicate") { + this.sessionLogger.info( + { agentId, messageId }, + "Suppressing duplicate send replay for already-recorded user message", + ); + return; + } + if (replayDisposition === "conflict") { + throw new Error(`Client messageId '${messageId}' was reused with different text`); + } + try { this.agentManager.recordUserMessage(agentId, text, { messageId, @@ -6626,6 +6640,38 @@ export class Session { await this.ensureAgentLoaded(agentId); + const replayDisposition = this.agentManager.classifyRecordedUserMessage(agentId, msg.text, { + messageId: msg.messageId, + }); + if (replayDisposition === "duplicate") { + this.sessionLogger.info( + { agentId, messageId: msg.messageId, requestId: msg.requestId }, + "Suppressing duplicate send_agent_message_request replay", + ); + this.emit({ + type: "send_agent_message_response", + payload: { + requestId: msg.requestId, + agentId, + accepted: true, + error: null, + }, + }); + return; + } + if (replayDisposition === "conflict") { + this.emit({ + type: "send_agent_message_response", + payload: { + requestId: msg.requestId, + agentId, + accepted: false, + error: `Client messageId '${msg.messageId}' was reused with different text`, + }, + }); + return; + } + try { this.agentManager.recordUserMessage(agentId, msg.text, { messageId: msg.messageId, From 1d8a138d0d2bd522ece6660806cda8ec80abd736 Mon Sep 17 00:00:00 2001 From: thevortex8 Date: Sat, 21 Mar 2026 16:49:42 -0400 Subject: [PATCH 2/5] fix: preserve queued message ids on replay --- packages/app/src/contexts/session-context.tsx | 35 +++++++++---- .../session-queued-message-replay.test.ts | 50 +++++++++++++++++++ .../contexts/session-queued-message-replay.ts | 30 +++++++++++ 3 files changed, 105 insertions(+), 10 deletions(-) create mode 100644 packages/app/src/contexts/session-queued-message-replay.test.ts create mode 100644 packages/app/src/contexts/session-queued-message-replay.ts diff --git a/packages/app/src/contexts/session-context.tsx b/packages/app/src/contexts/session-context.tsx index 86e711ce0..1708c2b61 100644 --- a/packages/app/src/contexts/session-context.tsx +++ b/packages/app/src/contexts/session-context.tsx @@ -49,6 +49,7 @@ import { derivePendingPermissionKey, normalizeAgentSnapshot } from "@/utils/agen import { resolveProjectPlacement } from "@/utils/project-placement"; import { buildDraftStoreKey } from "@/stores/draft-keys"; import type { AttachmentMetadata } from "@/attachments/types"; +import { takeQueuedAgentMessageReplay } from "@/contexts/session-queued-message-replay"; // Re-export types from session-store and draft-store for backward compatibility export type { DraftInput } from "@/stores/draft-store"; @@ -272,7 +273,12 @@ function SessionProviderInternal({ children, serverId, client }: SessionProvider const previousAgentStatusRef = useRef>(new Map()); const sendAgentMessageRef = useRef< - ((agentId: string, message: string, images?: AttachmentMetadata[]) => Promise) | null + (( + agentId: string, + message: string, + images?: AttachmentMetadata[], + messageId?: string, + ) => Promise) | null >(null); const sessionStateTimeoutRef = useRef | null>(null); const attentionNotifiedRef = useRef>(new Map()); @@ -425,15 +431,19 @@ function SessionProviderInternal({ children, serverId, client }: SessionProvider const prevStatus = previousAgentStatusRef.current.get(agent.id); if (prevStatus === "running" && agent.status !== "running") { const session = useSessionStore.getState().sessions[serverId]; - const queue = session?.queuedMessages.get(agent.id); - if (queue && queue.length > 0) { - const [next, ...rest] = queue; + const replay = takeQueuedAgentMessageReplay(session?.queuedMessages.get(agent.id)); + if (replay) { if (sendAgentMessageRef.current) { - void sendAgentMessageRef.current(agent.id, next.text, next.images); + void sendAgentMessageRef.current( + agent.id, + replay.text, + replay.images, + replay.messageId, + ); } setQueuedMessages(serverId, (prev) => { const updated = new Map(prev); - updated.set(agent.id, rest); + updated.set(agent.id, replay.remainingQueue); return updated; }); } @@ -1469,11 +1479,16 @@ function SessionProviderInternal({ children, serverId, client }: SessionProvider ]); const sendAgentMessage = useCallback( - async (agentId: string, message: string, images?: AttachmentMetadata[]) => { - const messageId = generateMessageId(); + async ( + agentId: string, + message: string, + images?: AttachmentMetadata[], + messageId?: string, + ) => { + const resolvedMessageId = messageId ?? generateMessageId(); const userMessage: StreamItem = { kind: "user_message", - id: messageId, + id: resolvedMessageId, text: message, timestamp: new Date(), }; @@ -1507,7 +1522,7 @@ function SessionProviderInternal({ children, serverId, client }: SessionProvider } void client .sendAgentMessage(agentId, message, { - messageId, + messageId: resolvedMessageId, ...(imagesData && imagesData.length > 0 ? { images: imagesData } : {}), }) .catch((error) => { diff --git a/packages/app/src/contexts/session-queued-message-replay.test.ts b/packages/app/src/contexts/session-queued-message-replay.test.ts new file mode 100644 index 000000000..67186947b --- /dev/null +++ b/packages/app/src/contexts/session-queued-message-replay.test.ts @@ -0,0 +1,50 @@ +import { describe, expect, it } from "vitest"; +import { takeQueuedAgentMessageReplay } from "./session-queued-message-replay"; + +describe("takeQueuedAgentMessageReplay", () => { + it("preserves the queued message id for idempotent replay", () => { + const replay = takeQueuedAgentMessageReplay([ + { + id: "msg-1", + text: "resume this", + images: [ + { + id: "img-1", + mimeType: "image/png", + storageType: "web-indexeddb", + storageKey: "attachment:img-1", + fileName: "test.png", + byteSize: 12, + createdAt: 1, + }, + ], + }, + { + id: "msg-2", + text: "second", + }, + ]); + + expect(replay).toEqual({ + messageId: "msg-1", + text: "resume this", + images: [ + { + id: "img-1", + mimeType: "image/png", + storageType: "web-indexeddb", + storageKey: "attachment:img-1", + fileName: "test.png", + byteSize: 12, + createdAt: 1, + }, + ], + remainingQueue: [{ id: "msg-2", text: "second" }], + }); + }); + + it("returns null when the queue is empty", () => { + expect(takeQueuedAgentMessageReplay([])).toBeNull(); + expect(takeQueuedAgentMessageReplay(undefined)).toBeNull(); + }); +}); diff --git a/packages/app/src/contexts/session-queued-message-replay.ts b/packages/app/src/contexts/session-queued-message-replay.ts new file mode 100644 index 000000000..7b361ed90 --- /dev/null +++ b/packages/app/src/contexts/session-queued-message-replay.ts @@ -0,0 +1,30 @@ +import type { AttachmentMetadata } from "@/attachments/types"; + +export interface QueuedAgentMessageReplayItem { + id: string; + text: string; + images?: AttachmentMetadata[]; +} + +export interface QueuedAgentMessageReplay { + messageId: string; + text: string; + images?: AttachmentMetadata[]; + remainingQueue: QueuedAgentMessageReplayItem[]; +} + +export function takeQueuedAgentMessageReplay( + queue: readonly QueuedAgentMessageReplayItem[] | undefined, +): QueuedAgentMessageReplay | null { + if (!queue || queue.length === 0) { + return null; + } + + const [next, ...remainingQueue] = queue; + return { + messageId: next.id, + text: next.text, + images: next.images, + remainingQueue, + }; +} From ff89c852555752dd63fd6abd1e792a5ff72aa4fc Mon Sep 17 00:00:00 2001 From: thevortex8 Date: Sat, 21 Mar 2026 18:41:10 -0400 Subject: [PATCH 3/5] fix daemon stale-progress semantics and hydrate replay --- packages/app/src/contexts/session-context.tsx | 29 ++++++++++++----- .../session-queued-message-replay.test.ts | 31 ++++++++++++++++++- .../contexts/session-queued-message-replay.ts | 10 ++++++ .../src/server/agent/agent-manager.test.ts | 28 +++++++++++++++++ .../server/src/server/agent/agent-manager.ts | 1 - 5 files changed, 89 insertions(+), 10 deletions(-) diff --git a/packages/app/src/contexts/session-context.tsx b/packages/app/src/contexts/session-context.tsx index 1708c2b61..629222559 100644 --- a/packages/app/src/contexts/session-context.tsx +++ b/packages/app/src/contexts/session-context.tsx @@ -49,7 +49,11 @@ import { derivePendingPermissionKey, normalizeAgentSnapshot } from "@/utils/agen import { resolveProjectPlacement } from "@/utils/project-placement"; import { buildDraftStoreKey } from "@/stores/draft-keys"; import type { AttachmentMetadata } from "@/attachments/types"; -import { takeQueuedAgentMessageReplay } from "@/contexts/session-queued-message-replay"; +import { + shouldAutoReplayQueuedAgentMessage, + takeQueuedAgentMessageReplay, + type QueuedAgentReplaySource, +} from "@/contexts/session-queued-message-replay"; // Re-export types from session-store and draft-store for backward compatibility export type { DraftInput } from "@/stores/draft-store"; @@ -355,7 +359,7 @@ function SessionProviderInternal({ children, serverId, client }: SessionProvider ); const applyAuthoritativeAgentSnapshot = useCallback( - (agent: Agent) => { + (agent: Agent, options?: { source?: QueuedAgentReplaySource }) => { setAgents(serverId, (prev) => { const current = prev.get(agent.id); if (current && agent.updatedAt.getTime() < current.updatedAt.getTime()) { @@ -429,7 +433,13 @@ function SessionProviderInternal({ children, serverId, client }: SessionProvider }); const prevStatus = previousAgentStatusRef.current.get(agent.id); - if (prevStatus === "running" && agent.status !== "running") { + if ( + shouldAutoReplayQueuedAgentMessage({ + previousStatus: prevStatus, + nextStatus: agent.status, + source: options?.source ?? "live", + }) + ) { const session = useSessionStore.getState().sessions[serverId]; const replay = takeQueuedAgentMessageReplay(session?.queuedMessages.get(agent.id)); if (replay) { @@ -732,7 +742,7 @@ function SessionProviderInternal({ children, serverId, client }: SessionProvider }), }; - applyAuthoritativeAgentSnapshot(agent); + applyAuthoritativeAgentSnapshot(agent, { source: "live" }); }, [ applyAuthoritativeAgentSnapshot, @@ -782,10 +792,13 @@ function SessionProviderInternal({ children, serverId, client }: SessionProvider if (payload.agent) { const normalized = normalizeAgentSnapshot(payload.agent, serverId); - applyAuthoritativeAgentSnapshot({ - ...normalized, - projectPlacement: session?.agents.get(agentId)?.projectPlacement ?? null, - }); + applyAuthoritativeAgentSnapshot( + { + ...normalized, + projectPlacement: session?.agents.get(agentId)?.projectPlacement ?? null, + }, + { source: "hydrate" }, + ); } // Call pure reducer diff --git a/packages/app/src/contexts/session-queued-message-replay.test.ts b/packages/app/src/contexts/session-queued-message-replay.test.ts index 67186947b..fad4b679d 100644 --- a/packages/app/src/contexts/session-queued-message-replay.test.ts +++ b/packages/app/src/contexts/session-queued-message-replay.test.ts @@ -1,5 +1,8 @@ import { describe, expect, it } from "vitest"; -import { takeQueuedAgentMessageReplay } from "./session-queued-message-replay"; +import { + shouldAutoReplayQueuedAgentMessage, + takeQueuedAgentMessageReplay, +} from "./session-queued-message-replay"; describe("takeQueuedAgentMessageReplay", () => { it("preserves the queued message id for idempotent replay", () => { @@ -47,4 +50,30 @@ describe("takeQueuedAgentMessageReplay", () => { expect(takeQueuedAgentMessageReplay([])).toBeNull(); expect(takeQueuedAgentMessageReplay(undefined)).toBeNull(); }); + + it("only auto-replays queued messages for live running-to-idle transitions", () => { + expect( + shouldAutoReplayQueuedAgentMessage({ + previousStatus: "running", + nextStatus: "idle", + source: "live", + }), + ).toBe(true); + + expect( + shouldAutoReplayQueuedAgentMessage({ + previousStatus: "running", + nextStatus: "idle", + source: "hydrate", + }), + ).toBe(false); + + expect( + shouldAutoReplayQueuedAgentMessage({ + previousStatus: "idle", + nextStatus: "idle", + source: "live", + }), + ).toBe(false); + }); }); diff --git a/packages/app/src/contexts/session-queued-message-replay.ts b/packages/app/src/contexts/session-queued-message-replay.ts index 7b361ed90..f6d8c105c 100644 --- a/packages/app/src/contexts/session-queued-message-replay.ts +++ b/packages/app/src/contexts/session-queued-message-replay.ts @@ -13,6 +13,16 @@ export interface QueuedAgentMessageReplay { remainingQueue: QueuedAgentMessageReplayItem[]; } +export type QueuedAgentReplaySource = "hydrate" | "live"; + +export function shouldAutoReplayQueuedAgentMessage(input: { + previousStatus: string | undefined; + nextStatus: string; + source: QueuedAgentReplaySource; +}): boolean { + return input.source === "live" && input.previousStatus === "running" && input.nextStatus !== "running"; +} + export function takeQueuedAgentMessageReplay( queue: readonly QueuedAgentMessageReplayItem[] | undefined, ): QueuedAgentMessageReplay | null { diff --git a/packages/server/src/server/agent/agent-manager.test.ts b/packages/server/src/server/agent/agent-manager.test.ts index 3882a1a92..30eb814ce 100644 --- a/packages/server/src/server/agent/agent-manager.test.ts +++ b/packages/server/src/server/agent/agent-manager.test.ts @@ -1588,6 +1588,34 @@ describe("AgentManager", () => { } }); + test("notifyAgentState does not mutate updatedAt", async () => { + const workdir = mkdtempSync(join(tmpdir(), "agent-manager-test-")); + const storagePath = join(workdir, "agents"); + const storage = new AgentStorage(storagePath, logger); + const manager = new AgentManager({ + clients: { + codex: new TestAgentClient(), + }, + registry: storage, + logger, + idFactory: () => "00000000-0000-4000-8000-000000000121", + }); + + const snapshot = await manager.createAgent({ + provider: "codex", + cwd: workdir, + }); + + const before = manager.getAgent(snapshot.id); + expect(before).toBeDefined(); + + manager.notifyAgentState(snapshot.id); + + const after = manager.getAgent(snapshot.id); + expect(after).toBeDefined(); + expect(after!.updatedAt.getTime()).toBe(before!.updatedAt.getTime()); + }); + test("recordUserMessage can skip emitting agent_state when run start will emit running", async () => { const workdir = mkdtempSync(join(tmpdir(), "agent-manager-test-")); const storagePath = join(workdir, "agents"); diff --git a/packages/server/src/server/agent/agent-manager.ts b/packages/server/src/server/agent/agent-manager.ts index 1e258ec19..16800ed23 100644 --- a/packages/server/src/server/agent/agent-manager.ts +++ b/packages/server/src/server/agent/agent-manager.ts @@ -847,7 +847,6 @@ export class AgentManager { if (!agent || agent.internal) { return; } - this.touchUpdatedAt(agent); this.emitState(agent); } From 954046f67106f057d3d47a58c921e3000ffcb677 Mon Sep 17 00:00:00 2001 From: thevortex8 Date: Sat, 21 Mar 2026 19:28:45 -0400 Subject: [PATCH 4/5] fix codex stalled runs on idle thread status --- .../providers/codex-app-server-agent.test.ts | 23 ++++ .../agent/providers/codex-app-server-agent.ts | 102 ++++++++++++++++++ 2 files changed, 125 insertions(+) diff --git a/packages/server/src/server/agent/providers/codex-app-server-agent.test.ts b/packages/server/src/server/agent/providers/codex-app-server-agent.test.ts index da7386c09..eaea9ba3a 100644 --- a/packages/server/src/server/agent/providers/codex-app-server-agent.test.ts +++ b/packages/server/src/server/agent/providers/codex-app-server-agent.test.ts @@ -118,4 +118,27 @@ describe("Codex app-server provider", () => { }), ).toBe(true); }); + + test("only synthesizes completion from idle thread status while a run is active", () => { + expect( + __codexAppServerInternals.shouldSynthesizeTurnCompletionFromThreadStatus({ + statusType: "idle", + hasActiveRun: true, + }), + ).toBe(true); + + expect( + __codexAppServerInternals.shouldSynthesizeTurnCompletionFromThreadStatus({ + statusType: "idle", + hasActiveRun: false, + }), + ).toBe(false); + + expect( + __codexAppServerInternals.shouldSynthesizeTurnCompletionFromThreadStatus({ + statusType: "running", + hasActiveRun: true, + }), + ).toBe(false); + }); }); diff --git a/packages/server/src/server/agent/providers/codex-app-server-agent.ts b/packages/server/src/server/agent/providers/codex-app-server-agent.ts index 4c21d8df4..6cd509a59 100644 --- a/packages/server/src/server/agent/providers/codex-app-server-agent.ts +++ b/packages/server/src/server/agent/providers/codex-app-server-agent.ts @@ -1195,6 +1195,18 @@ const ThreadStartedNotificationSchema = z }) .passthrough(); +const ThreadStatusChangedNotificationSchema = z + .object({ + threadId: z.string().optional(), + status: z + .object({ + type: z.string().optional(), + }) + .passthrough() + .optional(), + }) + .passthrough(); + const TurnStartedNotificationSchema = z .object({ turn: z.object({ id: z.string() }).passthrough(), @@ -1402,6 +1414,16 @@ const ItemFileChangeOutputDeltaNotificationSchema = z }) .passthrough(); +const ItemCommandExecutionOutputDeltaNotificationSchema = z + .object({ + itemId: z.string().optional(), + callId: z.string().optional(), + stream: z.string().optional(), + delta: z.string().optional(), + chunk: z.string().optional(), + }) + .passthrough(); + const CodexEventTurnDiffNotificationSchema = z .object({ msg: z @@ -1416,6 +1438,7 @@ const CodexEventTurnDiffNotificationSchema = z type ParsedCodexNotification = | { kind: "thread_started"; threadId: string } + | { kind: "thread_status_changed"; threadId: string | null; statusType: string | null } | { kind: "turn_started"; turnId: string } | { kind: "turn_completed"; status: string; errorMessage: string | null } | { kind: "plan_updated"; plan: Array<{ step: string | null; status: string | null }> } @@ -1499,6 +1522,25 @@ const CodexNotificationSchema = z.union([ params, }), ), + z + .object({ + method: z.literal("thread/status/changed"), + params: ThreadStatusChangedNotificationSchema, + }) + .transform( + ({ params }): ParsedCodexNotification => ({ + kind: "thread_status_changed", + threadId: params.threadId ?? null, + statusType: params.status?.type ?? null, + }), + ), + z.object({ method: z.literal("thread/status/changed"), params: z.unknown() }).transform( + ({ method, params }): ParsedCodexNotification => ({ + kind: "invalid_payload", + method, + params, + }), + ), z .object({ method: z.literal("turn/started"), params: TurnStartedNotificationSchema }) .transform( @@ -1872,6 +1914,28 @@ const CodexNotificationSchema = z.union([ params, }), ), + z + .object({ + method: z.literal("item/commandExecution/outputDelta"), + params: ItemCommandExecutionOutputDeltaNotificationSchema, + }) + .transform( + ({ params }): ParsedCodexNotification => ({ + kind: "exec_command_output_delta", + callId: params.callId ?? params.itemId ?? null, + stream: params.stream ?? null, + chunk: params.delta ?? params.chunk ?? null, + }), + ), + z + .object({ method: z.literal("item/commandExecution/outputDelta"), params: z.unknown() }) + .transform( + ({ method, params }): ParsedCodexNotification => ({ + kind: "invalid_payload", + method, + params, + }), + ), z .object({ method: z.literal("codex/event/turn_diff"), @@ -2035,6 +2099,7 @@ export async function codexAppServerTurnInputFromPrompt( export const __codexAppServerInternals = { mapCodexPatchNotificationToToolCall, shouldBufferCodexEventUntilTurnStarted, + shouldSynthesizeTurnCompletionFromThreadStatus, }; function isTerminalAgentStreamEvent(event: AgentStreamEvent): boolean { @@ -2057,6 +2122,13 @@ function shouldBufferCodexEventUntilTurnStarted(event: AgentStreamEvent): boolea return true; } +function shouldSynthesizeTurnCompletionFromThreadStatus(input: { + statusType: string | null; + hasActiveRun: boolean; +}): boolean { + return input.hasActiveRun && input.statusType === "idle"; +} + class CodexAppServerAgentSession implements AgentSession { readonly provider = CODEX_PROVIDER; readonly capabilities = CODEX_APP_SERVER_CAPABILITIES; @@ -2095,6 +2167,7 @@ class CodexAppServerAgentSession implements AgentSession { private warnedInvalidNotificationPayloads = new Set(); private warnedIncompleteEditToolCallIds = new Set(); private latestUsage: AgentUsage | undefined; + private runAwaitingTerminal = false; private connected = false; private collaborationModes: Array<{ name: string; @@ -2521,6 +2594,7 @@ class CodexAppServerAgentSession implements AgentSession { } await this.client.request("turn/start", params, TURN_START_TIMEOUT_MS); + this.runAwaitingTerminal = true; let sawTurnStarted = false; for await (const event of queue) { @@ -2542,6 +2616,7 @@ class CodexAppServerAgentSession implements AgentSession { } } } finally { + this.runAwaitingTerminal = false; if (this.eventQueue === queue) { this.eventQueue = null; } @@ -2875,6 +2950,31 @@ class CodexAppServerAgentSession implements AgentSession { return; } + if (parsed.kind === "thread_status_changed") { + if ( + shouldSynthesizeTurnCompletionFromThreadStatus({ + statusType: parsed.statusType, + hasActiveRun: this.runAwaitingTerminal, + }) + ) { + this.runAwaitingTerminal = false; + this.currentTurnId = null; + this.emittedItemStartedIds.clear(); + this.emittedItemCompletedIds.clear(); + this.emittedExecCommandStartedCallIds.clear(); + this.emittedExecCommandCompletedCallIds.clear(); + this.pendingCommandOutputDeltas.clear(); + this.pendingFileChangeOutputDeltas.clear(); + this.warnedIncompleteEditToolCallIds.clear(); + this.emitEvent({ + type: "turn_completed", + provider: CODEX_PROVIDER, + usage: this.latestUsage, + }); + } + return; + } + if (parsed.kind === "turn_started") { this.currentTurnId = parsed.turnId; this.emittedItemStartedIds.clear(); @@ -2889,6 +2989,8 @@ class CodexAppServerAgentSession implements AgentSession { } if (parsed.kind === "turn_completed") { + this.runAwaitingTerminal = false; + this.currentTurnId = null; if (parsed.status === "failed") { this.emitEvent({ type: "turn_failed", From 6d4b94acda99cc1bac4c9a4d8fc3e982f71773eb Mon Sep 17 00:00:00 2001 From: thevortex8 Date: Sat, 21 Mar 2026 20:25:06 -0400 Subject: [PATCH 5/5] fix duplicate prompt replay admission --- .../server/src/server/agent/agent-manager.ts | 36 ++++++++++ .../send-message-id-replay.e2e.test.ts | 70 +++++++++++++++++++ 2 files changed, 106 insertions(+) diff --git a/packages/server/src/server/agent/agent-manager.ts b/packages/server/src/server/agent/agent-manager.ts index 16800ed23..b4968bab0 100644 --- a/packages/server/src/server/agent/agent-manager.ts +++ b/packages/server/src/server/agent/agent-manager.ts @@ -247,6 +247,7 @@ type LiveEventStreamingSession = AgentSession & { const DEFAULT_TIMELINE_FETCH_LIMIT = 200; const LIVE_BACKLOG_TERMINAL_REPLAY_DELAY_MS = 300; +const RECENT_USER_MESSAGE_REPLAY_WINDOW_MS = 15_000; const BUSY_STATUSES: AgentLifecycleStatus[] = ["initializing", "running"]; const AgentIdSchema = z.string().uuid(); @@ -952,9 +953,44 @@ export class AgentManager { return row.item.text === text ? "duplicate" : "conflict"; } + if (this.isRecentUserMessageReplay(agent, text)) { + return "duplicate"; + } + return "new"; } + private isRecentUserMessageReplay(agent: ManagedAgent, text: string): boolean { + let latestUserMessageRow: AgentTimelineRow | null = null; + for (let index = agent.timelineRows.length - 1; index >= 0; index -= 1) { + const row = agent.timelineRows[index]; + if (!row || row.item.type !== "user_message") { + continue; + } + latestUserMessageRow = row; + break; + } + + const latestUserMessageItem = + latestUserMessageRow?.item.type === "user_message" ? latestUserMessageRow.item : null; + if (!latestUserMessageRow || !latestUserMessageItem || latestUserMessageItem.text !== text) { + return false; + } + + if (agent.lifecycle === "running" || agent.pendingRun || agent.pendingReplacement) { + return true; + } + + const rowTimestampMs = Date.parse(latestUserMessageRow.timestamp); + const latestActivityMs = Math.max( + Number.isFinite(rowTimestampMs) ? rowTimestampMs : 0, + agent.updatedAt.getTime(), + agent.lastUserMessageAt?.getTime() ?? 0, + ); + + return Date.now() - latestActivityMs <= RECENT_USER_MESSAGE_REPLAY_WINDOW_MS; + } + async appendTimelineItem(agentId: string, item: AgentTimelineItem): Promise { const agent = this.requireAgent(agentId); this.touchUpdatedAt(agent); diff --git a/packages/server/src/server/daemon-e2e/send-message-id-replay.e2e.test.ts b/packages/server/src/server/daemon-e2e/send-message-id-replay.e2e.test.ts index 20d143f93..0f7933165 100644 --- a/packages/server/src/server/daemon-e2e/send-message-id-replay.e2e.test.ts +++ b/packages/server/src/server/daemon-e2e/send-message-id-replay.e2e.test.ts @@ -49,4 +49,74 @@ describe("send_agent_message_request replay", () => { rmSync(cwd, { recursive: true, force: true }); } }); + + test("does not start a second run when identical text is replayed with a new messageId right after finish", async () => { + const cwd = tmpCwd(); + try { + const agent = await ctx.client.createAgent({ + provider: "codex", + cwd, + title: "duplicate text replay after finish", + modeId: "full-access", + }); + + await ctx.client.sendMessage(agent.id, "Reply with exactly hello", { messageId: "msg-1" }); + await ctx.client.waitForFinish(agent.id, 5_000); + + const afterFirst = await ctx.client.fetchAgent(agent.id); + expect(afterFirst?.agent.status).toBe("idle"); + const firstUpdatedAt = afterFirst?.agent.updatedAt ?? null; + expect(firstUpdatedAt).not.toBeNull(); + + await ctx.client.sendMessage(agent.id, "Reply with exactly hello", { messageId: "msg-2" }); + await new Promise((resolve) => setTimeout(resolve, 50)); + + const afterReplay = await ctx.client.fetchAgent(agent.id); + expect(afterReplay?.agent.status).toBe("idle"); + expect(afterReplay?.agent.updatedAt ?? null).toBe(firstUpdatedAt); + + const timeline = await ctx.client.fetchAgentTimeline(agent.id, { + direction: "tail", + limit: 20, + projection: "canonical", + }); + const userMessages = timeline.entries.filter( + (entry) => entry.item.type === "user_message" && entry.item.text === "Reply with exactly hello", + ); + expect(userMessages).toHaveLength(1); + } finally { + rmSync(cwd, { recursive: true, force: true }); + } + }); + + test("does not replace an active run when identical text is resent with a new messageId", async () => { + const cwd = tmpCwd(); + try { + const agent = await ctx.client.createAgent({ + provider: "codex", + cwd, + title: "duplicate text replay while running", + modeId: "full-access", + }); + + await ctx.client.sendMessage(agent.id, "Run: sleep 30", { messageId: "msg-running-1" }); + await ctx.client.waitForAgentUpsert(agent.id, (snapshot) => snapshot.status === "running", 5_000); + + await ctx.client.sendMessage(agent.id, "Run: sleep 30", { messageId: "msg-running-2" }); + const afterReplay = await ctx.client.waitForFinish(agent.id, 5_000); + expect(afterReplay.status).toBe("idle"); + + const timeline = await ctx.client.fetchAgentTimeline(agent.id, { + direction: "tail", + limit: 20, + projection: "canonical", + }); + const userMessages = timeline.entries.filter( + (entry) => entry.item.type === "user_message" && entry.item.text === "Run: sleep 30", + ); + expect(userMessages).toHaveLength(1); + } finally { + rmSync(cwd, { recursive: true, force: true }); + } + }); });