Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 34 additions & 18 deletions frontend/src/lib/stores/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,13 @@ function mergeMessageContent(
if (deltaFragments.length > 0) {
fragments.push(...deltaFragments);
} else if (deltaText) {
fragments.push({ type: 'text', text: deltaText });
// Merge into last text fragment if possible, otherwise create new one
const lastFragment = fragments[fragments.length - 1];
if (lastFragment && lastFragment.type === 'text' && typeof lastFragment.text === 'string') {
lastFragment.text += deltaText;
} else {
fragments.push({ type: 'text', text: deltaText });
}
}

if (fragments.length === 0) {
Expand Down Expand Up @@ -542,6 +548,27 @@ function createChatStore() {
const serverMessageId =
typeof payload.message_id === 'number' ? payload.message_id : null;

// Check if tool event has multimodal content with attachments
const payloadContent = payload.content;
const currentState = get(store);
let toolContent: ChatMessageContent;
let toolText: string;
let toolAttachments: AttachmentResource[] = [];

if (Array.isArray(payloadContent)) {
// Multimodal content with potential attachments
const normalized = normalizeMessageContent(payloadContent);
toolAttachments = attachmentsFromParts(normalized.parts, currentState.sessionId);
toolContent = payloadContent;
toolText = normalized.text;
} else if (status === 'started') {
toolContent = `Running ${toolName}…`;
toolText = `Running ${toolName}…`;
} else {
toolContent = toolResult ?? `Tool ${toolName} responded.`;
toolText = toolResult ?? `Tool ${toolName} responded.`;
}

let messageId = toolMessageIds.get(callId);
if (!messageId) {
messageId = createId('tool');
Expand All @@ -563,15 +590,9 @@ function createChatStore() {
{
id: messageId as string,
role: 'tool',
content:
status === 'started'
? `Running ${toolName}…`
: toolResult ?? `Tool ${toolName} responded.`,
text:
status === 'started'
? `Running ${toolName}…`
: toolResult ?? `Tool ${toolName} responded.`,
attachments: [],
content: toolContent,
text: toolText,
attachments: toolAttachments,
pending: status === 'started',
details: {
toolName,
Expand All @@ -598,11 +619,6 @@ function createChatStore() {
serverMessageId:
serverMessageId ?? message.details?.serverMessageId ?? null,
};
const nextText =
toolResult ??
(status === 'started'
? `Running ${toolName}…`
: `Tool ${toolName} ${status}.`);
const nextCreatedAt = coalesceTimestamp(
payload.created_at,
payload.created_at_utc,
Expand All @@ -614,9 +630,9 @@ function createChatStore() {
);
return {
...message,
content: nextText,
text: nextText,
attachments: message.attachments ?? [],
content: toolContent,
text: toolText,
attachments: toolAttachments.length > 0 ? toolAttachments : message.attachments ?? [],
pending: status === 'started',
details,
createdAt: nextCreatedAt,
Expand Down
113 changes: 94 additions & 19 deletions src/backend/chat/streaming/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ async def stream_conversation(
)

total_tool_calls = 0
# Track tool attachments to inject into next assistant response
pending_tool_attachments: list[dict[str, Any]] = []

while True:
tools_available = bool(active_tools_payload)
Expand Down Expand Up @@ -222,6 +224,30 @@ async def stream_conversation(
payload.pop("tool_choice", None)

content_builder = _AssistantContentBuilder()

# Inject pending tool attachments from previous hop into this assistant response
if pending_tool_attachments:
content_builder.add_structured(pending_tool_attachments)

# Emit attachments as SSE deltas so frontend can display them
for attachment_fragment in pending_tool_attachments:
yield {
"event": "message",
"data": json.dumps({
"choices": [
{
"delta": {
"content": [attachment_fragment],
"role": "assistant",
},
"index": 0,
}
],
}),
}

pending_tool_attachments.clear()

streamed_tool_calls: list[dict[str, Any]] = []
finish_reason: str | None = None
model_name: str | None = None
Expand Down Expand Up @@ -768,18 +794,62 @@ async def stream_conversation(
if cleaned_text:
content_parts.append({"type": "text", "text": cleaned_text})

# Add image parts for each attachment
# The attachment_urls service will populate these with signed URLs later
# Add image parts for each attachment with populated URLs
for attachment_id in attachment_ids:
content_parts.append(
{
# Fetch attachment record to get signed URL
try:
attachment_record = await self._repo.get_attachment(
attachment_id
)
signed_url = ""
attachment_metadata: dict[str, Any] = {
"attachment_id": attachment_id
}

if attachment_record:
signed_url = (
attachment_record.get("signed_url")
or attachment_record.get("display_url")
or ""
)
# Include additional metadata
attachment_metadata.update(
{
"mime_type": attachment_record.get("mime_type"),
"size_bytes": attachment_record.get("size_bytes"),
"display_url": signed_url,
"delivery_url": signed_url,
}
)
# Add filename from metadata if available
record_metadata = attachment_record.get("metadata")
if isinstance(record_metadata, dict):
filename = record_metadata.get("filename")
if filename:
attachment_metadata["filename"] = filename

attachment_fragment = {
"type": "image_url",
"image_url": {
"url": ""
}, # Will be filled by attachment_urls service
"metadata": {"attachment_id": attachment_id},
"url": signed_url
},
"metadata": attachment_metadata,
}
)
content_parts.append(attachment_fragment)

# Add to pending attachments for next assistant response
pending_tool_attachments.append(attachment_fragment)
except Exception: # pragma: no cover - best effort
# If we can't fetch the attachment, include placeholder
content_parts.append(
{
"type": "image_url",
"image_url": {
"url": ""
},
"metadata": {"attachment_id": attachment_id},
}
)

tool_message = {
"role": "tool",
Expand All @@ -801,19 +871,24 @@ async def stream_conversation(
tool_message["created_at_utc"] = utc_iso
conversation_state.append(tool_message)

# Build SSE event payload - include content for frontend rendering
tool_event_data: dict[str, Any] = {
"status": status,
"name": tool_name,
"call_id": tool_id,
"result": result_text,
"message_id": tool_record_id,
"created_at": edt_iso or tool_created_at,
"created_at_utc": utc_iso or tool_created_at,
}

# If there are attachments, include the multimodal content structure
if attachment_ids:
tool_event_data["content"] = content_parts

yield {
"event": "tool",
"data": json.dumps(
{
"status": status,
"name": tool_name,
"call_id": tool_id,
"result": result_text,
"message_id": tool_record_id,
"created_at": edt_iso or tool_created_at,
"created_at_utc": utc_iso or tool_created_at,
}
),
"data": json.dumps(tool_event_data),
}

notice_reason = _classify_tool_followup(
Expand Down
15 changes: 13 additions & 2 deletions src/backend/chat/streaming/tooling.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@
SESSION_AWARE_TOOL_NAME = "chat_history"
SESSION_AWARE_TOOL_SUFFIX = "__chat_history"

# Tools that require session_id to store attachments or access conversation state
SESSION_AWARE_TOOLS = {
"chat_history",
"download_gmail_attachment",
"read_gmail_attachment_text",
"extract_gmail_attachment_by_id",
"gdrive_display_image",
}


def summarize_tool_parameters(parameters: Mapping[str, Any] | None) -> str:
if not isinstance(parameters, Mapping):
Expand Down Expand Up @@ -121,8 +130,9 @@ def is_tool_support_error(error: OpenRouterError) -> bool:


def tool_requires_session_id(tool_name: str) -> bool:
    """Return True when *tool_name* must be given the chat session_id.

    A tool is session-aware either because it is explicitly listed in
    SESSION_AWARE_TOOLS (it stores attachments or reads conversation state)
    or because its name carries the session-aware suffix used by namespaced
    MCP tools (e.g. "gmail__chat_history").

    Args:
        tool_name: The tool's registered name.

    Returns:
        True if the tool call must include the session_id parameter.
    """
    # NOTE: the diff rendered both the old and new bodies here; this is the
    # intended post-change implementation (set membership + suffix check).
    return (
        tool_name in SESSION_AWARE_TOOLS
        or tool_name.endswith(SESSION_AWARE_TOOL_SUFFIX)
    )


Expand Down Expand Up @@ -216,6 +226,7 @@ def finalize_tool_calls(
__all__ = [
"SESSION_AWARE_TOOL_NAME",
"SESSION_AWARE_TOOL_SUFFIX",
"SESSION_AWARE_TOOLS",
"classify_tool_followup",
"finalize_tool_calls",
"is_tool_support_error",
Expand Down
102 changes: 102 additions & 0 deletions src/backend/mcp_servers/gdrive_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1173,6 +1173,107 @@ async def get_drive_file_permissions(
return "\n".join(lines)


@mcp.tool("gdrive_display_image")
async def display_drive_image(
    file_id: str,
    session_id: str,
    user_email: str = DEFAULT_USER_EMAIL,
) -> str:
    """Download an image from Google Drive and display it in the chat.

    CRITICAL: Call this tool whenever the user requests to see a specific image by:
    - File ID from a gdrive_list_folder result
    - Ordinal position ("show me the first/second/third image")
    - Filename reference

    ALWAYS call this tool even if other images are already visible in the conversation.
    When a user says "show me the [ordinal] image" (e.g., "first", "second"), they are
    referring to items from the most recent file listing, NOT images already displayed.
    Each call displays a NEW image.

    Args:
        file_id: The Google Drive file ID from gdrive_list_folder results
        session_id: The chat session ID (required to store the attachment)
        user_email: The user's email address for authentication

    Returns:
        A message with attachment details including the signed URL for display,
        or a human-readable error string on failure (no exception is raised).
    """
    if not session_id or not session_id.strip():
        return "session_id is required to display the image in chat."

    service, error_msg = _get_drive_service_or_error(user_email)
    if error_msg:
        return error_msg
    assert service is not None

    # Fetch metadata first so non-image files are rejected before any download.
    try:
        metadata = await asyncio.to_thread(
            service.files()
            .get(
                fileId=file_id,
                fields="id, name, mimeType, size, webViewLink",
                supportsAllDrives=True,
            )
            .execute
        )
    except Exception as exc:
        return f"Error retrieving metadata for file {file_id}: {exc}"

    mime_type = metadata.get("mimeType", "")
    file_name = metadata.get("name", "image")

    # Verify it's an image
    if not mime_type.startswith("image/"):
        return (
            f"Error: File '{file_name}' is not an image (type: {mime_type}). "
            "Only image files can be displayed. Use gdrive_get_file_content for other file types."
        )

    # Download the image bytes (bounded by MAX_CONTENT_BYTES).
    request = service.files().get_media(fileId=file_id)

    try:
        image_bytes = await _download_request_bytes(request, max_size=MAX_CONTENT_BYTES)
    except ValueError as exc:
        return f"Image too large: {exc}"
    except Exception as exc:
        return f"Error downloading image: {exc}"

    # Store the image using AttachmentService so the chat frontend can render it.
    try:
        from backend.services.attachments import AttachmentError, AttachmentTooLarge

        attachment_service = await _get_attachment_service()
        record = await attachment_service.save_bytes(
            session_id=session_id,
            data=image_bytes,
            mime_type=mime_type,
            filename_hint=file_name,
        )
    except AttachmentTooLarge as exc:
        return f"Image rejected: {exc}"
    except AttachmentError as exc:
        return f"Failed to store image: {exc}"

    # Extract attachment details from the stored record.
    attachment_metadata = record.get("metadata") or {}
    stored_filename = attachment_metadata.get("filename") or file_name
    signed_url = record.get("signed_url") or record.get("display_url")
    expires_at = record.get("expires_at") or record.get("signed_url_expires_at")

    lines = [
        f"Image '{stored_filename}' from Google Drive displayed in chat!",
        f"attachment_id: {record.get('attachment_id')}",
        f"Filename: {stored_filename}",
        f"Size: {record.get('size_bytes')} bytes",
    ]
    # Fix: the docstring promises the signed URL, but the previous version
    # computed signed_url/expires_at and then dropped them (dead locals).
    if signed_url:
        lines.append(f"URL: {signed_url}")
    if expires_at:
        lines.append(f"URL expires: {expires_at}")

    return "\n".join(lines)


@mcp.tool("gdrive_check_public_access")
async def check_drive_file_public_access(
file_name: str,
Expand Down Expand Up @@ -1279,5 +1380,6 @@ def run() -> None: # pragma: no cover - integration entrypoint
"rename_drive_file",
"create_drive_folder",
"get_drive_file_permissions",
"display_drive_image",
"check_drive_file_public_access",
]