Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions packages/app/src/types/stream-event.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,37 @@ describe("applyStreamEvent", () => {
expect(result.tail[0].kind).toBe("assistant_message");
});

it("replaces stale tail content with finalized head content on turn completion", () => {
const result = applyStreamEvent({
tail: [
{
kind: "assistant_message",
id: "assistant-shared",
text: "Hello",
timestamp: new Date(0),
},
],
head: [
{
kind: "assistant_message",
id: "assistant-shared",
text: "Hello world",
timestamp: new Date(1),
},
],
event: completionEvent(),
timestamp: baseTimestamp,
});

expect(result.head).toHaveLength(0);
expect(result.tail).toHaveLength(1);
expect(result.tail[0]).toMatchObject({
kind: "assistant_message",
id: "assistant-shared",
text: "Hello world",
});
});

it("flushes reasoning when assistant message starts", () => {
let result = applyStreamEvent({
tail: [],
Expand Down
26 changes: 21 additions & 5 deletions packages/app/src/types/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -735,14 +735,30 @@ function flushHeadToTail(tail: StreamItem[], head: StreamItem[]): StreamItem[] {
}

const finalized = finalizeHeadItems(head);
const tailIds = new Set(tail.map((item) => item.id));
const newItems = finalized.filter((item) => !tailIds.has(item.id));
const tailIndexById = new Map(tail.map((item, index) => [item.id, index]));
let nextTail = tail;

if (newItems.length === 0) {
return tail;
for (const item of finalized) {
const existingIndex = tailIndexById.get(item.id);
if (existingIndex === undefined) {
if (nextTail === tail) {
nextTail = [...tail];
}
nextTail.push(item);
tailIndexById.set(item.id, nextTail.length - 1);
continue;
}

const existing = nextTail[existingIndex];
if (existing !== item) {
if (nextTail === tail) {
nextTail = [...tail];
}
nextTail[existingIndex] = item;
}
}

return [...tail, ...newItems];
return nextTail;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,4 +345,37 @@ describe("Codex app-server provider", () => {
},
});
});

test("emits buffered assistant text before task_complete closes the turn", () => {
const session = createSession();
const events: AgentStreamEvent[] = [];
session.subscribe((event) => events.push(event));

;(session as any).handleNotification("item/agentMessage/delta", {
itemId: "msg-late-final",
delta: "COMPLEX_REPRO_OK",
});

;(session as any).handleNotification("codex/event/task_complete", {
msg: { type: "task_complete" },
});

expect(events).toEqual([
{
type: "timeline",
provider: "codex",
turnId: "test-turn",
item: {
type: "assistant_message",
text: "COMPLEX_REPRO_OK",
},
},
{
type: "turn_completed",
provider: "codex",
turnId: "test-turn",
usage: undefined,
},
]);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -3174,6 +3174,21 @@ class CodexAppServerAgentSession implements AgentSession {
}
}

private emitBufferedAssistantMessages(): void {
for (const [itemId, text] of this.pendingAgentMessages.entries()) {
if (!text) {
continue;
}
this.emitEvent({
type: "timeline",
provider: CODEX_PROVIDER,
item: { type: "assistant_message", text },
});
this.emittedItemCompletedIds.add(itemId);
}
this.pendingAgentMessages.clear();
Comment on lines +3177 to +3189
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

emitBufferedAssistantMessages() calls this.emitEvent() for each buffered entry, but emitEvent() clears pendingAgentMessages whenever it emits a timeline assistant_message. That means the first emitted buffered message will clear the map during iteration and can prevent remaining buffered messages from being emitted. Consider snapshotting entries (e.g., const entries = [...pendingAgentMessages.entries()]), clearing the map before emitting, or emitting via notifySubscribers() without triggering the emitEvent() side effect so all buffered messages reliably flush.

Copilot uses AI. Check for mistakes.
}

private createTurnId(): string {
return `codex-turn-${this.nextTurnOrdinal++}`;
}
Expand Down Expand Up @@ -3205,6 +3220,9 @@ class CodexAppServerAgentSession implements AgentSession {
}

if (parsed.kind === "turn_completed") {
if (parsed.status === "completed" && this.pendingAgentMessages.size > 0) {
this.emitBufferedAssistantMessages();
}
if (parsed.status === "failed") {
this.emitEvent({
type: "turn_failed",
Expand Down
Loading