Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 45 additions & 17 deletions packages/app/src/contexts/session-context.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ import { derivePendingPermissionKey, normalizeAgentSnapshot } from "@/utils/agen
import { resolveProjectPlacement } from "@/utils/project-placement";
import { buildDraftStoreKey } from "@/stores/draft-keys";
import type { AttachmentMetadata } from "@/attachments/types";
import {
shouldAutoReplayQueuedAgentMessage,
takeQueuedAgentMessageReplay,
type QueuedAgentReplaySource,
} from "@/contexts/session-queued-message-replay";

// Re-export types from session-store and draft-store for backward compatibility
export type { DraftInput } from "@/stores/draft-store";
Expand Down Expand Up @@ -272,7 +277,12 @@ function SessionProviderInternal({ children, serverId, client }: SessionProvider

const previousAgentStatusRef = useRef<Map<string, AgentLifecycleStatus>>(new Map());
const sendAgentMessageRef = useRef<
((agentId: string, message: string, images?: AttachmentMetadata[]) => Promise<void>) | null
((
agentId: string,
message: string,
images?: AttachmentMetadata[],
messageId?: string,
) => Promise<void>) | null
>(null);
const sessionStateTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const attentionNotifiedRef = useRef<Map<string, number>>(new Map());
Expand Down Expand Up @@ -349,7 +359,7 @@ function SessionProviderInternal({ children, serverId, client }: SessionProvider
);

const applyAuthoritativeAgentSnapshot = useCallback(
(agent: Agent) => {
(agent: Agent, options?: { source?: QueuedAgentReplaySource }) => {
setAgents(serverId, (prev) => {
const current = prev.get(agent.id);
if (current && agent.updatedAt.getTime() < current.updatedAt.getTime()) {
Expand Down Expand Up @@ -423,17 +433,27 @@ function SessionProviderInternal({ children, serverId, client }: SessionProvider
});

const prevStatus = previousAgentStatusRef.current.get(agent.id);
if (prevStatus === "running" && agent.status !== "running") {
if (
shouldAutoReplayQueuedAgentMessage({
previousStatus: prevStatus,
nextStatus: agent.status,
source: options?.source ?? "live",
})
) {
const session = useSessionStore.getState().sessions[serverId];
const queue = session?.queuedMessages.get(agent.id);
if (queue && queue.length > 0) {
const [next, ...rest] = queue;
const replay = takeQueuedAgentMessageReplay(session?.queuedMessages.get(agent.id));
if (replay) {
if (sendAgentMessageRef.current) {
void sendAgentMessageRef.current(agent.id, next.text, next.images);
void sendAgentMessageRef.current(
agent.id,
replay.text,
replay.images,
replay.messageId,
);
}
setQueuedMessages(serverId, (prev) => {
const updated = new Map(prev);
updated.set(agent.id, rest);
updated.set(agent.id, replay.remainingQueue);
return updated;
});
}
Expand Down Expand Up @@ -722,7 +742,7 @@ function SessionProviderInternal({ children, serverId, client }: SessionProvider
}),
};

applyAuthoritativeAgentSnapshot(agent);
applyAuthoritativeAgentSnapshot(agent, { source: "live" });
},
[
applyAuthoritativeAgentSnapshot,
Expand Down Expand Up @@ -772,10 +792,13 @@ function SessionProviderInternal({ children, serverId, client }: SessionProvider

if (payload.agent) {
const normalized = normalizeAgentSnapshot(payload.agent, serverId);
applyAuthoritativeAgentSnapshot({
...normalized,
projectPlacement: session?.agents.get(agentId)?.projectPlacement ?? null,
});
applyAuthoritativeAgentSnapshot(
{
...normalized,
projectPlacement: session?.agents.get(agentId)?.projectPlacement ?? null,
},
{ source: "hydrate" },
);
}

// Call pure reducer
Expand Down Expand Up @@ -1469,11 +1492,16 @@ function SessionProviderInternal({ children, serverId, client }: SessionProvider
]);

const sendAgentMessage = useCallback(
async (agentId: string, message: string, images?: AttachmentMetadata[]) => {
const messageId = generateMessageId();
async (
agentId: string,
message: string,
images?: AttachmentMetadata[],
messageId?: string,
) => {
const resolvedMessageId = messageId ?? generateMessageId();
const userMessage: StreamItem = {
kind: "user_message",
id: messageId,
id: resolvedMessageId,
text: message,
timestamp: new Date(),
};
Expand Down Expand Up @@ -1507,7 +1535,7 @@ function SessionProviderInternal({ children, serverId, client }: SessionProvider
}
void client
.sendAgentMessage(agentId, message, {
messageId,
messageId: resolvedMessageId,
...(imagesData && imagesData.length > 0 ? { images: imagesData } : {}),
})
.catch((error) => {
Expand Down
79 changes: 79 additions & 0 deletions packages/app/src/contexts/session-queued-message-replay.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { describe, expect, it } from "vitest";
import {
shouldAutoReplayQueuedAgentMessage,
takeQueuedAgentMessageReplay,
} from "./session-queued-message-replay";

describe("takeQueuedAgentMessageReplay", () => {
it("preserves the queued message id for idempotent replay", () => {
const replay = takeQueuedAgentMessageReplay([
{
id: "msg-1",
text: "resume this",
images: [
{
id: "img-1",
mimeType: "image/png",
storageType: "web-indexeddb",
storageKey: "attachment:img-1",
fileName: "test.png",
byteSize: 12,
createdAt: 1,
},
],
},
{
id: "msg-2",
text: "second",
},
]);

expect(replay).toEqual({
messageId: "msg-1",
text: "resume this",
images: [
{
id: "img-1",
mimeType: "image/png",
storageType: "web-indexeddb",
storageKey: "attachment:img-1",
fileName: "test.png",
byteSize: 12,
createdAt: 1,
},
],
remainingQueue: [{ id: "msg-2", text: "second" }],
});
});

it("returns null when the queue is empty", () => {
expect(takeQueuedAgentMessageReplay([])).toBeNull();
expect(takeQueuedAgentMessageReplay(undefined)).toBeNull();
});

it("only auto-replays queued messages for live running-to-idle transitions", () => {
expect(
shouldAutoReplayQueuedAgentMessage({
previousStatus: "running",
nextStatus: "idle",
source: "live",
}),
).toBe(true);

expect(
shouldAutoReplayQueuedAgentMessage({
previousStatus: "running",
nextStatus: "idle",
source: "hydrate",
}),
).toBe(false);

expect(
shouldAutoReplayQueuedAgentMessage({
previousStatus: "idle",
nextStatus: "idle",
source: "live",
}),
).toBe(false);
});
});
40 changes: 40 additions & 0 deletions packages/app/src/contexts/session-queued-message-replay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import type { AttachmentMetadata } from "@/attachments/types";

export interface QueuedAgentMessageReplayItem {
id: string;
text: string;
images?: AttachmentMetadata[];
}

export interface QueuedAgentMessageReplay {
messageId: string;
text: string;
images?: AttachmentMetadata[];
remainingQueue: QueuedAgentMessageReplayItem[];
}

export type QueuedAgentReplaySource = "hydrate" | "live";

export function shouldAutoReplayQueuedAgentMessage(input: {
previousStatus: string | undefined;
nextStatus: string;
source: QueuedAgentReplaySource;
}): boolean {
return input.source === "live" && input.previousStatus === "running" && input.nextStatus !== "running";
}

export function takeQueuedAgentMessageReplay(
queue: readonly QueuedAgentMessageReplayItem[] | undefined,
): QueuedAgentMessageReplay | null {
if (!queue || queue.length === 0) {
return null;
}

const [next, ...remainingQueue] = queue;
return {
messageId: next.id,
text: next.text,
images: next.images,
remainingQueue,
};
}
28 changes: 28 additions & 0 deletions packages/server/src/server/agent/agent-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1588,6 +1588,34 @@ describe("AgentManager", () => {
}
});

test("notifyAgentState does not mutate updatedAt", async () => {
const workdir = mkdtempSync(join(tmpdir(), "agent-manager-test-"));
const storagePath = join(workdir, "agents");
const storage = new AgentStorage(storagePath, logger);
const manager = new AgentManager({
clients: {
codex: new TestAgentClient(),
},
registry: storage,
logger,
idFactory: () => "00000000-0000-4000-8000-000000000121",
});

const snapshot = await manager.createAgent({
provider: "codex",
cwd: workdir,
});

const before = manager.getAgent(snapshot.id);
expect(before).toBeDefined();

manager.notifyAgentState(snapshot.id);

const after = manager.getAgent(snapshot.id);
expect(after).toBeDefined();
expect(after!.updatedAt.getTime()).toBe(before!.updatedAt.getTime());
});

test("recordUserMessage can skip emitting agent_state when run start will emit running", async () => {
const workdir = mkdtempSync(join(tmpdir(), "agent-manager-test-"));
const storagePath = join(workdir, "agents");
Expand Down
62 changes: 61 additions & 1 deletion packages/server/src/server/agent/agent-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ type LiveEventStreamingSession = AgentSession & {

const DEFAULT_TIMELINE_FETCH_LIMIT = 200;
const LIVE_BACKLOG_TERMINAL_REPLAY_DELAY_MS = 300;
const RECENT_USER_MESSAGE_REPLAY_WINDOW_MS = 15_000;
const BUSY_STATUSES: AgentLifecycleStatus[] = ["initializing", "running"];
const AgentIdSchema = z.string().uuid();

Expand Down Expand Up @@ -847,7 +848,6 @@ export class AgentManager {
if (!agent || agent.internal) {
return;
}
this.touchUpdatedAt(agent);
this.emitState(agent);
}

Expand Down Expand Up @@ -931,6 +931,66 @@ export class AgentManager {
}
}

classifyRecordedUserMessage(
agentId: string,
text: string,
options?: { messageId?: string },
): "new" | "duplicate" | "conflict" {
const normalizedMessageId = normalizeMessageId(options?.messageId);
if (!normalizedMessageId) {
return "new";
}

const agent = this.requireAgent(agentId);
for (const row of agent.timelineRows) {
if (row.item.type !== "user_message") {
continue;
}
const rowMessageId = normalizeMessageId(row.item.messageId);
if (rowMessageId !== normalizedMessageId) {
continue;
}
return row.item.text === text ? "duplicate" : "conflict";
}

if (this.isRecentUserMessageReplay(agent, text)) {
return "duplicate";
}

return "new";
}

private isRecentUserMessageReplay(agent: ManagedAgent, text: string): boolean {
let latestUserMessageRow: AgentTimelineRow | null = null;
for (let index = agent.timelineRows.length - 1; index >= 0; index -= 1) {
const row = agent.timelineRows[index];
if (!row || row.item.type !== "user_message") {
continue;
}
latestUserMessageRow = row;
break;
}

const latestUserMessageItem =
latestUserMessageRow?.item.type === "user_message" ? latestUserMessageRow.item : null;
if (!latestUserMessageRow || !latestUserMessageItem || latestUserMessageItem.text !== text) {
return false;
}

if (agent.lifecycle === "running" || agent.pendingRun || agent.pendingReplacement) {
return true;
}

const rowTimestampMs = Date.parse(latestUserMessageRow.timestamp);
const latestActivityMs = Math.max(
Number.isFinite(rowTimestampMs) ? rowTimestampMs : 0,
agent.updatedAt.getTime(),
agent.lastUserMessageAt?.getTime() ?? 0,
);

return Date.now() - latestActivityMs <= RECENT_USER_MESSAGE_REPLAY_WINDOW_MS;
}

async appendTimelineItem(agentId: string, item: AgentTimelineItem): Promise<void> {
const agent = this.requireAgent(agentId);
this.touchUpdatedAt(agent);
Expand Down
Loading
Loading