From a1c16c08653178b09cea4c29b47e184e1eb5b101 Mon Sep 17 00:00:00 2001 From: David Longman Date: Tue, 7 Apr 2026 08:09:13 -0600 Subject: [PATCH] fix: ensure final assistant answer renders immediately in existing threads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two related races prevented the last assistant message from appearing in the UI after a turn completed. Server (codex-app-server-agent): - Codex can emit codex/event/task_complete before the buffered assistant delta has been promoted to a timeline item. Add emitBufferedAssistantMessages() and call it when turn_completed status is 'completed' and pendingAgentMessages is non-empty. Client (stream.ts): - The head→tail completion flush only appended items whose IDs were absent from tail. If tail already held a stale/partial item with the same ID the finalized version was silently dropped. Update flushHeadToTail() to replace existing tail entries when the finalized head item differs. Tests: - Regression test: emits buffered assistant text before task_complete closes the turn (server). - Regression test: replaces stale tail content with finalized head content on turn completion (client). --- packages/app/src/types/stream-event.test.ts | 31 +++++++++++++++++ packages/app/src/types/stream.ts | 26 ++++++++++++--- .../providers/codex-app-server-agent.test.ts | 33 +++++++++++++++++++ .../agent/providers/codex-app-server-agent.ts | 18 ++++++++++ 4 files changed, 103 insertions(+), 5 deletions(-) diff --git a/packages/app/src/types/stream-event.test.ts b/packages/app/src/types/stream-event.test.ts index 2185b9339..f159ec119 100644 --- a/packages/app/src/types/stream-event.test.ts +++ b/packages/app/src/types/stream-event.test.ts @@ -131,6 +131,37 @@ describe("applyStreamEvent", () => { expect(result.tail[0].kind).toBe("assistant_message"); }); + it("replaces stale tail content with finalized head content on turn completion", () => { + const result = applyStreamEvent({ + tail: [ + { + kind: "assistant_message", + id: "assistant-shared", + text: "Hello", + timestamp: new Date(0), + }, + ], + head: [ + { + kind: "assistant_message", + id: "assistant-shared", + text: "Hello world", + timestamp: new Date(1), + }, + ], + event: completionEvent(), + timestamp: baseTimestamp, + }); + + expect(result.head).toHaveLength(0); + expect(result.tail).toHaveLength(1); + expect(result.tail[0]).toMatchObject({ + kind: "assistant_message", + id: "assistant-shared", + text: "Hello world", + }); + }); + it("flushes reasoning when assistant message starts", () => { let result = applyStreamEvent({ tail: [], diff --git a/packages/app/src/types/stream.ts b/packages/app/src/types/stream.ts index 677667a2e..95a302dbe 100644 --- a/packages/app/src/types/stream.ts +++ b/packages/app/src/types/stream.ts @@ -735,14 +735,30 @@ function flushHeadToTail(tail: StreamItem[], head: StreamItem[]): StreamItem[] { } const finalized = finalizeHeadItems(head); - const tailIds = new Set(tail.map((item) => item.id)); - const newItems = finalized.filter((item) => !tailIds.has(item.id)); + const tailIndexById = new Map(tail.map((item, index) => [item.id, index])); + let nextTail = tail; - if (newItems.length === 0) { - return tail; + for (const item of finalized) { + const existingIndex = tailIndexById.get(item.id); + if (existingIndex === undefined) { + if (nextTail === tail) { + nextTail = [...tail]; + } + nextTail.push(item); + tailIndexById.set(item.id, nextTail.length - 1); + continue; + } + + const existing = nextTail[existingIndex]; + if (existing !== item) { + if (nextTail === tail) { + nextTail = [...tail]; + } + nextTail[existingIndex] = item; + } } - return [...tail, ...newItems]; + return nextTail; } /** diff --git a/packages/server/src/server/agent/providers/codex-app-server-agent.test.ts b/packages/server/src/server/agent/providers/codex-app-server-agent.test.ts index 1faf6e213..1e70abc7f 100644 --- a/packages/server/src/server/agent/providers/codex-app-server-agent.test.ts +++ b/packages/server/src/server/agent/providers/codex-app-server-agent.test.ts @@ -345,4 +345,37 @@ describe("Codex app-server provider", () => { }, }); }); + + test("emits buffered assistant text before task_complete closes the turn", () => { + const session = createSession(); + const events: AgentStreamEvent[] = []; + session.subscribe((event) => events.push(event)); + + ;(session as any).handleNotification("item/agentMessage/delta", { + itemId: "msg-late-final", + delta: "COMPLEX_REPRO_OK", + }); + + ;(session as any).handleNotification("codex/event/task_complete", { + msg: { type: "task_complete" }, + }); + + expect(events).toEqual([ + { + type: "timeline", + provider: "codex", + turnId: "test-turn", + item: { + type: "assistant_message", + text: "COMPLEX_REPRO_OK", + }, + }, + { + type: "turn_completed", + provider: "codex", + turnId: "test-turn", + usage: undefined, + }, + ]); + }); }); diff --git a/packages/server/src/server/agent/providers/codex-app-server-agent.ts b/packages/server/src/server/agent/providers/codex-app-server-agent.ts index 1b49b62df..dd4dc9bed 100644 --- a/packages/server/src/server/agent/providers/codex-app-server-agent.ts +++ b/packages/server/src/server/agent/providers/codex-app-server-agent.ts @@ -3174,6 +3174,21 @@ class CodexAppServerAgentSession implements AgentSession { } } + private emitBufferedAssistantMessages(): void { + for (const [itemId, text] of this.pendingAgentMessages.entries()) { + if (!text) { + continue; + } + this.emitEvent({ + type: "timeline", + provider: CODEX_PROVIDER, + item: { type: "assistant_message", text }, + }); + this.emittedItemCompletedIds.add(itemId); + } + this.pendingAgentMessages.clear(); + } + private createTurnId(): string { return `codex-turn-${this.nextTurnOrdinal++}`; } @@ -3205,6 +3220,9 @@ class CodexAppServerAgentSession implements AgentSession { } if (parsed.kind === "turn_completed") { + if (parsed.status === "completed" && this.pendingAgentMessages.size > 0) { + this.emitBufferedAssistantMessages(); + } if (parsed.status === "failed") { this.emitEvent({ type: "turn_failed",