diff --git a/frontend/src/lib/stores/chat.ts b/frontend/src/lib/stores/chat.ts
index aa672d7..8701195 100644
--- a/frontend/src/lib/stores/chat.ts
+++ b/frontend/src/lib/stores/chat.ts
@@ -179,7 +179,13 @@ function mergeMessageContent(
   if (deltaFragments.length > 0) {
     fragments.push(...deltaFragments);
   } else if (deltaText) {
-    fragments.push({ type: 'text', text: deltaText });
+    // Merge into last text fragment if possible, otherwise create new one
+    const lastFragment = fragments[fragments.length - 1];
+    if (lastFragment && lastFragment.type === 'text' && typeof lastFragment.text === 'string') {
+      lastFragment.text += deltaText;
+    } else {
+      fragments.push({ type: 'text', text: deltaText });
+    }
   }
 
   if (fragments.length === 0) {
@@ -542,6 +548,27 @@ function createChatStore() {
       const serverMessageId =
         typeof payload.message_id === 'number' ? payload.message_id : null;
 
+      // Check if tool event has multimodal content with attachments
+      const payloadContent = payload.content;
+      const currentState = get(store);
+      let toolContent: ChatMessageContent;
+      let toolText: string;
+      let toolAttachments: AttachmentResource[] = [];
+
+      if (Array.isArray(payloadContent)) {
+        // Multimodal content with potential attachments
+        const normalized = normalizeMessageContent(payloadContent);
+        toolAttachments = attachmentsFromParts(normalized.parts, currentState.sessionId);
+        toolContent = payloadContent;
+        toolText = normalized.text;
+      } else if (status === 'started') {
+        toolContent = `Running ${toolName}…`;
+        toolText = `Running ${toolName}…`;
+      } else {
+        toolContent = toolResult ?? `Tool ${toolName} responded.`;
+        toolText = toolResult ?? `Tool ${toolName} responded.`;
+      }
+
       let messageId = toolMessageIds.get(callId);
       if (!messageId) {
         messageId = createId('tool');
@@ -563,15 +590,9 @@ function createChatStore() {
         {
           id: messageId as string,
           role: 'tool',
-          content:
-            status === 'started'
-              ? `Running ${toolName}…`
-              : toolResult ?? `Tool ${toolName} responded.`,
-          text:
-            status === 'started'
-              ? `Running ${toolName}…`
-              : toolResult ?? `Tool ${toolName} responded.`,
-          attachments: [],
+          content: toolContent,
+          text: toolText,
+          attachments: toolAttachments,
           pending: status === 'started',
           details: {
             toolName,
@@ -598,11 +619,6 @@ function createChatStore() {
            serverMessageId:
              serverMessageId ?? message.details?.serverMessageId ?? null,
          };
-          const nextText =
-            toolResult ??
-            (status === 'started'
-              ? `Running ${toolName}…`
-              : `Tool ${toolName} ${status}.`);
          const nextCreatedAt = coalesceTimestamp(
            payload.created_at,
            payload.created_at_utc,
@@ -614,9 +630,9 @@ function createChatStore() {
          );
          return {
            ...message,
-            content: nextText,
-            text: nextText,
-            attachments: message.attachments ?? [],
+            content: toolContent,
+            text: toolText,
+            attachments: toolAttachments.length > 0 ? toolAttachments : message.attachments ?? [],
            pending: status === 'started',
            details,
            createdAt: nextCreatedAt,
diff --git a/src/backend/chat/streaming/handler.py b/src/backend/chat/streaming/handler.py
index 9c1283d..ed5f969 100644
--- a/src/backend/chat/streaming/handler.py
+++ b/src/backend/chat/streaming/handler.py
@@ -140,6 +140,8 @@ async def stream_conversation(
     )
 
     total_tool_calls = 0
+    # Track tool attachments to inject into next assistant response
+    pending_tool_attachments: list[dict[str, Any]] = []
 
     while True:
         tools_available = bool(active_tools_payload)
@@ -222,6 +224,30 @@
         payload.pop("tool_choice", None)
 
         content_builder = _AssistantContentBuilder()
+
+        # Inject pending tool attachments from previous hop into this assistant response
+        if pending_tool_attachments:
+            content_builder.add_structured(pending_tool_attachments)
+
+            # Emit attachments as SSE deltas so frontend can display them
+            for attachment_fragment in pending_tool_attachments:
+                yield {
+                    "event": "message",
+                    "data": json.dumps({
+                        "choices": [
+                            {
+                                "delta": {
+                                    "content": [attachment_fragment],
+                                    "role": "assistant",
+                                },
+                                "index": 0,
+                            }
+                        ],
+                    }),
+                }
+
+            pending_tool_attachments.clear()
+
         streamed_tool_calls: list[dict[str, Any]] = []
         finish_reason: str | None = None
         model_name: str | None = None
@@ -768,18 +794,62 @@
                 if cleaned_text:
                     content_parts.append({"type": "text", "text": cleaned_text})
 
-                # Add image parts for each attachment
-                # The attachment_urls service will populate these with signed URLs later
+                # Add image parts for each attachment with populated URLs
                 for attachment_id in attachment_ids:
-                    content_parts.append(
-                        {
+                    # Fetch attachment record to get signed URL
+                    try:
+                        attachment_record = await self._repo.get_attachment(
+                            attachment_id
+                        )
+                        signed_url = ""
+                        attachment_metadata: dict[str, Any] = {
+                            "attachment_id": attachment_id
+                        }
+
+                        if attachment_record:
+                            signed_url = (
+                                attachment_record.get("signed_url")
+                                or attachment_record.get("display_url")
+                                or ""
+                            )
+                            # Include additional metadata
+                            attachment_metadata.update(
+                                {
+                                    "mime_type": attachment_record.get("mime_type"),
+                                    "size_bytes": attachment_record.get("size_bytes"),
+                                    "display_url": signed_url,
+                                    "delivery_url": signed_url,
+                                }
+                            )
+                            # Add filename from metadata if available
+                            record_metadata = attachment_record.get("metadata")
+                            if isinstance(record_metadata, dict):
+                                filename = record_metadata.get("filename")
+                                if filename:
+                                    attachment_metadata["filename"] = filename
+
+                        attachment_fragment = {
                             "type": "image_url",
                             "image_url": {
-                                "url": ""
-                            },  # Will be filled by attachment_urls service
-                            "metadata": {"attachment_id": attachment_id},
+                                "url": signed_url
+                            },
+                            "metadata": attachment_metadata,
                         }
-                    )
+                        content_parts.append(attachment_fragment)
+
+                        # Add to pending attachments for next assistant response
+                        pending_tool_attachments.append(attachment_fragment)
+                    except Exception:  # pragma: no cover - best effort
+                        # If we can't fetch the attachment, include placeholder
+                        content_parts.append(
+                            {
+                                "type": "image_url",
+                                "image_url": {
+                                    "url": ""
+                                },
+                                "metadata": {"attachment_id": attachment_id},
+                            }
+                        )
 
                 tool_message = {
                     "role": "tool",
@@ -801,19 +871,24 @@
                 tool_message["created_at_utc"] = utc_iso
                 conversation_state.append(tool_message)
 
+                # Build SSE event payload - include content for frontend rendering
+                tool_event_data: dict[str, Any] = {
+                    "status": status,
+                    "name": tool_name,
+                    "call_id": tool_id,
+                    "result": result_text,
+                    "message_id": tool_record_id,
"created_at": edt_iso or tool_created_at, + "created_at_utc": utc_iso or tool_created_at, + } + + # If there are attachments, include the multimodal content structure + if attachment_ids: + tool_event_data["content"] = content_parts + yield { "event": "tool", - "data": json.dumps( - { - "status": status, - "name": tool_name, - "call_id": tool_id, - "result": result_text, - "message_id": tool_record_id, - "created_at": edt_iso or tool_created_at, - "created_at_utc": utc_iso or tool_created_at, - } - ), + "data": json.dumps(tool_event_data), } notice_reason = _classify_tool_followup( diff --git a/src/backend/chat/streaming/tooling.py b/src/backend/chat/streaming/tooling.py index 05888c4..e148cbc 100644 --- a/src/backend/chat/streaming/tooling.py +++ b/src/backend/chat/streaming/tooling.py @@ -10,6 +10,15 @@ SESSION_AWARE_TOOL_NAME = "chat_history" SESSION_AWARE_TOOL_SUFFIX = "__chat_history" +# Tools that require session_id to store attachments or access conversation state +SESSION_AWARE_TOOLS = { + "chat_history", + "download_gmail_attachment", + "read_gmail_attachment_text", + "extract_gmail_attachment_by_id", + "gdrive_display_image", +} + def summarize_tool_parameters(parameters: Mapping[str, Any] | None) -> str: if not isinstance(parameters, Mapping): @@ -121,8 +130,9 @@ def is_tool_support_error(error: OpenRouterError) -> bool: def tool_requires_session_id(tool_name: str) -> bool: - return tool_name == SESSION_AWARE_TOOL_NAME or tool_name.endswith( - SESSION_AWARE_TOOL_SUFFIX + return ( + tool_name in SESSION_AWARE_TOOLS + or tool_name.endswith(SESSION_AWARE_TOOL_SUFFIX) ) @@ -216,6 +226,7 @@ def finalize_tool_calls( __all__ = [ "SESSION_AWARE_TOOL_NAME", "SESSION_AWARE_TOOL_SUFFIX", + "SESSION_AWARE_TOOLS", "classify_tool_followup", "finalize_tool_calls", "is_tool_support_error", diff --git a/src/backend/mcp_servers/gdrive_server.py b/src/backend/mcp_servers/gdrive_server.py index 8068788..e8e28e5 100644 --- a/src/backend/mcp_servers/gdrive_server.py +++ b/src/backend/mcp_servers/gdrive_server.py @@ -1173,6 +1173,107 @@ async def get_drive_file_permissions( return "\n".join(lines) +@mcp.tool("gdrive_display_image") +async def display_drive_image( + file_id: str, + session_id: str, + user_email: str = DEFAULT_USER_EMAIL, +) -> str: + """Download an image from Google Drive and display it in the chat. + + CRITICAL: Call this tool whenever the user requests to see a specific image by: + - File ID from a gdrive_list_folder result + - Ordinal position ("show me the first/second/third image") + - Filename reference + + ALWAYS call this tool even if other images are already visible in the conversation. + When a user says "show me the [ordinal] image" (e.g., "first", "second"), they are + referring to items from the most recent file listing, NOT images already displayed. + Each call displays a NEW image. + + Args: + file_id: The Google Drive file ID from gdrive_list_folder results + session_id: The chat session ID (required to store the attachment) + user_email: The user's email address for authentication + + Returns: + A message with attachment details including signed URL for display + """ + if not session_id or not session_id.strip(): + return "session_id is required to display the image in chat." 
+ + service, error_msg = _get_drive_service_or_error(user_email) + if error_msg: + return error_msg + assert service is not None + + # Get file metadata + try: + metadata = await asyncio.to_thread( + service.files() + .get( + fileId=file_id, + fields="id, name, mimeType, size, webViewLink", + supportsAllDrives=True, + ) + .execute + ) + except Exception as exc: + return f"Error retrieving metadata for file {file_id}: {exc}" + + mime_type = metadata.get("mimeType", "") + file_name = metadata.get("name", "image") + file_size = metadata.get("size") + + # Verify it's an image + if not mime_type.startswith("image/"): + return ( + f"Error: File '{file_name}' is not an image (type: {mime_type}). " + "Only image files can be displayed. Use gdrive_get_file_content for other file types." + ) + + # Download the image + request = service.files().get_media(fileId=file_id) + + try: + image_bytes = await _download_request_bytes(request, max_size=MAX_CONTENT_BYTES) + except ValueError as exc: + return f"Image too large: {exc}" + except Exception as exc: + return f"Error downloading image: {exc}" + + # Store the image using AttachmentService + try: + from backend.services.attachments import AttachmentError, AttachmentTooLarge + + attachment_service = await _get_attachment_service() + record = await attachment_service.save_bytes( + session_id=session_id, + data=image_bytes, + mime_type=mime_type, + filename_hint=file_name, + ) + except AttachmentTooLarge as exc: + return f"Image rejected: {exc}" + except AttachmentError as exc: + return f"Failed to store image: {exc}" + + # Extract attachment details + attachment_metadata = record.get("metadata") or {} + stored_filename = attachment_metadata.get("filename") or file_name + signed_url = record.get("signed_url") or record.get("display_url") + expires_at = record.get("expires_at") or record.get("signed_url_expires_at") + + lines = [ + f"Image '{stored_filename}' from Google Drive displayed in chat!", + f"attachment_id: {record.get('attachment_id')}", + f"Filename: {stored_filename}", + f"Size: {record.get('size_bytes')} bytes", + ] + + return "\n".join(lines) + + @mcp.tool("gdrive_check_public_access") async def check_drive_file_public_access( file_name: str, @@ -1279,5 +1380,6 @@ def run() -> None: # pragma: no cover - integration entrypoint "rename_drive_file", "create_drive_folder", "get_drive_file_permissions", + "display_drive_image", "check_drive_file_public_access", ]