Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 154 additions & 79 deletions frontend/src/lib/stores/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,57 @@ function attachmentsFromParts(
return attachments;
}

function resolveToolMessageContent(
payloadContent: unknown,
fallbackText: string,
sessionId: string | null,
): { content: ChatMessageContent; text: string; attachments: AttachmentResource[] } {
let content: ChatMessageContent = '';
let text = '';
let attachments: AttachmentResource[] = [];
let fragments: ChatContentFragment[] | null = null;

if (Array.isArray(payloadContent)) {
fragments = payloadContent as ChatContentFragment[];
content = fragments;
const normalized = normalizeMessageContent(fragments);
text = normalized.text;
attachments = attachmentsFromParts(normalized.parts, sessionId);
} else if (typeof payloadContent === 'string') {
content = payloadContent;
text = payloadContent;
} else if (payloadContent && typeof payloadContent === 'object') {
try {
const serialized = JSON.stringify(payloadContent);
content = serialized;
text = serialized;
} catch {
// Ignore serialization errors; rely on fallback text
}
}

const fallback = typeof fallbackText === 'string' ? fallbackText : '';
if ((!text || !text.trim()) && fallback) {
text = fallback;
if (!fragments && typeof content === 'string' && !content) {
content = fallback;
}
}

if (!content) {
content = fragments ?? fallback ?? '';
}

return { content, text, attachments };
}

function hasContentValue(value: ChatMessageContent): boolean {
if (typeof value === 'string') {
return value.trim().length > 0;
}
return Array.isArray(value) ? value.length > 0 : false;
}

function mergeMessageContent(
existingContent: ChatMessageContent,
existingText: string,
Expand Down Expand Up @@ -543,90 +594,114 @@ function createChatStore() {
typeof payload.message_id === 'number' ? payload.message_id : null;

let messageId = toolMessageIds.get(callId);
if (!messageId) {
messageId = createId('tool');
toolMessageIds.set(callId, messageId);
const fallbackCreatedAt = new Date().toISOString();
const createdAt = coalesceTimestamp(
payload.created_at,
payload.created_at_utc,
fallbackCreatedAt,
);
const createdAtUtc = coalesceTimestamp(
payload.created_at_utc,
createdAt ?? fallbackCreatedAt,
);
store.update((value) => ({
...value,
messages: [
...value.messages,
{
id: messageId as string,
role: 'tool',
content:
status === 'started'
? `Running ${toolName}…`
: toolResult ?? `Tool ${toolName} responded.`,
text:
status === 'started'
if (!messageId) {
messageId = createId('tool');
toolMessageIds.set(callId, messageId);
const fallbackCreatedAt = new Date().toISOString();
const createdAt = coalesceTimestamp(
payload.created_at,
payload.created_at_utc,
fallbackCreatedAt,
);
const createdAtUtc = coalesceTimestamp(
payload.created_at_utc,
createdAt ?? fallbackCreatedAt,
);
store.update((value) => {
const defaultStatusText =
typeof toolResult === 'string' && toolResult
? toolResult
: status === 'started'
? `Running ${toolName}…`
: toolResult ?? `Tool ${toolName} responded.`,
attachments: [],
pending: status === 'started',
details: {
toolName,
toolStatus: status,
toolResult: toolResult ?? null,
serverMessageId,
},
createdAt,
createdAtUtc,
},
],
}));
} else {
store.update((value) => {
const messages = value.messages.map((message) => {
if (message.id !== messageId) {
return message;
}
const details = {
...(message.details ?? {}),
toolName,
toolStatus: status,
toolResult: toolResult ?? (message.details?.toolResult ?? null),
serverMessageId:
serverMessageId ?? message.details?.serverMessageId ?? null,
};
const nextText =
toolResult ??
(status === 'started'
? `Running ${toolName}…`
: `Tool ${toolName} ${status}.`);
const nextCreatedAt = coalesceTimestamp(
payload.created_at,
payload.created_at_utc,
message.createdAt ?? null,
);
const nextCreatedAtUtc = coalesceTimestamp(
payload.created_at_utc,
message.createdAtUtc ?? null,
: `Tool ${toolName} responded.`;
const resolvedContent = resolveToolMessageContent(
payload.content,
defaultStatusText,
value.sessionId,
);
return {
...message,
content: nextText,
text: nextText,
attachments: message.attachments ?? [],
pending: status === 'started',
details,
createdAt: nextCreatedAt,
createdAtUtc:
nextCreatedAtUtc ?? nextCreatedAt ?? message.createdAtUtc ?? null,
...value,
messages: [
...value.messages,
{
id: messageId as string,
role: 'tool',
content: resolvedContent.content,
text: resolvedContent.text,
attachments: resolvedContent.attachments,
pending: status === 'started',
details: {
toolName,
toolStatus: status,
toolResult: toolResult ?? null,
serverMessageId,
},
createdAt,
createdAtUtc,
},
],
};
});
return { ...value, messages };
});
}
} else {
store.update((value) => {
const messages = value.messages.map((message) => {
if (message.id !== messageId) {
return message;
}
const details = {
...(message.details ?? {}),
toolName,
toolStatus: status,
toolResult: toolResult ?? (message.details?.toolResult ?? null),
serverMessageId:
serverMessageId ?? message.details?.serverMessageId ?? null,
};
const fallbackStatusText =
typeof toolResult === 'string' && toolResult
? toolResult
: message.text ??
(status === 'started'
? `Running ${toolName}…`
: `Tool ${toolName} ${status}.`);
const resolvedContent = resolveToolMessageContent(
payload.content ?? message.content,
fallbackStatusText,
value.sessionId,
);
const nextAttachments =
resolvedContent.attachments.length > 0
? resolvedContent.attachments
: message.attachments ?? [];
const nextContent = hasContentValue(resolvedContent.content)
? resolvedContent.content
: hasContentValue(message.content)
? message.content
: resolvedContent.text || message.text;
const nextText = resolvedContent.text || message.text;
const nextCreatedAt = coalesceTimestamp(
payload.created_at,
payload.created_at_utc,
message.createdAt ?? null,
);
const nextCreatedAtUtc = coalesceTimestamp(
payload.created_at_utc,
message.createdAtUtc ?? null,
);
return {
...message,
content: nextContent,
text: nextText,
attachments: nextAttachments,
pending: status === 'started',
details,
createdAt: nextCreatedAt,
createdAtUtc:
nextCreatedAtUtc ?? nextCreatedAt ?? message.createdAtUtc ?? null,
};
});
return { ...value, messages };
});
}
},
onDone() {
store.update((value) => {
Expand Down
64 changes: 31 additions & 33 deletions src/backend/chat/streaming/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ async def stream_conversation(
"""Yield SSE events while maintaining state and executing tools."""

hop_count = 0
settings = get_settings()
attachment_ttl = settings.attachment_signed_url_ttl
conversation_state = list(conversation)
assistant_client_message_id: str | None = None
request_metadata: dict[str, Any] | None = None
Expand Down Expand Up @@ -162,7 +164,7 @@ async def stream_conversation(
conversation_state = await refresh_message_attachments(
conversation_state,
self._repo,
ttl=get_settings().attachment_signed_url_ttl,
ttl=attachment_ttl,
)

payload = request.to_openrouter_payload(active_model)
Expand Down Expand Up @@ -748,51 +750,46 @@ async def stream_conversation(
"parent_client_message_id": assistant_client_message_id,
}

tool_record_id, tool_created_at = await self._repo.add_message(
session_id,
role="tool",
content=result_text,
tool_call_id=tool_id,
metadata=tool_metadata,
parent_client_message_id=assistant_client_message_id,
)

# Check if result contains attachment references that need conversion
cleaned_text, attachment_ids = _parse_attachment_references(result_text)

if attachment_ids:
# Convert to multimodal content with image references
content_parts: list[dict[str, Any]] = []

# Add text part if there's any cleaned text
if cleaned_text:
content_parts.append({"type": "text", "text": cleaned_text})

# Add image parts for each attachment
# The attachment_urls service will populate these with signed URLs later
for attachment_id in attachment_ids:
content_parts.append(
{
"type": "image_url",
"image_url": {
"url": ""
}, # Will be filled by attachment_urls service
"image_url": {"url": ""},
"metadata": {"attachment_id": attachment_id},
}
)

tool_message = {
"role": "tool",
"tool_call_id": tool_id,
"content": content_parts,
}
message_content: Any = content_parts
else:
# Plain text result
tool_message = {
"role": "tool",
"tool_call_id": tool_id,
"content": result_text,
}
message_content = result_text

tool_record_id, tool_created_at = await self._repo.add_message(
session_id,
role="tool",
content=message_content,
tool_call_id=tool_id,
metadata=tool_metadata,
parent_client_message_id=assistant_client_message_id,
)

tool_message = {
"role": "tool",
"tool_call_id": tool_id,
"content": message_content,
}

if attachment_ids:
hydrated = await refresh_message_attachments(
[tool_message],
self._repo,
ttl=attachment_ttl,
)
if hydrated:
tool_message = hydrated[0]

edt_iso, utc_iso = format_timestamp_for_client(tool_created_at)
if edt_iso is not None:
Expand All @@ -809,6 +806,7 @@ async def stream_conversation(
"name": tool_name,
"call_id": tool_id,
"result": result_text,
"content": tool_message.get("content"),
"message_id": tool_record_id,
"created_at": edt_iso or tool_created_at,
"created_at_utc": utc_iso or tool_created_at,
Expand Down